RabbitMQ 教程

RabbitMQ “Hello World”示例

本章节将演示怎样使用 Java 程序发送和接收消息。

Maven 依赖

在 pom.xml 中,添加如下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

示例

该示例是一个简单的 RabbitMQ 实例,接收端创建一个名为 “hello” 的队列,然后发送消息到该队列,消费者从该队列中接收消息。如下图:

上图中:

  • P:表示生成者

  • C:表示消费者

Send.java(生产者)

发送消息,即消息生产者。代码如下:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 直接发送消息到Queue(消息生产者)
 * @author Administrator
 */
public class Send {
	/**
	 * 队列名称
	 */
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] argv) throws Exception {
		new Send().run();
	}

	public void run() throws Exception {
		/**
		 * 建立与RabbitMQ消息服务器的连接
		 */
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1"); // 连接到本地服务器
		factory.setPort(AMQP.PROTOCOL.PORT); // 5672
		Connection connection = factory.newConnection();

		/**
		 * 创建一个通道是大多数API完成获取数据的所在地
		 */
		Channel channel = connection.createChannel();

		/**
		 * 发送消息,我们必须声明我们发送到哪里的队列。然后,我们能发布消息到这个队列中;
		 *
		 * 声明一个队列
		 * channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
		 * queue - 队列名称
		 * durable - 如果为true则声明一个持久化队列
		 * exclusive - 如果为true则声明一个独占队列
		 * autoDelete - 如果为true则声明一个自动删除队列
		 * arguments - 队列其他属性
		 */
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		/**
		 * 发布一个消息
		 * basicPublish(exchange, routingKey, props, body)
		 * exchange -
		 * routingKey - 路由的关键字
		 * props - 其他消息属性,路由报头等等
		 * body - 主体消息
		 */
		String message = "Hello World!";
		channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
		System.out.println("[x] Sent '" + message + "'");

		/**
		 * 关闭连接,释放资源
		 */
		channel.close();
		connection.close();
	}

}

Receive.java(消费者)

接收消息,即消息消费者。代码如下:

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 从Queue中接收消息(消费消息)
 * @author Administrator
 */
public class Receive {
	/**
	 * 队列名称
	 */
	private final static String QUEUE_NAME = "hello";

	public static void main(String[] args) throws Exception {
		new Receive().run();
	}

	public void run() throws Exception {
		/**
		 * 建立与RabbitMQ消息服务器的连接
		 */
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1"); // 连接到本地服务器
		factory.setPort(AMQP.PROTOCOL.PORT); // 5672
		Connection connection = factory.newConnection();

		/**
		 * 创建一个通道是大多数API完成获取数据的所在地
		 */
		Channel channel = connection.createChannel();

		/**
		 * 发送消息,我们必须声明我们发送到哪里的队列。然后,我们能发布消息到这个队列中;
		 *
		 * 声明一个队列
		 * channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
		 * queue      - 队列名称
		 * durable    - 如果为true则声明一个持久化队列
		 * exclusive  - 如果为true则声明一个独占队列
		 * autoDelete - 如果为true则声明一个自动删除队列
		 * arguments  - 队列其他属性
		 */
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);

		/**
		 * 接收消息
		 */
		System.out.println("[*] Waiting for messages. To exit press CTRL+C");

		/**
		 * 消费一个消息
		 * basicConsume(queue, autoAck, callback)
		 * queue - 队列名称
		 * autoAck -
		 * callback - 消费对象的接口
		 */
		channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					AMQP.BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("[x] Received '" + message + "'");
			}
		});
	}

}
说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号