RabbitMQ 4.2 为流引入了 SQL 过滤表达式,实现了强大的代理端消息过滤功能。
在我们的基准测试中,将 SQL 过滤器与布隆过滤器结合使用,在高选择性场景和高输入速率下实现了每秒超过 400 万条消息的过滤率。这意味着只有消费者真正关心的消息才会离开代理,从而大幅减少网络流量和客户端处理开销。
高吞吐量事件流通常向消费者传递海量数据,其中大部分可能与消费者无关。在实际系统中,可能存在数万个主题(事件类型、租户、区域、SKU等),为每个主题创建专用流既不现实也难以扩展。
RabbitMQ Streams 通过代理端过滤机制解决了这一问题。
布隆过滤器可跳过不包含目标值的整块数据,而 SQL 过滤表达式能精确评估每条消息的谓词条件,仅允许匹配消息通过网络传输。这有效降低了网络流量,减少了客户端CPU和内存消耗,同时简化了应用程序代码。
代理端过滤需求由来已久——Kafka用户多年来持续呼吁(参见KAFKA-6020),但该功能至今仍缺失。RabbitMQ的布隆+SQL双重过滤机制,使大规模选择性消费成为现实。
下面通过实践案例进行演示。
要在你的环境中运行此示例:
1. 使用单个调度程序线程启动 RabbitMQ:
docker run -it --rm --name rabbitmq -p 5672:5672 -e ERL_AFLAGS="+S 1" rabbitmq:4.2.0-beta.3
2. 从示例应用程序的根目录运行:
mvn clean compile exec:java
该示例应用程序使用 RabbitMQ AMQP 1.0 Java 客户端,因为 SQL 过滤表达式是 AMQP 1.0 的一项功能。
试想一个典型的电子商务平台,它会生成持续不断的客户事件流:
product.search
product.view
cart.add
cart.remove
order.created
以及许多其他事件
我们的示例应用程序向流中发布了1000万个此类事件,其中 order.created 事件每 10 万条消息出现一次——仅占总量的 0.001%。
每条消息都包含一个布隆过滤器注释,该注释设置为其事件类型,从而实现高效的分块级过滤:
publisher
.message(body.getBytes(StandardCharsets.UTF_8))
.priority(priority)
// 设置布隆过滤器值
.annotation("x-stream-filter-value", eventType)
.subject(eventType)
.creationTime(creationTime)
// 设置应用程序属性,例如地区、价格或高级客户
.property("region", region);假设你只想处理满足以下所有条件的高价值订单:
事件类型为 order.created
订单是在过去一小时内创建的
订单来源于 AMER, EMEA, 或 APJ 区域
并且至少满足以下条件之一:
优先级 > 4
价格 ≥ 99.99
高级客户
在我们的演示中,仅有1000万条消息中的10条符合这些标准——这种高度选择性的过滤场景在实际应用中十分常见。
传统方法需要处理全部1000万条消息并在客户端进行过滤,这将导致巨大的网络开销和资源浪费。
SQL过滤表达式通过在代理端完成所有过滤操作,优雅地解决了这个问题:
String SQL =
"properties.subject = 'order.created' AND " +
"properties.creation_time > UTC() - 3600000 AND " +
"region IN ('AMER', 'EMEA', 'APJ') AND " +
"(header.priority > 4 OR price >= 99.99 OR premium_customer = TRUE)";消费者的实现变得简单直接:
ConsumerBuilder.StreamOptions builder = connection.consumerBuilder()
.queue(STREAM_NAME)
.stream()
.offset(FIRST);
if (useBloomFilter) {
// 阶段1:布隆过滤器——快速跳过没有order.created事件的块
builder = builder.filterValues("order.created");
}
Consumer consumer = builder
// 阶段2:SQL过滤器——精确的代理端每条消息过滤
.filter()
.sql(SQL)
.stream()
.builder()
.messageHandler((ctx, msg) -> {
System.out.printf(" [%s] Received: %s\n",
consumerType, new String(msg.body(), StandardCharsets.UTF_8));
latch.countDown();
ctx.accept();
})
.build();Received 10 messages in 24.71 seconds using SQL filter only Broker-side filtering rate: 404,645 messages/second
消费者会收到恰好10条符合条件的消息。所有过滤都在代理上进行,每秒处理超过40万条消息,同时仅通过网络传输相关数据。
Received 10 messages in 2.05 seconds using Bloom + SQL filters Broker-side filtering rate: 4,868,549 messages/second
通过将两个过滤阶段结合,性能提升了一个数量级。布隆过滤器(第一阶段)在从磁盘读取数据前,就剔除了不包含order.created事件的整块数据;而SQL过滤器(第二阶段)则对剩余消息应用精确的业务逻辑。
提示:通过将布隆过滤器与SQL过滤表达式相结合,RabbitMQ实现了两者的优势互补:第一阶段采用高效的分块级过滤,避免不必要的磁盘I/O、CPU和内存消耗;第二阶段则进行精确的消息级过滤以处理复杂业务逻辑——所有操作均在消息代理服务器上完成。
请注意,布隆过滤的实际性能取决于每个数据块的消息数量,而这一数量会随消息进入速率而变化。
原文地址:https://www.rabbitmq.com/blog/2025/09/23/sql-filter-expressions