Spring Boot 手动声明 RabbitMQ 队列&交换器

本文将介绍怎样通过 Spring Boot 手动声明 RabbitMQ 队列(Queue)、交换器(Exchange)。

下面直接上代码,需要注意的地方将在注释中给出说明。

RabbitMQ 配置类

配置类配置了 RabbitMQ 的连接工厂(ConnectionFactory),根据配置动态声明队列、交换器和绑定队列和交换器。最后,还配置了一个 SimpleMessageListenerContainer 消息监听器容器。代码如下:

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;

/**
 * RabbitMQ 配置类
 * @author hxstrive.com 2022/1/11
 */
@Configuration
public class RabbitMqConfig {
    /** 交换器名称 */
    public final static String EXCHANGE_NAME ="exchange-demo";
    /** 队列名称 */
    public final static String QUEUE_NAME = "queue-demo";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 虚拟主机
        connectionFactory.setVirtualHost("/");
        // RabbitMQ连接地址
        connectionFactory.setAddresses("127.0.0.1:5672");
        return connectionFactory;
    }

    /**
     * 动态创建队列
     * @param connectionFactory
     * @return
     * @throws IOException
     */
    @Bean
    public String queue(ConnectionFactory connectionFactory) throws IOException {
        // 声明交换器
        connectionFactory.createConnection()
                // 通道不开启事务
                .createChannel(false)
                // 交换器类型为 topic,开启持久化功能
                .exchangeDeclare(EXCHANGE_NAME, "topic", true);
        // 声明队列
        connectionFactory.createConnection()
                .createChannel(false)
                // durable 队列开启持久化功能
                // exclusive 不声明为独占队列
                // autoDelete 允许触发自动删除
                .queueDeclare(QUEUE_NAME, true,  false,false, null);
        // 队列绑定交换器
        connectionFactory.createConnection()
                .createChannel(false)
                // 将队列和交换器使用绑定键进行绑定
                .queueBind(QUEUE_NAME, EXCHANGE_NAME, "#.hxstrive.com");
        return QUEUE_NAME;
    }

    @Bean
    public Consumer consumer() {
        return new Consumer();
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 设置要从中接收消息的队列的名称
        container.setQueueNames(QUEUE_NAME);
        // 设置是否将侦听器 Rabbit Channel 公开给注册的 ChannelAwareMessageListener 以及
        // org.springframework.amqp.rabbit.core.RabbitTemplate 调用。
        // 默认为 "true",重用监听器的Channel。关闭此选项可公开从相同的底层 Rabbit Connection 获取的新的 Rabbit Channel。
        // 注意:由外部事务管理器管理的通道总是会暴露给 org.springframework.amqp.rabbit.core.RabbitTemplate 调用。
        // 因此,就 RabbitTemplate 公开而言,这个设置只影响本地事务的通道。
        container.setExposeListenerChannel(true);
        // 设置每个消费者获取的最大消息个数,值设置大可提高吞吐量
        container.setPrefetchCount(100);
        // 指定要创建的并发使用者的数量。默认值为1。
        container.setConcurrentConsumers(1);
        // 设置确认模式为手工模式
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置监听器处理类
        container.setMessageListener(consumer());
        return container;
    }

}

消费者(Consumer)

消费者将消费所有从 RabbitMQ 服务器发送的所有消息,该消费者实现了 ChannelAwareMessageListener 接口,实现了 onMessage() 方法。代码如下:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

/**
 * 消息消费者
 * @author hxstrive.com 2022/11/16
 */
public class Consumer implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(JSONObject.toJSONString(message, true));
        System.out.println(JSONObject.toJSONString(channel, true));
        // 手动确认
        MessageProperties messageProperties = message.getMessageProperties();
        channel.basicAck(messageProperties.getDeliveryTag(), false);
    }

}

客户端(Client)

通过 @SpringBootTest 注解创建一个简单的客户端代码,使用 RabbitTemplate 向 RabbitMQ 服务器发送 10 个消息。代码如下:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * RabbitMQ 客户端
 * @author hxstrive.com 2022/11/11
 */
@SpringBootTest
class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void contextLoads() {
        for(int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,
                    "demo.hxstrive.com",
                    "Hello RabbitMQ......." + i);
        }
    }

}
有时候读书是一种巧妙地避开思考的方法。——赫尔普斯
1 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号