下面直接上代码,需要注意的地方将在注释中给出说明。
配置类配置了 RabbitMQ 的连接工厂(ConnectionFactory),根据配置动态声明队列、交换器和绑定队列和交换器。最后,还配置了一个 SimpleMessageListenerContainer 消息监听器容器。代码如下:
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* RabbitMQ 配置类
* @author hxstrive.com 2022/1/11
*/
@Configuration
public class RabbitMqConfig {
/** 交换器名称 */
public final static String EXCHANGE_NAME ="exchange-demo";
/** 队列名称 */
public final static String QUEUE_NAME = "queue-demo";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 虚拟主机
connectionFactory.setVirtualHost("/");
// RabbitMQ连接地址
connectionFactory.setAddresses("127.0.0.1:5672");
return connectionFactory;
}
/**
* 动态创建队列
* @param connectionFactory
* @return
* @throws IOException
*/
@Bean
public String queue(ConnectionFactory connectionFactory) throws IOException {
// 声明交换器
connectionFactory.createConnection()
// 通道不开启事务
.createChannel(false)
// 交换器类型为 topic,开启持久化功能
.exchangeDeclare(EXCHANGE_NAME, "topic", true);
// 声明队列
connectionFactory.createConnection()
.createChannel(false)
// durable 队列开启持久化功能
// exclusive 不声明为独占队列
// autoDelete 允许触发自动删除
.queueDeclare(QUEUE_NAME, true, false,false, null);
// 队列绑定交换器
connectionFactory.createConnection()
.createChannel(false)
// 将队列和交换器使用绑定键进行绑定
.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.hxstrive.com");
return QUEUE_NAME;
}
@Bean
public Consumer consumer() {
return new Consumer();
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(
ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 设置要从中接收消息的队列的名称
container.setQueueNames(QUEUE_NAME);
// 设置是否将侦听器 Rabbit Channel 公开给注册的 ChannelAwareMessageListener 以及
// org.springframework.amqp.rabbit.core.RabbitTemplate 调用。
// 默认为 "true",重用监听器的Channel。关闭此选项可公开从相同的底层 Rabbit Connection 获取的新的 Rabbit Channel。
// 注意:由外部事务管理器管理的通道总是会暴露给 org.springframework.amqp.rabbit.core.RabbitTemplate 调用。
// 因此,就 RabbitTemplate 公开而言,这个设置只影响本地事务的通道。
container.setExposeListenerChannel(true);
// 设置每个消费者获取的最大消息个数,值设置大可提高吞吐量
container.setPrefetchCount(100);
// 指定要创建的并发使用者的数量。默认值为1。
container.setConcurrentConsumers(1);
// 设置确认模式为手工模式
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置监听器处理类
container.setMessageListener(consumer());
return container;
}
}消费者将消费所有从 RabbitMQ 服务器发送的所有消息,该消费者实现了 ChannelAwareMessageListener 接口,实现了 onMessage() 方法。代码如下:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
/**
* 消息消费者
* @author hxstrive.com 2022/11/16
*/
public class Consumer implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(JSONObject.toJSONString(message, true));
System.out.println(JSONObject.toJSONString(channel, true));
// 手动确认
MessageProperties messageProperties = message.getMessageProperties();
channel.basicAck(messageProperties.getDeliveryTag(), false);
}
}通过 @SpringBootTest 注解创建一个简单的客户端代码,使用 RabbitTemplate 向 RabbitMQ 服务器发送 10 个消息。代码如下:
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* RabbitMQ 客户端
* @author hxstrive.com 2022/11/11
*/
@SpringBootTest
class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
for(int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,
"demo.hxstrive.com",
"Hello RabbitMQ......." + i);
}
}
}