RabbitMQ 教程

发送消息

前面章节介绍了怎样连接到 RabbitMQ、怎样创建交换器和怎样创建队列。本章将介绍如何使用 RabbitMQ Java 客户端发送消息。

如果要发送一个消息,可以使用 Channel(信道)类的 basicPublish() 方法,例如:发送一条内容为“Hello World!”的消息,代码如下:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
channel.exchangeDeclare(Receive.EXCHANGE_NAME, "topic");
// 发送消息
byte[] msg = "hello wrold".getBytes();
channel.basicPublish(Receive.EXCHANGE_NAME, "www.hxstrive.com", null, msg);

为了更好地控制发送,可以使用 mandatory 这个参数,或者可以发送一些特定属性的消息:

// 发送消息
boolean mandatory = true;
byte[] msg = "hello wrold".getBytes();
channel.basicPublish(Receive.EXCHANGE_NAME, "www.hxstrive.com",
        mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);

上面代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化到服务器磁盘中。同时,这条消息的优先级(priority)设置为0,content-type 为“text/plain”。MessageProperties.PERSISTENT_TEXT_PLAIN 定义如下:

当然,你也可以自己设定消息属性,代码如下:

byte[] msg = "hello wrold".getBytes();
channel.basicPublish(Receive.EXCHANGE_NAME, "www.hxstrive.com",
      new AMQP.BasicProperties.Builder()
            .contentType("text/plain")
            .deliveryMode(2)
            .priority(1)
            // 仅当创建连接的用户名和这里指定的 userId 一致时,才能将消息发送出去
            .userId("guest")
            .build(), msg);

也可以发送一条带有 headers 的消息,代码如下:

byte[] msg = "hello wrold".getBytes();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("location", "here");
headers.put("time", "today");
channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
      new AMQP.BasicProperties.Builder()
            .headers(headers)
            .build(), msg);

还可以发送一条带有过期时间(expiration)的消息:

channel.basicPublish(exchangeName, routingKey, 
    new AMQP.BasicProperties.Builder() 
        .expiration("6000")
        .build(),
    messageBodyBytes) ;

basicPublish 方法

信道的 basicPublish() 方法用来发送一个消息到 RabbitMQ 服务。它有三个重载方法,定义如下: 

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

参数说明:

  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。

  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中

  • props:消息的基本属性集,其包含 14 个属性成员,分别有 contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、 replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。

  • byte[] body:消息体( payload ),真正需要发送的消息

  • mandatory:如果将 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,则消息直接被丢弃。

  • immediate:当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic .Return 返回至生产者。

点击查看发送消息的一些完整示例程序:

  • PushMessage1.java 验证利用信道 Channel 的 basicPublish() 简单发送消

  • PushMessage2.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,且指定 mandatory 为 true

  • PushMessage3.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,发送消息时指定 userI

  • PushMessage4.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,发送一条带有 headers 的消息

  • PushMessage5.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,为消息指定过期时

  • PushMessage6.java 验证通过信道 Channel 的 basicPublish() 方法发送消息,将队列指定为超时队列

说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号