扇出交换(Fanout Exchange)将消息路由到绑定到它的所有队列,并且忽略路由键。如果 N 个队列绑定到一个扇出交换器,则当一条新消息发布到该交换器时,该消息的副本将传递到所有 N 个队列。扇出交换是消息广播路由的理想选择。
因为扇出交换向绑定到它的每个队列传递消息的副本,所以它的用例非常相似:
大型多人在线 (MMO) 游戏可以将其用于排行榜更新或其他全球活动
体育新闻网站可以使用扇出交换近乎实时地向移动客户端分发分数更新
分布式系统可以广播各种状态和配置更新
群聊可以使用扇出交换在参与者之间分发消息(虽然 AMQP 没有内置的出席概念,所以 XMPP 可能是更好的选择)
跟多关于 AMQP 协议知识请阅读 “AMQP 0-9-1 模型解释”
该示例创建了一个生产者(P),生产者每秒向名为“fanout_test”的交换器发送消息。另外,我们还创建了两个消费者,分别为 C1 和 C2。其中,C1 使用绑定KEY“*.fanout1”与交换器进行绑定;C2使用绑定KEY“*.fanout2”与交换器进行绑定。如下图:

当我们采用 Fanout 类型的交换器时,所有发送到交换器的消息均转发一份副本到消费者 C1 和 C2,即使他们的路由KEY不一致(Fanout 交换器将忽略路由KEY)。
该生产者将每秒发送一个简单的文本消息到 fanout_test 交换器,并且采用 test.fanout1 路由KEY,虽然路由KEY无效。代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 发送简单消息到指定的Exchange
* @author Administrator
*/
public class Send {
// 定义exchange名称
private static final String EXCHANGE_NAME = "fanout_test";
// 主机地址
private static final String HOST_NAME = "localhost";
// 路由Key
private static final String ROUTING_KEY = "test.fanout1";
public static void main(String[] args) throws Exception {
Connection connection = null;
try {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
connection = factory.newConnection();
Channel channel = connection.createChannel();
// 发送消息
while(true) {
String message = "Happiness is not because have much but caring less.";
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println(" Send Message :: " + message);
// 暂停1秒钟
Thread.sleep(1000);
}
} finally {
// 关闭连接
if ( null != connection ) {
connection.close();
}
}
}
}该消费者使用 *.fanout1 绑定KEY将自己绑定到 fanout_test 交换机,代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 创建一个消费者
* @author Administrator
*/
public class Receive01 {
// 定义exchange名称
private static final String EXCHANGE_NAME = "fanout_test";
// 主机地址
private static final String HOST_NAME = "localhost";
// 模式
private static final String BIND_KEY = "*.fanout1";
public static void main(String[] args) throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
Connection connection = factory.newConnection();
// 获取一个通道
Channel channel = connection.createChannel();
// 声明一个类型为fanout的exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 绑定Queue和Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, BIND_KEY);
System.out.println("Waiting Message...");
// 消费消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("routingKey :: " + routingKey);
System.out.println(" body :: " + new String(delivery.getBody()) );
}
}
}该消费者使用 *.fanout2 绑定KEY将自己绑定到 fanout_test 交换机,代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 创建一个消费者
* @author Administrator
*/
public class Receive02 {
// 定义exchange名称
private static final String EXCHANGE_NAME = "fanout_test";
// 主机地址
private static final String HOST_NAME = "localhost";
// 模式
private static final String BIND_KEY = "*.fanout2";
public static void main(String[] args) throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
Connection connection = factory.newConnection();
// 获取一个通道
Channel channel = connection.createChannel();
// 声明一个类型为fanout的exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 绑定Queue和Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, BIND_KEY);
System.out.println("Waiting Message...");
// 消费消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("routingKey :: " + routingKey);
System.out.println(" body :: " + new String(delivery.getBody()) );
}
}
}运行上面代码你会发现生产者发送的消息,在消费者 C1 和 C2 中均可以进行消费。