RabbitMQ 教程

回复队列(replyTo)

在 RabbitMQ 中,回复队列主要用于接收 RPC 调用的响应消息。

RPC 是 Remote Procedure Call 的简称(即远程过程调用)。RPC 是一种通过网络请求远程计算机上的服务,而不需要了解网络的底层技术。

RPC 的主要作用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性(即调用远程方法或函数像调用本地方法或函数一样方便)。通俗点来说,假设有两台服务器 A 和 B,有一个应用部署在 A 服务器上,A 服务器上的应用想要调用 B 服务器上应用提供的服务(函数或方法),由于两台服务器上的应用在不同的服务器上,因此不能直接调用,需要通过网络来表达调用的语义和传输调用参数数据。

目前支持 RPC 的协议有很多,例如最早的 CORBA、Java RMI、Web Service RPC 风格、Hessian、Thrift 甚至还有 Restful API。

RabbitMQ 不仅可以当做消息(MQ)服务器,还能进行 RPC 调用,而且使用方法还很简单。客户端发送请求消息,服务端回复响应的消息。为了接收服务器响应的消息,我们需要在请求消息中指定一个回复队列。回复队列通过发送消息时的 replyTo 属性进行指定,关键代码如下:

Channel channel = connection.createChannel();
// 由 RabbitMQ 自动创建队列名
String replyQueueName = channel.queueDeclare().getQueue();
// 消息唯一标识,消费的时候需要进行对别
String corrId = UUID.randomUUID().toString();
// 通过属性设置回复队列
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .correlationId(corrId)
        .replyTo(replyQueueName)
        .build();
// 发送一个消息
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());

示例代码

该示例分别创建了一个生产者(RPC客户端)和一个消费者(RPC服务端),生产者向一个普通的队列发送一条消息(该消息需要设置一个回复队列,通过 replyTo() 进行设置,以及一个消息唯一标识符 correlationId),消费者订阅该队列。当消费者收到消息时,进行业务处理,业务处理完成后,将处理结果和收到消息的唯一标识符一并打包发送到回复队列(回复队列从收到消息的 replyTo 中获取),这条消息可以称为响应消息。然后,生产者从回复队列中接收响应消息,并根据消息唯一标识符进行处理。整个过程如下图:

注意:不要为每个 RPC 请求创建一个回复队列,因为这样非常低效。只需要为每一个客户端创建一个回复队列即可,同时,这也到这了一个新问题?如果你发送了4次RPC调用,那么收到的RPC响应消息分别对应那一次RPC调用呢!要解决这个问题就需要用到上面 correlationId(消息唯一标识符),通过这个就可以区分开每一个响应属于哪一个请求。

(1)服务端关键代码,如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
connection = factory.newConnection();

// 创建信道
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
// 指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。
// 队列中没有被消费的消息不会被删除,还是存在于队列中
channel.basicQos(1) ;

System.out.println("[RpcServer] Awaiting RPC requests");
channel.basicConsume(RPC_QUEUE_NAME, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
            AMQP.BasicProperties properties, byte[] body) throws IOException {
        String response = "";
        try {
            String message = new String(body, "UTF-8");
            int n = Integer.parseInt(message);
            System.out.println("[RpcServer] fib(" + message + ")");
            // 计算斐波那契数列
            // 调用业务方法
            response += fib(n);
        } catch (RuntimeException e) {
            System.out.println("[RpcServer] " + e.toString());
        } finally {
            // 回复消息属性
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    // 消息的唯一标识回传
                    .correlationId(properties.getCorrelationId())
                    .build();
            // 将响应消息写入到回复队列,回复队列有客户端通过 replyTo 指定
            channel.basicPublish("", properties.getReplyTo(),
                    replyProps, response.getBytes("UTF-8"));
            // 手动确认消息
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
});

(2)客户端关键代码,如下:

// 发送消息给RPC服务端
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .correlationId(corrId)
        .replyTo(replyQueueName)
        .build();
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());

// 等待接收服务端响应
String response;
while (true) {
    QueueingConsumer.Delivery delivery= consumer.nextDelivery();
    if(delivery.getProperties().getCorrelationId().equals(corrId)) {
        response = new String(delivery.getBody());
        break;
    }
}

点击查看完整示例代码(ReplyToQueueDemo.java)。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号