import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; /** * 回复队列示例 * @author hxstrive.com 2022/2/28 */ public class ReplyToQueueDemo { /** RPC队列名称 */ private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ReplyToQueueDemo demo = new ReplyToQueueDemo(); demo.new RpcServer(); demo.new RpcClient().start().close(); } /** RPC服务端 */ class RpcServer { private Connection connection; private Channel channel; public RpcServer() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); connection = factory.newConnection(); // 创建信道 channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ; // 指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。 // 队列中没有被消费的消息不会被删除,还是存在于队列中 channel.basicQos(1) ; System.out.println("[RpcServer] Awaiting RPC requests"); channel.basicConsume(RPC_QUEUE_NAME, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String response = ""; try { String message = new String(body, "UTF-8"); int n = Integer.parseInt(message); System.out.println("[RpcServer] fib(" + message + ")"); // 计算斐波那契数列 response += fib(n); } catch (RuntimeException e) { System.out.println("[RpcServer] " + e.toString()); } finally { // 回复消息属性 AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() // 消息的唯一标识回传 .correlationId(properties.getCorrelationId()) .build(); // 将响应消息写入到回复队列,回复队列有客户端通过 replyTo 指定 channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); // 手动确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }); } /** 计算斐波那契额数列 */ private int fib(int n) { if(n == 0) return 0; if(n == 1) return 1; return fib(n - 1) + (n - 2); } } /** RPC客户端 */ class RpcClient { private Connection connection ; private Channel channel; private String replyQueueName; private QueueingConsumer consumer; public RpcClient() throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); connection = factory.newConnection(); // 创建信道 channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); // 消费者 consumer = new QueueingConsumer(channel) ; channel.basicConsume(replyQueueName, true, consumer); } /** 发起RPC调用 */ public RpcClient start() throws Exception { System.out.println("[RpcClient] Requesting fib(30)"); String response = callRpc("30"); System.out.println("[RpcClient] Got '" + response + "'"); return this; } /** 关闭连接 */ public void close() throws Exception { channel.close(); connection.close(); } /** 发起RPC调用请求 */ public String callRpc(String message) throws Exception { // 发送消息给RPC服务端 String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes()); // 等待接收服务端响应 String response; while (true) { QueueingConsumer.Delivery delivery= consumer.nextDelivery(); if(delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } } }