下面直接上代码,需要注意的地方将在注释中给出说明。
配置类配置了 RabbitMQ 的连接工厂(ConnectionFactory),根据配置动态声明队列、交换器和绑定队列和交换器。最后,还配置了一个 SimpleMessageListenerContainer 消息监听器容器。代码如下:
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; /** * RabbitMQ 配置类 * @author hxstrive.com 2022/1/11 */ @Configuration public class RabbitMqConfig { /** 交换器名称 */ public final static String EXCHANGE_NAME ="exchange-demo"; /** 队列名称 */ public final static String QUEUE_NAME = "queue-demo"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 虚拟主机 connectionFactory.setVirtualHost("/"); // RabbitMQ连接地址 connectionFactory.setAddresses("127.0.0.1:5672"); return connectionFactory; } /** * 动态创建队列 * @param connectionFactory * @return * @throws IOException */ @Bean public String queue(ConnectionFactory connectionFactory) throws IOException { // 声明交换器 connectionFactory.createConnection() // 通道不开启事务 .createChannel(false) // 交换器类型为 topic,开启持久化功能 .exchangeDeclare(EXCHANGE_NAME, "topic", true); // 声明队列 connectionFactory.createConnection() .createChannel(false) // durable 队列开启持久化功能 // exclusive 不声明为独占队列 // autoDelete 允许触发自动删除 .queueDeclare(QUEUE_NAME, true, false,false, null); // 队列绑定交换器 connectionFactory.createConnection() .createChannel(false) // 将队列和交换器使用绑定键进行绑定 .queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.hxstrive.com"); return QUEUE_NAME; } @Bean public Consumer consumer() { return new Consumer(); } @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer( ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 设置要从中接收消息的队列的名称 container.setQueueNames(QUEUE_NAME); // 设置是否将侦听器 Rabbit Channel 公开给注册的 ChannelAwareMessageListener 以及 // org.springframework.amqp.rabbit.core.RabbitTemplate 调用。 // 默认为 "true",重用监听器的Channel。关闭此选项可公开从相同的底层 Rabbit Connection 获取的新的 Rabbit Channel。 // 注意:由外部事务管理器管理的通道总是会暴露给 org.springframework.amqp.rabbit.core.RabbitTemplate 调用。 // 因此,就 RabbitTemplate 公开而言,这个设置只影响本地事务的通道。 container.setExposeListenerChannel(true); // 设置每个消费者获取的最大消息个数,值设置大可提高吞吐量 container.setPrefetchCount(100); // 指定要创建的并发使用者的数量。默认值为1。 container.setConcurrentConsumers(1); // 设置确认模式为手工模式 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置监听器处理类 container.setMessageListener(consumer()); return container; } }
消费者将消费所有从 RabbitMQ 服务器发送的所有消息,该消费者实现了 ChannelAwareMessageListener 接口,实现了 onMessage() 方法。代码如下:
import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; /** * 消息消费者 * @author hxstrive.com 2022/11/16 */ public class Consumer implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println(JSONObject.toJSONString(message, true)); System.out.println(JSONObject.toJSONString(channel, true)); // 手动确认 MessageProperties messageProperties = message.getMessageProperties(); channel.basicAck(messageProperties.getDeliveryTag(), false); } }
通过 @SpringBootTest 注解创建一个简单的客户端代码,使用 RabbitTemplate 向 RabbitMQ 服务器发送 10 个消息。代码如下:
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /** * RabbitMQ 客户端 * @author hxstrive.com 2022/11/11 */ @SpringBootTest class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void contextLoads() { for(int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, "demo.hxstrive.com", "Hello RabbitMQ......." + i); } } }