RabbitMQ之远程过程调用(RPC)

一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到服务端一个响应,我们需要发送一个'回调'的请求的队列地址。

一般使用RabbitMQ做RPC很容易。客户端发送一个请求消息然后服务器回复一个响应消息。为了收到服务器的一个响应,我们需要发送一个“回调”请求的队列地址。我们可以使用默认队列,但是Java客户端除外。

AMQP协议给消息定义了14个属性。大部分的属性很少使用,下面将介绍几个经常使用的属性: 

  • deliveryMode 将消息标记为持久(值为2)或瞬态(任何其他值)。你可能记得在第二个教程中使用了这个属性。

  • contentType 用来设置mime类型。例如经常使用的JSON格式数据,就需要将此属性设置为:application/json。

  • replyTo 通常用来命名一个回调队列

  • correlationId 用来关联RPC请求的响应

RPC工作流程:

RabbitMQ RPC

1)、客户端启动时,创建了一个匿名的回调队列。

2)、在一个RPC请求中,客户端发送一个消息,它有两个属性:

  • REPLYTO 用来设置回调队列名

  • correlationId 对于每个请求都被设置成唯一的值

3)、请求被发送到rpc_queue队列

4)、RPC工作者(服务器)等待接收该队列的请求。当收到一个请求,它就会处理并把结果发送给客户端,使用的队列是replyTo字段指定的。

5)、客户端等待接收回调队列中的数据。当接到一个消息,它会检查它的correlationId属性。如果它和设置的相匹配,就会把响应返回给应用程序。

1、RPC服务器的RPCServer.java,接收消息调用rpc并返回结果

package com.bug315;

import java.security.MessageDigest;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 
 * @author Administrator
 * @date 2016年10月22日 20:52:15
 */
public class RPCServer {
	private static final String RPC_QUEUE_NAME = "rpc_queue";

	public static void main(String[] args) throws Exception {
		// 先建立连接、通道,并声明队列
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
//		factory.setUsername("admin");
//		factory.setPassword("admin");
		factory.setPort(AMQP.PROTOCOL.PORT);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
		
		// 可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。
		channel.basicQos(1);
		QueueingConsumer consumer = new QueueingConsumer(channel);
		
		// 打开应答机制autoAck=false
		channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
		System.out.println(" [x] Awaiting RPC requests");
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			BasicProperties props = delivery.getProperties();
			BasicProperties replyProps = new BasicProperties.Builder()
					.correlationId(props.getCorrelationId()).build();
			String message = new String(delivery.getBody());
			System.out.println("[.] getMd5String(" + message + ")");
			String response = getMd5String(message);
			// 返回处理结果队列
			channel.basicPublish("", props.getReplyTo(), replyProps,
					response.getBytes());
			// 发送应答
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}

	/**
	 * 模拟RPC方法 获取MD5字符串
	 * @param str
	 * @return
	 */
	public static String getMd5String(String str) {
		MessageDigest md5 = null;
		try {
			md5 = MessageDigest.getInstance("MD5");
		} catch (Exception e) {
			System.out.println(e.toString());
			e.printStackTrace();
			return "";
		}
		char[] charArray = str.toCharArray();
		byte[] byteArray = new byte[charArray.length];
		for (int i = 0; i < charArray.length; i++)
			byteArray[i] = (byte) charArray[i];
		byte[] md5Bytes = md5.digest(byteArray);
		StringBuffer hexValue = new StringBuffer();
		for (int i = 0; i < md5Bytes.length; i++) {
			int val = ((int) md5Bytes[i]) & 0xff;
			if (val < 16)
				hexValue.append("0");
			hexValue.append(Integer.toHexString(val));
		}
		return hexValue.toString();
	}
}

2、客户端RPCClient.java,发送rpc调用消息,接收结果

package com.bug315;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * 
 * @author Administrator
 * @date 2016年10月22日 20:56:32
 */
public class RPCClient {
	private Connection connection;
	private Channel channel;
	private String requestQueueName = "rpc_queue";
	private String replyQueueName;
	private QueueingConsumer consumer;

	public RPCClient() throws Exception {
		// 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
//		factory.setUsername("admin");
//		factory.setPassword("admin");
		factory.setPort(AMQP.PROTOCOL.PORT);
		connection = factory.newConnection();
		channel = connection.createChannel();
		
		// 注册'回调'队列,这样就可以收到RPC响应
		replyQueueName = channel.queueDeclare().getQueue();
		consumer = new QueueingConsumer(channel);
		channel.basicConsume(replyQueueName, true, consumer);
	}

	// 发送RPC请求
	public String call(String message) throws Exception {
		String response = null;
		String corrId = java.util.UUID.randomUUID().toString();
		// 发送请求消息,消息使用了两个属性:replyto和correlationId
		BasicProperties props = new BasicProperties.Builder()
				.correlationId(corrId).replyTo(replyQueueName).build();
		channel.basicPublish("", requestQueueName, props, message.getBytes());
		// 等待接收结果
		while (true) {
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			// 检查它的correlationId是否是我们所要找的那个
			if (delivery.getProperties().getCorrelationId().equals(corrId)) {
				response = new String(delivery.getBody());
				break;
			}
		}
		return response;
	}

	public void close() throws Exception {
		connection.close();
	}
	
	public static void main(String[] args) {
		try {
			RPCClient client = new RPCClient();
			client.call("hello rcp");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

先运行服务端,再运行RPCClient ,发送消息调用RPC。

这里介绍的是该设计不是实现RPC服务的唯一可能,但它有一些重要的优点:

1)如果RPC服务器速度太慢,你可以通过运行多个RPC服务器。尝试在一个新的控制台上运行RPCServer。

2)RPC客户端只发送和接收一个消息。不需要queueDeclare那样要求同步调用。因此,RPC客户端只需要在一个网络上发送和接收为一个单一的RPC请求。

生活总会给你答案的,但不会马上把一切都告诉你。只要你肯等一等,生活的美好,总在你不经意的时候,盛装莅临。
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号