import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
/**
* 回复队列示例
* @author hxstrive.com 2022/2/28
*/
public class ReplyToQueueDemo {
/** RPC队列名称 */
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception {
ReplyToQueueDemo demo = new ReplyToQueueDemo();
demo.new RpcServer();
demo.new RpcClient().start().close();
}
/** RPC服务端 */
class RpcServer {
private Connection connection;
private Channel channel;
public RpcServer() throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
connection = factory.newConnection();
// 创建信道
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false,
false, false, null) ;
// 指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。
// 队列中没有被消费的消息不会被删除,还是存在于队列中
channel.basicQos(1) ;
System.out.println("[RpcServer] Awaiting RPC requests");
channel.basicConsume(RPC_QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println("[RpcServer] fib(" + message + ")");
// 计算斐波那契数列
response += fib(n);
} catch (RuntimeException e) {
System.out.println("[RpcServer] " + e.toString());
} finally {
// 回复消息属性
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
// 消息的唯一标识回传
.correlationId(properties.getCorrelationId())
.build();
// 将响应消息写入到回复队列,回复队列有客户端通过 replyTo 指定
channel.basicPublish("", properties.getReplyTo(),
replyProps, response.getBytes("UTF-8"));
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
}
/** 计算斐波那契额数列 */
private int fib(int n) {
if(n == 0) return 0;
if(n == 1) return 1;
return fib(n - 1) + (n - 2);
}
}
/** RPC客户端 */
class RpcClient {
private Connection connection ;
private Channel channel;
private String replyQueueName;
private QueueingConsumer consumer;
public RpcClient() throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
connection = factory.newConnection();
// 创建信道
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
// 消费者
consumer = new QueueingConsumer(channel) ;
channel.basicConsume(replyQueueName, true, consumer);
}
/** 发起RPC调用 */
public RpcClient start() throws Exception {
System.out.println("[RpcClient] Requesting fib(30)");
String response = callRpc("30");
System.out.println("[RpcClient] Got '" + response + "'");
return this;
}
/** 关闭连接 */
public void close() throws Exception {
channel.close();
connection.close();
}
/** 发起RPC调用请求 */
public String callRpc(String message) throws Exception {
// 发送消息给RPC服务端
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());
// 等待接收服务端响应
String response;
while (true) {
QueueingConsumer.Delivery delivery= consumer.nextDelivery();
if(delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
}
}