RabbitMQ 教程

Fanout Exchange 示例

🎉摘要:RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

扇出交换(Fanout Exchange)将消息路由到绑定到它的所有队列,并且忽略路由键。如果 N 个队列绑定到一个扇出交换器,则当一条新消息发布到该交换器时,该消息的副本将传递到所有 N 个队列。扇出交换是消息广播路由的理想选择。

因为扇出交换向绑定到它的每个队列传递消息的副本,所以它的用例非常相似:

  • 大型多人在线 (MMO) 游戏可以将其用于排行榜更新或其他全球活动

  • 体育新闻网站可以使用扇出交换近乎实时地向移动客户端分发分数更新

  • 分布式系统可以广播各种状态和配置更新

  • 群聊可以使用扇出交换在参与者之间分发消息(虽然 AMQP 没有内置的出席概念,所以 XMPP 可能是更好的选择)

跟多关于 AMQP 协议知识请阅读 “AMQP 0-9-1 模型解释

示例

该示例创建了一个生产者(P),生产者每秒向名为“fanout_test”的交换器发送消息。另外,我们还创建了两个消费者,分别为 C1 和 C2。其中,C1 使用绑定KEY“*.fanout1”与交换器进行绑定;C2使用绑定KEY“*.fanout2”与交换器进行绑定。如下图:

当我们采用 Fanout 类型的交换器时,所有发送到交换器的消息均转发一份副本到消费者 C1 和 C2,即使他们的路由KEY不一致(Fanout 交换器将忽略路由KEY)。

生产者(Send.java)

该生产者将每秒发送一个简单的文本消息到 fanout_test 交换器,并且采用 test.fanout1 路由KEY,虽然路由KEY无效。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 发送简单消息到指定的Exchange
 * @author Administrator
 */
public class Send {
	// 定义exchange名称
	private static final String EXCHANGE_NAME = "fanout_test";
	// 主机地址
	private static final String HOST_NAME = "localhost";
	// 路由Key
	private static final String ROUTING_KEY = "test.fanout1";
	
	public static void main(String[] args) throws Exception {
		Connection connection = null;
		
		try {
			// 创建连接
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			connection = factory.newConnection();
			Channel channel = connection.createChannel();
			
			// 发送消息
			while(true) {
				String message = "Happiness is not because have much but caring less.";
				channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
				channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
				System.out.println(" Send Message :: " + message);
				
				// 暂停1秒钟
				Thread.sleep(1000);
			}
		} finally {
			// 关闭连接
			if ( null != connection ) {
				connection.close();
			}
		}
	}
	
}

消费者(Receive01.java

该消费者使用 *.fanout1 绑定KEY将自己绑定到 fanout_test 交换机,代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 创建一个消费者
 * @author Administrator
 */
public class Receive01 {
	// 定义exchange名称
	private static final String EXCHANGE_NAME = "fanout_test";
	// 主机地址
	private static final String HOST_NAME = "localhost";
	// 模式
	private static final String BIND_KEY = "*.fanout1";
	
	public static void main(String[] args) throws Exception {
		// 创建连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST_NAME);
		Connection connection = factory.newConnection();
		
		// 获取一个通道
		Channel channel = connection.createChannel();
		
		// 声明一个类型为fanout的exchange
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		
		// 绑定Queue和Exchange
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, EXCHANGE_NAME, BIND_KEY);
		
		System.out.println("Waiting Message...");
		
		// 消费消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String routingKey = delivery.getEnvelope().getRoutingKey();
			System.out.println("routingKey :: " + routingKey);
			System.out.println("      body :: " + new String(delivery.getBody()) );
		}
	}
	
}

消费者(Receive02.java)

该消费者使用 *.fanout2 绑定KEY将自己绑定到 fanout_test 交换机,代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 创建一个消费者
 * @author Administrator
 */
public class Receive02 {
	// 定义exchange名称
	private static final String EXCHANGE_NAME = "fanout_test";
	// 主机地址
	private static final String HOST_NAME = "localhost";
	// 模式
	private static final String BIND_KEY = "*.fanout2";
	
	public static void main(String[] args) throws Exception {
		// 创建连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST_NAME);
		Connection connection = factory.newConnection();
		
		// 获取一个通道
		Channel channel = connection.createChannel();
		
		// 声明一个类型为fanout的exchange
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		
		// 绑定Queue和Exchange
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, EXCHANGE_NAME, BIND_KEY);
		
		System.out.println("Waiting Message...");
		
		// 消费消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String routingKey = delivery.getEnvelope().getRoutingKey();
			System.out.println("routingKey :: " + routingKey);
			System.out.println("      body :: " + new String(delivery.getBody()) );
		}
	}
	
}

运行上面代码你会发现生产者发送的消息,在消费者 C1 和 C2 中均可以进行消费。

说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号