在前面章节我们介绍了怎样使用 RabbitMQ Java 客户端去发送/消费消息,在发送/消费消息之前,我们会通过 Java 代码动态声明一个交换器和队列。在创建交换器或队列时可以配置一些可选的属性来获得不同的功能,比如:x-message-ttl、x-expires、x-max-length 等等。
但是,通过 Java 客户端为交换器或队列设定的属性参数一旦设置成功就不能在改变,除非将原来的交换器或队列删除,重新创建新的交换器或队列。难道 RabbitMQ 没有提供可以动态修改这些属性的方法吗?当然提供了的,Policy(策略)就能很好的解决上面的问题。
Policy 是一种特殊的运行时参数的用法。Policy 是 vhost 级别的。一个 Policy 可以匹配一个或多个交换器或队列,这样便于批量管理。
Policy 支持动态地修改一些属性参数,这就解决了 RabbitMQ 客户端创建的交换器和队列不能修改的问题,也大大提高了应用的灵活性。
我们可以通过 RabbitMQ 的 rabbitmq_management 插件使用可视化的方式对 Policy 进行管理。登录进入 RabbitMQ 页面,打开 “Admin”->“Policies”,如下图:

点击“Add / update a policy”链接,添加一个 Policy,如下图:

上面需要用户输入很多信息,其中 Name 和 Pattern 是必输的。每个输入项的含义如下:
Virtual host :表示当前 Policy 所在的 vhost 虚拟机是哪个
Name :表示当前 Policy 的名称
Pattern:一个正则表达式,用来匹配相关的队列或者交换器。例如:^test.+ 将匹配所有以 test 开头的交换器或队列
Apply to:用来指定当前 Policy 作用范围,取值如下:
Exchanges and queues:表示作用与 Pattern 所匹配的所有队列和交换器
Exchanges:表示作用于 Pattern 所匹配的所有交换器
Queues:表示作用于与 Pattern 所匹配的所有队列
Priority:定义 Policy 的优先级。如果有多个 Policy 作用于同一个交换器或者队列,那么 Priority 最大的那个 Policy 才会有用
Definition:定义一组或者多组键值对,为匹配的交换器或者队列附加相应的功能
通过上面介绍的页面创建一个名为 policy-test 的 Policy 策略。该策略将应用到所有以 policy 开头的队列中,设置队列中消息的 message-ttl 为 3 秒中。如下图:

然后,编写如下测试代码,一次性发送 10 个消息到 policy-PolicyDemo1 队列。然后观察队列消息的变化,所有消息在过了3秒后将自动被删除。代码如下:
import com.rabbitmq.client.*;
public class PolicyDemo1 {
    /** 交换器名称 */
    private final String EXCHANGE_NAME = "exchange-" + PolicyDemo1.class.getSimpleName();
    /** 队列名称 */
    private final String QUEUE_NAME = "policy-" + PolicyDemo1.class.getSimpleName();
    public static void main(String[] args) throws Exception {
        PolicyDemo1 demo = new PolicyDemo1();
        demo.sender();
    }
    /**
     * 生产者
     * @throws Exception
     */
    private void sender() throws Exception {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare(QUEUE_NAME, true, false, false, null) ;
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.hxstrive.com");
        // 发送消息
        System.out.println("[Sender] Send Message...");
        for(int i = 0; i < 10; i++) {
            String message = "policy message i = " + i;
            channel.basicPublish(EXCHANGE_NAME, "www.hxstrive.com",
                    null, message.getBytes());
            System.out.println("[Sender] message = " + message);
        }
    }
}
            