RabbitMQ 教程

Headers Exchange 示例

标头交换(Headers Exchange)设计用于在多个属性上进行路由,这些属性比路由键更容易表示为消息标头。标头交换忽略路由键属性。相反,用于路由的属性取自 headers 属性。如果标头的值等于绑定时指定的值,则认为消息匹配。

可以使用多个用于匹配的标头将队列绑定到标头交换。在这种情况下,代理需要应用程序开发人员提供的更多信息,即,它应该考虑与任何标头匹配的消息,还是所有这些消息?这就是 “x-match” 绑定参数的用途。当 “x-match” 参数设置为 “any” 时,只需一个匹配的标头值就足够了。或者,将 “x-match” 设置为 “all” 要求所有值必须匹配。

标头交换可以被视为 “类固醇的直接交换”。因为它们都基于标头值进行路由,所以它们可以用作路由键不必是字符串的直接交换;例如,它可以是整数或哈希(字典)。

请注意,以字符串 x- 开头的标头不会用于评估匹配项。

示例

该示例定义了一个生产者(P)和两个消费者(C1 或 C2)。其中,生产者将向 headers_test 交换器发送两个消息,每个消息都包含特定消息头。如下图:

消费者(Send.java)

该消费者将发送两条消息,这些消息均发布到 headers_test 交换器中,它们分别用于测试 x-match=any 和 x-match=all 匹配模式。代码如下:

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 发送消息
 * @author Administrator
 */
public class Send {
	// exchange名称
	private static final String EXCHANGE_NAME = "headers_test";
	// 主机名
	private static final String HOST_NAME = "localhost";

	public static void main(String[] args) {
		Connection connection = null;
		try {
			// 创建连接
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			connection = factory.newConnection();
			
			// 创建通道
			Channel channel = connection.createChannel();
			
			// 声明类型为headers的exchange
			channel.exchangeDeclare(EXCHANGE_NAME, "headers");
			
			// 绑定queue与exchange
			String queueName = channel.queueDeclare().getQueue();
			channel.queueBind(queueName, EXCHANGE_NAME, "", null);
			System.out.println("send message...");
			
			/**
			 * 发送消息,测试x-match=any(任意匹配一个或多个)
			 */
			// 消息正文
			String messageAny = "hi! The headers type of message(x-match=any).";
			// 设置消息头
			Map<String,Object> headersAny = new Hashtable<String, Object>();
			headersAny.put("level", "error");
			headersAny.put("package", "com.hxstrive");
			AMQP.BasicProperties.Builder propertiesAny = new AMQP.BasicProperties.Builder();
			propertiesAny.headers(headersAny);
			// 发送消息
			channel.basicPublish(EXCHANGE_NAME, "", propertiesAny.build(), messageAny.getBytes());
			System.out.println("Send Message(x-match=any) :: " + messageAny);
			
			/**
			 * 发送消息,测试x-match=all(全部匹配)
			 */
			// 消息正文
			String messageAll = "hi! The headers type of message(x-match=all).";
			// 设置消息头
			Map<String,Object> headersAll = new Hashtable<String, Object>();
			headersAll.put("level", "info");
			headersAll.put("package", "com.hxstrive");
			AMQP.BasicProperties.Builder propertiesAll = new AMQP.BasicProperties.Builder();
			propertiesAll.headers(headersAll);
			// 发送消息
			channel.basicPublish(EXCHANGE_NAME, "", propertiesAll.build(), messageAll.getBytes());
			System.out.println("Send Message(x-match=all) :: " + messageAll);
		} catch(Exception e) {
			e.printStackTrace();
		} finally {
			if ( connection != null ) {
				try {
					connection.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

消费者(Receive01.java)

该消费者订阅了 headers_test 交换器中的消息,消息匹配采用 x-match=any 模式。只要发送到 headers_test 交换器中的消息包含 level=error 或 package=com.hxstrive 任意一个标头,则消费该消息。代码如下:

import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 接收消息,要求任意匹配一个或多个即可(x-match=any)
 * @author Administrator
 */
public class Receive01 {
	// exchange名称
	private static final String EXCHANGE_NAME = "headers_test";
	// 主机名
	private static final String HOST_NAME = "localhost";
	
	public static void main(String[] args) {
		try {
			// 创建连接
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			Connection connection = factory.newConnection();
			
			// 创建通道
			Channel channel = connection.createChannel();
			
			// 声明headers类型的exchange
			channel.exchangeDeclare(EXCHANGE_NAME, "headers");
			
			/**
			 * 绑定exchange与queue
			 */
			String queueName = channel.queueDeclare().getQueue();
			// 绑定的header关键字
			Map<String,Object> headers = new HashMap<String,Object>();
			/*
			 * x-match是必须的,用来设置匹配方式;
			 * all - 完全成功匹配定义的所有key=value
			 * any - 只需成功匹配一个定义的key=value
			 */
			headers.put("x-match", "any");
			headers.put("level", "info");
			headers.put("package", "com.hxstrive");
			channel.queueBind(queueName, EXCHANGE_NAME, "", headers);
			System.out.println("[Receive01] Waiting Message(x-match=any)...");
			
			/**
			 * 消费消息
			 */
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true, consumer);
			while(true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String body = new String(delivery.getBody());
				System.out.println("[Receive01] Receive Message :: " + body);
			}
		} catch(Exception e) {
			e.printStackTrace();
		}
	}
}

消费者(Receive02.java)

该消费者订阅了 headers_test 交换器中的消息,消息匹配采用 x-match=any 模式。只要发送到 headers_test 交换器中的消息包含 level=info 和 package=com.hxstrive 标头,则消费该消息。代码如下:

import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 接收消息,要求匹配所有的key=value对(x-match=all)
 * @author Administrator
 */
public class Receive02 {
	// exchange名称
	private static final String EXCHANGE_NAME = "headers_test";
	// 主机名
	private static final String HOST_NAME = "localhost";
	
	public static void main(String[] args) {
		try {
			// 创建连接
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			Connection connection = factory.newConnection();
			
			// 创建通道
			Channel channel = connection.createChannel();
			
			// 声明headers类型的exchange
			channel.exchangeDeclare(EXCHANGE_NAME, "headers");
			
			/**
			 * 绑定exchange与queue
			 */
			String queueName = channel.queueDeclare().getQueue();
			// 绑定的header关键字
			Map<String,Object> headers = new HashMap<String,Object>();
			/*
			 * x-match是必须的,用来设置匹配方式;
			 * all - 完全成功匹配定义的所有key=value
			 * any - 只需成功匹配一个定义的key=value
			 */
			headers.put("x-match", "all");
			headers.put("level", "info");
			headers.put("package", "com.hxstrive");
			channel.queueBind(queueName, EXCHANGE_NAME, "", headers);
			System.out.println("[Receive02] Waiting Message(x-match=all)...");
			
			/**
			 * 消费消息
			 */
			QueueingConsumer consumer = new QueueingConsumer(channel);
			channel.basicConsume(queueName, true, consumer);
			while(true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				String body = new String(delivery.getBody());
				System.out.println("[Receive02] Receive Message :: " + body);
			}
		} catch(Exception e) {
			e.printStackTrace();
		}
	}
}

运行上面代码,观察输出结果。


说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号