RabbitMQ 教程

连接到 RabbitMQ

在前面的 AMQP 协议介绍 中,生产者生产消息的代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 关闭连接,释放资源
channel.close();
connection.close();

生产者发送消息的主要步骤如下:

  • 创建连接

  • 创建信道

  • 声明队列

  • 发送消息

  • 关闭连接,释放资源

本章节将详细介绍怎样通过 RabbitMQ Java 客户端代码创建连接。创建连接最简单的方式就是直接 new 一个 ConnectionFactory 对象,然后设置 RabbitMQ Broker 的主机地址和端口,然后调用 ConnectionFactory 的 newConnection() 方法。代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 创建连接
Connection connection = factory.newConnection();

创建连接时,除了指定主机地址和端口外,我们还可以指定用户名(Username)、密码(Password)、虚拟主机(Virtual Host)等。代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest"); 
factory.setPassword("guest");
factory.setVirtualHost("hxstrive.virtual"); 
// 创建连接
Connection connection = factory.newConnection();

注意:虚拟主机可以通过 RabbitMQ 的管理界面的“Admin”>“Virtual hosts”进行添加。如下图:

除了上面的创建方式,你还可以选择使用 URL 的方式来创建连接,代码如下:

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://guest:guest@127.0.0.1:5672/hxstrive.virtual");

// 创建连接
Connection connection = factory.newConnection();
System.out.println(connection);

// 关闭连接
connection.close();

在连接 Connection 创建成功后,就可以通过连接创建信道 Channel。AMQP URI格式如下:

amqp://username:password@hostIp:port/virtualHost

注意:

Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程重新创建一个 Channel。在某些情况下 Channel 的操作可以并发运行,但是其它情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认机制的运行,所以多线程间共享 Channel 实例是非线程安全的。

Channel 或者 Connection 中都有一个 isOpen() 方法,该方法可以用来检测 Channel/Connection 是否已经处于开启状态。但是,并不推荐在生产环境下使用 isOpen() 方法,这个方法的返回值依赖于 shutdownCause 的存在,有可能会产生竞争。

isOpen() 源码如下:

// com.rabbitmq.client.impl.ShutdownNotifierComponent
public boolean isOpen() {
    synchronized(this.monitor) {
        return this.shutdownCause == null;
    }
}

通常情况下,在调用 createChannel() 或者 newConnection() 方法之后,可以简单认为 Connection 或者 Channel 已经成功处于开启状态,而并不需要我们在代码中使用 isOpen() 方法去检查它们。即使在使用 Channel 的时候它处于关闭状态,程序会抛出 com.rabbitmq.client.ShutdownSignalException,捕获且处理该异常即可。

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号