使用 RabbitMQ 流的代理端 SQL 过滤

RabbitMQ 4.2 为流引入了 SQL 过滤表达式,实现了强大的代理端消息过滤功能。在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在高选择性场景和高输入速率下实现了每秒超过 400 万条消息的过滤率。这意味着只有消费者真正关心的消息才会离开代理,从而大幅减少网络流量和客户端处理开销。

RabbitMQ 4.2 为流引入了 SQL 过滤表达式,实现了强大的代理端消息过滤功能。

在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在高选择性场景和高输入速率下实现了每秒超过 400 万条消息的过滤率。这意味着只有消费者真正关心的消息才会离开代理,从而大幅减少网络流量和客户端处理开销。

动机

高吞吐量事件流通常向消费者传递海量数据,其中大部分可能与消费者无关。在实际系统中,可能存在数万个主题(事件类型、租户、区域、SKU等),为每个主题创建专用流既不现实也难以扩展。

RabbitMQ Streams 通过代理端过滤机制解决了这一问题。

布隆过滤器可跳过不包含目标值的整块数据,而 SQL 过滤表达式能精确评估每条消息的谓词条件,仅允许匹配消息通过网络传输。这有效降低了网络流量,减少了客户端CPU和内存消耗,同时简化了应用程序代码。

代理端过滤需求由来已久——Kafka用户多年来持续呼吁(参见KAFKA-6020),但该功能至今仍缺失。RabbitMQ的布隆+SQL双重过滤机制,使大规模选择性消费成为现实。

下面通过实践案例进行演示。

运行示例应用程序

要在你的环境中运行此示例:

1. 使用单个调度程序线程启动 RabbitMQ:

docker run -it --rm --name rabbitmq -p 5672:5672 -e ERL_AFLAGS="+S 1" rabbitmq:4.2.0-beta.3

2. 从示例应用程序的根目录运行:

mvn clean compile exec:java

该示例应用程序使用 RabbitMQ AMQP 1.0 Java 客户端,因为 SQL 过滤表达式是 AMQP 1.0 的一项功能。

发布事件

试想一个典型的电子商务平台,它会生成持续不断的客户事件流:

  • product.search

  • product.view

  • cart.add

  • cart.remove

  • order.created

  • 以及许多其他事件

我们的示例应用程序向流中发布了1000万个此类事件,其中 order.created 事件每 10 万条消息出现一次——仅占总量的 0.001%。

每条消息都包含一个布隆过滤器注释,该注释设置为其事件类型,从而实现高效的分块级过滤:

publisher
    .message(body.getBytes(StandardCharsets.UTF_8))
    .priority(priority)
    // 设置布隆过滤器值
    .annotation("x-stream-filter-value", eventType)
    .subject(eventType)
    .creationTime(creationTime)
    // 设置应用程序属性,例如地区、价格或高级客户
    .property("region", region);

定义你的过滤器

假设你只想处理满足以下所有条件的高价值订单:

  • 事件类型为 order.created

  • 订单是在过去一小时内创建的

  • 订单来源于 AMER, EMEA, 或 APJ 区域

  • 并且至少满足以下条件之一:

    • 优先级 > 4

    • 价格 ≥ 99.99

    • 高级客户

在我们的演示中,仅有1000万条消息中的10条符合这些标准——这种高度选择性的过滤场景在实际应用中十分常见。

传统方法需要处理全部1000万条消息并在客户端进行过滤,这将导致巨大的网络开销和资源浪费。

SQL过滤表达式通过在代理端完成所有过滤操作,优雅地解决了这个问题:

String SQL =
     "properties.subject = 'order.created' AND " +
     "properties.creation_time > UTC() - 3600000 AND " +
     "region IN ('AMER', 'EMEA', 'APJ') AND " +
     "(header.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)";

消费者的实现变得简单直接:

ConsumerBuilder.StreamOptions builder = connection.consumerBuilder()
    .queue(STREAM_NAME)
    .stream()
    .offset(FIRST);

if (useBloomFilter) {
    // 阶段1:布隆过滤器——快速跳过没有order.created事件的块
    builder = builder.filterValues("order.created");
}

Consumer consumer = builder
    // 阶段2:SQL过滤器——精确的代理端每条消息过滤
    .filter()
        .sql(SQL)
    .stream()
    .builder()
    .messageHandler((ctx, msg) -> {
        System.out.printf("  [%s] Received: %s\n",
            consumerType, new String(msg.body(), StandardCharsets.UTF_8));
        latch.countDown();
        ctx.accept();
    })
    .build();

性能结果

仅SQL过滤

Received 10 messages in 24.71 seconds using SQL filter only
Broker-side filtering rate: 404,645 messages/second

消费者会收到恰好10条符合条件的消息。所有过滤都在代理上进行,每秒处理超过40万条消息,同时仅通过网络传输相关数据。

布隆+SQL过滤

Received 10 messages in 2.05 seconds using Bloom + SQL filters
Broker-side filtering rate: 4,868,549 messages/second

通过将两个过滤阶段结合,性能提升了一个数量级。布隆过滤器(第一阶段)在从磁盘读取数据前,就剔除了不包含order.created事件的整块数据;而SQL过滤器(第二阶段)则对剩余消息应用精确的业务逻辑。

提示:通过将布隆过滤器与SQL过滤表达式相结合,RabbitMQ实现了两者的优势互补:第一阶段采用高效的分块级过滤,避免不必要的磁盘I/O、CPU和内存消耗;第二阶段则进行精确的消息级过滤以处理复杂业务逻辑——所有操作均在消息代理服务器上完成。

请注意,布隆过滤的实际性能取决于每个数据块的消息数量,而这一数量会随消息进入速率而变化。

原文地址:https://www.rabbitmq.com/blog/2025/09/23/sql-filter-expressions

  

我们愈是学习,愈觉得自己的贫乏。 —— 雪莱
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
其他应用
公众号