主题交换(Topic Exchange)基于消息路由键与用于将队列绑定到交换的模式之间的匹配将消息路由到一个或多个队列。主题交换类型通常用于实现各种发布/订阅模式变体。主题交换通常用于消息的多播路由。
主题交换有非常广泛的用例。每当一个问题涉及多个消费者/应用程序选择性地选择他们想要接收的消息类型时,就应该考虑使用主题交换。
示例用途:
分发与特定地理位置相关的数据,例如销售点
由多个工作人员完成的后台任务处理,每个工作人员都能够处理特定的任务集
股票价格更新(以及其他类型财务数据的更新)
涉及分类或标记的新闻更新(例如,仅针对特定运动或团队)
云中各种服务的编排
分布式架构/特定于操作系统的软件构建或打包,其中每个构建器只能处理一个架构或操作系统
该示例创建了一个生产者(P)和一个消费者(C),消费者使用 “*.orange.*” 路由键绑定到 topic_logs 交换器。如下图:
该生产者使用 “haha.orange.com” 路由键发送一条消息到 topic_logs 交换器。代码如下:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; /** * 发送消息给服务器,根据路由key发送消息 * @author Administrator */ public class Send { // 交换器名称 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { // 获取路由key String routingKey = "haha.orange.com"; // 获取待发送信息 String message = "hi! How are you."; Connection connection = null; Channel channel = null; try { // 获取连接对象工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); // 声明类型为topic的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 发送消息 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { ignore.printStackTrace(); } } } } }
该消费者使用 “*.orange.*” 绑定键绑定到 topic_logs 交换器,代码如下:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; /** * 按照给定的规则(绑定key)接收消息 * 注意: * (1)* 匹配一个单词 * (2)# 匹配0个或多个单词 * (3). 分割单词 * @author Administrator */ public class Receive { // 交换器名称 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { // 绑定Key String bindingKey = "*.orange.*"; Connection connection = null; Channel channel = null; try { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); // 声明一个exchange通道 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 将用户自定义的key绑定到exchange队列中 String queueName = channel.queueDeclare().getQueue(); System.out.println("Queue Name " + queueName); // 将Queue和Exchange使用bindingKey进行绑定 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); System.out.println(" [*]: Waiting Message. To exit press CTRL+C"); /** * 开始消费消息 */ QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { // 等待下一个消息,且将消息返回 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // 接收消息 String message = new String(delivery.getBody()); // 获取路由key String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x]: Received '" + routingKey + "':'" + message + "'"); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭连接,释放资源 if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }