RabbitMQ 教程

ReplyToQueueDemo.java

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;

/**
 * 回复队列示例
 * @author hxstrive.com 2022/2/28
 */
public class ReplyToQueueDemo {
    /** RPC队列名称 */
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
        ReplyToQueueDemo demo = new ReplyToQueueDemo();
        demo.new RpcServer();
        demo.new RpcClient().start().close();
    }

    /** RPC服务端 */
    class RpcServer {
        private Connection connection;
        private Channel channel;

        public RpcServer() throws Exception {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            connection = factory.newConnection();

            // 创建信道
            channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false,
                    false, false, null) ;
            // 指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。
            // 队列中没有被消费的消息不会被删除,还是存在于队列中
            channel.basicQos(1) ;

            System.out.println("[RpcServer] Awaiting RPC requests");
            channel.basicConsume(RPC_QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String response = "";
                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println("[RpcServer] fib(" + message + ")");
                        // 计算斐波那契数列
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println("[RpcServer] " + e.toString());
                    } finally {
                        // 回复消息属性
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                                // 消息的唯一标识回传
                                .correlationId(properties.getCorrelationId())
                                .build();
                        // 将响应消息写入到回复队列,回复队列有客户端通过 replyTo 指定
                        channel.basicPublish("", properties.getReplyTo(),
                                replyProps, response.getBytes("UTF-8"));
                        // 手动确认消息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            });
        }

        /** 计算斐波那契额数列 */
        private int fib(int n) {
            if(n == 0) return 0;
            if(n == 1) return 1;
            return fib(n - 1) + (n - 2);
        }
    }

    /** RPC客户端 */
    class RpcClient {
        private Connection connection ;
        private Channel channel;
        private String replyQueueName;
        private QueueingConsumer consumer;

        public RpcClient() throws Exception {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            connection = factory.newConnection();

            // 创建信道
            channel = connection.createChannel();
            replyQueueName = channel.queueDeclare().getQueue();

            // 消费者
            consumer = new QueueingConsumer(channel) ;
            channel.basicConsume(replyQueueName, true, consumer);
        }

        /** 发起RPC调用 */
        public RpcClient start() throws Exception {
            System.out.println("[RpcClient] Requesting fib(30)");
            String response = callRpc("30");
            System.out.println("[RpcClient] Got '" + response + "'");
            return this;
        }

        /** 关闭连接 */
        public void close() throws Exception {
            channel.close();
            connection.close();
        }

        /** 发起RPC调用请求 */
        public String callRpc(String message) throws Exception {
            // 发送消息给RPC服务端
            String corrId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
            channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());

            // 等待接收服务端响应
            String response;
            while (true) {
                QueueingConsumer.Delivery delivery= consumer.nextDelivery();
                if(delivery.getProperties().getCorrelationId().equals(corrId)) {
                    response = new String(delivery.getBody());
                    break;
                }
            }
            return response;
        }
    }

}
说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号