RabbitMQ 教程

延迟队列

延迟队列顾名思义就是存放延迟消息的队列,所谓“延迟消息”是指消息被发送后,并不想让消费者立刻进行消费,而是需要等待指定的时间后,消费者才能够进行消费。

延迟队列的使用场景有很多,例如:

  • 订单支付 —— 相信读者有过网上购物的习惯,当我们下了订单后,需要支付。而支付一般需要在30分钟(也可能是20分钟等)内完成支付。如果你没有在30分钟内完成支付,那么订单将变成无效的订单。此时,这个订单支付的时效功能就可以采用延迟队列来处理。

  • 定时控制 —— 不知道读者是否玩过类似远程控制的设备,我们只需要在设备对应的APP/WEB应用中定义一个定时控制的指令,指定该指令在多少分钟后进行远程操作。例如:远程定时控制扫地机器人进行扫地、远程定时控制空调等等。这些远程控制的功能,也可以使用延迟队列来实现。

注意:在 AMQP 协议或者 RabbitMQ 中,并没有提供延迟队列的直接支持。但是,我们可以结合前面章节介绍的死信交换器(DLX)和过期时间(TTL)来模拟延迟队列的功能。

模拟延迟队列的思路

消息生产者发送一个带有 TTL(延迟时间)的消息到正常的交换器,且正常交换器绑定的队列没有被任何消费者订阅(也就不会被消费)。当时间慢慢过去,一旦时间超过我们为消息设置的 TTL 时间后,RabbitMQ 将会自动将过期的消息发送到队列指定的死信交换器,死信交换器将消息路由到死信队列。此时,如果有消费者订阅了死信队列,则这些消息将被消费者消费,就这样成功模拟了一个延迟队列。消息流转过程如下图:

实现延迟队列

示例的关键代码如下:

(1)消费者代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建信道
Channel channel = connection.createChannel();

// 声明死信交换器和死信队列,且将两者绑定到一起
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
channel.queueBind(queueDlxName, exchangeDlxName, "");

// 声明普通交换器和队列,且将两者绑定到一起
channel.exchangeDeclare(exchangeName, "topic");
Map<String,Object> queueArgs = new HashMap<String, Object>();
// 为队列设置死信交换器
queueArgs.put("x-dead-letter-exchange", exchangeDlxName);
channel.queueDeclare(queueName, true, false, false, queueArgs);
channel.queueBind(queueName, exchangeName, "*.hxstrive.com");

// 发送带有TTL过期时间的消息(过期消息设置为10秒,即消息将延迟10秒后被执行)
System.out.println("[Sender] Send Message...");
String message = "exchange DLX message";
// 通过消息属性设置设置消息TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) // 持久化消息
        .expiration("10000") // 设置 TTL=10秒
        .build();
channel.basicPublish(exchangeName, "www.hxstrive.com", properties, message.getBytes());
System.out.println("[Sender] message = '" + message + "'");

(2)消费者代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();

// 创建信道
final Channel channel = connection.createChannel();

// 声明队列
// 声明死信交换器和死信队列,且将两者绑定
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
channel.queueBind(queueDlxName, exchangeDlxName, "");

// 消费消息
System.out.println("[Consumer] Waiting Message...");
channel.basicConsume(queueDlxName, false, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("[Consumer] body = " + new String(body));
    }
});

点击查看完整代码(ExchangeDlx2.java)。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号