import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * 使用死信交换器 + TTL消息模拟延迟队列 * @author hxstrive.com 2022/2/25 */ public class ExchangeDlx2 { /** 死信交换器名称 */ private final String exchangeDlxName = "exchange_" + getClass().getSimpleName() + "_DLX"; /** 普通交换器名称 */ private final String exchangeName = "exchange_" + getClass().getSimpleName(); /** 死信队列名称 */ private final String queueDlxName = "queue_" + getClass().getSimpleName() + "_DLX"; /** 队列名称 */ private final String queueName = "queue_" + getClass().getSimpleName(); public static void main(String[] args) throws Exception { ExchangeDlx2 demo = new ExchangeDlx2(); demo.consumer(); demo.sender(); } /** * 生产者 * @throws Exception */ private void sender() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明死信交换器和死信队列 channel.exchangeDeclare(exchangeDlxName, "fanout"); channel.queueDeclare(queueDlxName, true, false, true, null); channel.queueBind(queueDlxName, exchangeDlxName, ""); // 声明普通交换器和队列 channel.exchangeDeclare(exchangeName, "topic"); Map<String,Object> queueArgs = new HashMap<String, Object>(); // 为队列设置死信交换器 queueArgs.put("x-dead-letter-exchange", exchangeDlxName); channel.queueDeclare(queueName, true, false, false, queueArgs); channel.queueBind(queueName, exchangeName, "*.hxstrive.com"); // 发送带有TTL过期时间的消息 System.out.println("[Sender] Send Message..."); String message = "exchange DLX message"; AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 持久化消息 .expiration("10000") // 设置 TTL=10秒 .build(); channel.basicPublish(exchangeName, "www.hxstrive.com", properties, message.getBytes()); System.out.println("[Sender] message = '" + message + "'"); // 关闭连接 channel.close(); connection.close(); } /** * 消费消息,从死信队列消费消息 * @throws Exception */ private void consumer() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection(); // 创建信道 final Channel channel = connection.createChannel(); // 声明队列 // 声明死信交换器和死信队列 channel.exchangeDeclare(exchangeDlxName, "fanout"); channel.queueDeclare(queueDlxName, true, false, true, null); channel.queueBind(queueDlxName, exchangeDlxName, ""); // 消费消息 System.out.println("[Consumer] Waiting Message..."); channel.basicConsume(queueDlxName, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("[Consumer] body = " + new String(body)); } }); } }