import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* 使用死信交换器 + TTL消息模拟延迟队列
* @author hxstrive.com 2022/2/25
*/
public class ExchangeDlx2 {
/** 死信交换器名称 */
private final String exchangeDlxName = "exchange_" + getClass().getSimpleName() + "_DLX";
/** 普通交换器名称 */
private final String exchangeName = "exchange_" + getClass().getSimpleName();
/** 死信队列名称 */
private final String queueDlxName = "queue_" + getClass().getSimpleName() + "_DLX";
/** 队列名称 */
private final String queueName = "queue_" + getClass().getSimpleName();
public static void main(String[] args) throws Exception {
ExchangeDlx2 demo = new ExchangeDlx2();
demo.consumer();
demo.sender();
}
/**
* 生产者
* @throws Exception
*/
private void sender() throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明死信交换器和死信队列
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
channel.queueBind(queueDlxName, exchangeDlxName, "");
// 声明普通交换器和队列
channel.exchangeDeclare(exchangeName, "topic");
Map<String,Object> queueArgs = new HashMap<String, Object>();
// 为队列设置死信交换器
queueArgs.put("x-dead-letter-exchange", exchangeDlxName);
channel.queueDeclare(queueName, true, false, false, queueArgs);
channel.queueBind(queueName, exchangeName, "*.hxstrive.com");
// 发送带有TTL过期时间的消息
System.out.println("[Sender] Send Message...");
String message = "exchange DLX message";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.expiration("10000") // 设置 TTL=10秒
.build();
channel.basicPublish(exchangeName, "www.hxstrive.com", properties, message.getBytes());
System.out.println("[Sender] message = '" + message + "'");
// 关闭连接
channel.close();
connection.close();
}
/**
* 消费消息,从死信队列消费消息
* @throws Exception
*/
private void consumer() throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建信道
final Channel channel = connection.createChannel();
// 声明队列
// 声明死信交换器和死信队列
channel.exchangeDeclare(exchangeDlxName, "fanout");
channel.queueDeclare(queueDlxName, true, false, true, null);
channel.queueBind(queueDlxName, exchangeDlxName, "");
// 消费消息
System.out.println("[Consumer] Waiting Message...");
channel.basicConsume(queueDlxName, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("[Consumer] body = " + new String(body));
}
});
}
}