Redis Stream(流)是 Redis 5.0 版本引入的一个新的数据类型。
Stream 以更抽象的方式模拟日志数据结构,但日志仍然是完整的:就像一个日志文件,通常实现为以只附加模式打开的文件,Redis流主要是一个仅附加数据结构。
本文将介绍怎样通过 jedis 库操作 Redis 的 Streams 数据类型。
注意,低版本的 jedis 不支持流,jedis 至少需要 3.1.0 版本以上,本文将使用最新版本 4.3.1,依赖如下:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>4.3.1</version> </dependency>
用来将指定的数据添加到流中,返回条目ID,方法定义如下:
byte[] xadd(byte[] key, XAddParams params, Map<byte[],byte[]> hash)
StreamEntryID xadd(String key, StreamEntryID id, Map<String,String> hash) 对应的 Redis 命令为 XADD key ID field string [field string ...]
StreamEntryID xadd(String key, XAddParams params, Map<String,String> hash)
示例:
// 添加一个条目到流
Map<String,String> map = new HashMap<>();
map.put("id", "1000");
map.put("name", "Helen");
StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
System.out.println(id); // 1679289623159-0
// 注意:流条目内容为 {"id":"1000","name":"Helen"}
// 添加一个条目到流,指定最大长度,以及是否近乎精确
for(int i = 0; i < 5; i++) {
   map = new HashMap<>();
   map.put("id", Integer.valueOf(2000 + i).toString());
   map.put("name", "Bill-" + i);
   XAddParams xAddParams = new XAddParams();
   xAddParams.maxLen(2); // 最大长度
   xAddParams.approximateTrimming(); // 近乎精确分割
   //xAddParams.exactTrimming(); // 精确分割
   xAddParams.id(StreamEntryID.NEW_ENTRY); // ID策略,即 *
   id = jedis.xadd("mystream", xAddParams, map);
   System.out.println(id);
}其中,StreamEntryID 用来指定流 ID 的生成策略,取值如下:
StreamEntryID.LAST_ENTRY 应该只与 XGROUP CREATE 一起使用,如:XGROUP CREATE mystream -group-name $
StreamEntryID.NEW_ENTRY 应该只与 XADD 一起使用,如: XADD mystream * field1 value1
StreamEntryID.UNRECEIVED_ENTRY 应该只与 XREADGROUP 一起使用,如: XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
用来从流中读取条目,方法定义如下:
List<byte[]> xread(XReadParams xReadParams, Map.Entry<byte[],byte[]>... streams)
List<Map.Entry<String,List<StreamEntry>>> xread(XReadParams xReadParams, Map<String,StreamEntryID> streams) 对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[],byte[]>... streams)
List<Map.Entry<String,List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String,StreamEntryID> streams) 对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
示例:
// 准备数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679375078037-0, 1679375078038-0, 1679375078038-1, 1679375078038-2, 1679375078039-0]
// 从 mystream 流中获取条目 ID 大于 1679375078038-1 的两个条目
Map<String,StreamEntryID> params = new HashMap<>();
params.put("mystream", idList.get(2));
XReadParams xReadParams = new XReadParams();
xReadParams.count(2); // 最大去读个数
xReadParams.block(1000); // 阻塞时间,毫秒
List<Map.Entry<String, List<StreamEntry>>> result = jedis.xread(xReadParams, params);
for(Map.Entry<String, List<StreamEntry>> entry : result) {
   String key = entry.getKey();
   List<StreamEntry> value = entry.getValue();
   System.out.println("key = " + key); // key = mystream
   for(StreamEntry streamEntry : value) {
       System.out.println(streamEntry.getID() + " = " + JSONObject.toJSONString(streamEntry.getFields()));
   }
   // 1679375078038-2 = {"uuid":"d79024c2-e60d-42c5-9beb-4da6c302d7fc"}
   // 1679375078039-0 = {"uuid":"16c1caf4-6048-4add-8b79-a6ddbc10ec8b"}
}用来读取指定两个消息ID(开始ID,结束ID)之间的消息,方法定义如下:
List<byte[]> xrange(byte[] key, byte[] start, byte[] end)
List<byte[]> xrange(byte[] key, byte[] start, byte[] end, int count)
List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end) 对应的 Redis 命令为 XRANGE key start end
List<StreamEntry> xrange(String key, StreamEntryID start, StreamEntryID end, int count) 对应的 Redis 命令为 XRANGE key start end COUNT count
List<StreamEntry> xrange(String key, String start, String end)
List<StreamEntry> xrange(String key, String start, String end, int count)
示例:
// 准备数据,添加十条数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679373956962-0, 1679373956963-0, 1679373956964-0, 1679373956964-1, 1679373956964-2]
// 根据ID范围获取数据
StreamEntryID startId = idList.get(0);
StreamEntryID endId = idList.get(idList.size() - 1);
List<StreamEntry> streamEntryList = jedis.xrange("mystream", startId, endId, 5);
for(StreamEntry streamEntry : streamEntryList) {
   System.out.println(streamEntry.getID() + " = " + JSONObject.toJSONString(streamEntry.getFields()));
}
// 1679373956962-0 = {"uuid":"5beca5dc-a1b8-4b59-a5a5-c7357baf287c"}
// 1679373956963-0 = {"uuid":"82227f64-eddf-470e-86ab-de8fcf1ac1c3"}
// 1679373956964-0 = {"uuid":"2647c5a4-8e77-454f-a1d1-a1772688aa4f"}
// 1679373956964-1 = {"uuid":"d5c17352-8622-4d00-b7c0-2143aaae0ce7"}
// 1679373956964-2 = {"uuid":"a3400f40-ae40-46a0-bb9c-1a907795fd13"}用来获取指定流的长度,即条目个数,方法定义如下:
Long xlen(String key)
示例:
// 准备数据,添加十条数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679374066012-0, 1679374066018-0, 1679374066019-0, 1679374066019-1, 1679374066021-0]
// 获取流条目数
Long len = jedis.xlen("mystream");
System.out.println("len = " +len); // len = 5用来,方法定义如下:
long xtrim(byte[] key, long maxLen, boolean approximateLength)
long xtrim(byte[] key, XTrimParams params)
long xtrim(String key, long maxLen, boolean approximateLength) 对应的 Redis 命令 XTRIM key MAXLEN [~] count
long xtrim(String key, XTrimParams params)
示例:
// 准备数据,添加十条数据
List<StreamEntryID> idList = new ArrayList<>();
for(int i = 0; i < 5; i++) {
   Map<String, String> map = new HashMap<>();
   map.put("uuid", UUID.randomUUID().toString());
   StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
   idList.add(id);
}
System.out.println(Arrays.toString(idList.toArray()));
// [1679633388548-0, 1679633388550-0, 1679633388550-1, 1679633388551-0, 1679633388551-1]
long len = jedis.xlen("mystream");
System.out.println("old len=" + len); // old len=5
// 第三个参数 approximateLength 用来指定是否采用近乎准确的格式
// 如果 approximateLength = true,则采用近乎精确的方式,即指定 2 不一定会裁剪到只剩下2个消息
// 如果 approximateLength = false,则采用精确的方式,即指定 2 则流会被裁剪到长度为 2
jedis.xtrim("mystream", 2, false);
len = jedis.xlen("mystream");
System.out.println("new len=" + len); // new len=2用来从流中删除一个或多个消息,方法定义如下:
long xdel(String key, StreamEntryID... ids) 对应的 Redis 命令 XDEL key ID [ID ...]
实例:
// 准备数据
Map<String, String> map = new HashMap<>();
map.put("uuid", UUID.randomUUID().toString());
StreamEntryID id = jedis.xadd("mystream", StreamEntryID.NEW_ENTRY, map);
System.out.println("id=" + id); // id=1679633800258-0
// 删除流中指定条目ID的条目
long size = jedis.xdel("mystream", id);
System.out.println("size=" + size); // size=1用来在指定的流中创建指定名称的消费组或者删除消费组,方法定义如下:
String xgroupCreate(String key, String groupname, StreamEntryID id, boolean makeStream) 对应 Redis 的命令 XGROUP CREATE
long xgroupDestroy(String key, String groupname) 对应 Redis 的命令为 XGROUP DESTROY
实例:
// 在 mystream 流中创建名为 mygroup 的消费组
// makeStream 参数用来指定当流不存在时是否自动创建流,true-自动创建,false-不自动创建
// 如果流不存在,且 makeStream = false,则将抛出异常
// ERR The XGROUP subcommand requires the key to exist.
// Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true);
System.out.println("str=" + str); // str=OK
// 删除消费组
long ln = jedis.xgroupDestroy("mystream", "mygroup");
System.out.println("ln=" + ln); // ln=1用来在指定的流中的消费组中创建消费者,或者删除消费者,方法定义如下:
long xgroupDelConsumer(byte[] key, byte[] groupName, byte[] consumerName)
long xgroupDelConsumer(String key, String groupName, String consumerName) 对应 Redis 的命令为 XGROUP DELCONSUMER
注意:jedis 没有提供单独创建消费者的方法,而是通过 readGroup() 方法,当我们读取消息时,将会自动创建消费者。
示例:
// 创建消费组
String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true);
System.out.println("str=" + str); // str=OK
// 从消费组读取消息,将自动创建消费者
Map<String,StreamEntryID> params = new HashMap<>();
params.put("mystream", StreamEntryID.UNRECEIVED_ENTRY); // 指示读取未确认消息
XReadGroupParams xReadGroupParams = new XReadGroupParams();
xReadGroupParams.count(5); // 限制最大读取5个
xReadGroupParams.block(1000); // 阻塞1秒
xReadGroupParams.noAck(); // 自动确认
List<Map.Entry<String, List<StreamEntry>>> result = jedis.xreadGroup(
       "mygroup", "myconsumer", xReadGroupParams, params);
System.out.println("result=" + JSONObject.toJSONString(result));
// 删除消费者 myconsumer
long val = jedis.xgroupDelConsumer("mystream", "mygroup", "myconsumer");
System.out.println("删除消费者 val=" + val); // 删除消费者 val=0用来通过消费组的方式从流中消费消息,方法定义如下:
List<byte[]> xreadGroup(byte[] groupName, byte[] consumer, XReadGroupParams xReadGroupParams, Map.Entry<byte[],byte[]>... streams)
List<Map.Entry<String,List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String,StreamEntryID> streams) 对应的 Redis 命令为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
示例:
// 创建消费组
String str = jedis.xgroupCreate("mystream", "mygroup", StreamEntryID.LAST_ENTRY, true);
System.out.println("str=" + str); // str=OK
// 从消费组读取消息,将自动创建消费者
Map<String,StreamEntryID> params = new HashMap<>();
params.put("mystream", StreamEntryID.UNRECEIVED_ENTRY); // 指示读取未确认消息
boolean run = true;
while(run) {
   // 用来从流的消费组中读取消息,自动创建 myconsumer 消费者
   // 每次从流中最多读取 5 条消息,如果没有可用的消息则阻塞 500 毫秒
   XReadGroupParams xReadGroupParams = new XReadGroupParams();
   xReadGroupParams.count(5); // 限制最大读取5个
   xReadGroupParams.block(500); // 阻塞1秒
   xReadGroupParams.noAck(); // 自动确认
   List<Map.Entry<String, List<StreamEntry>>> result = jedis.xreadGroup(
           "mygroup", "myconsumer", xReadGroupParams, params);
   if(null != result && result.size() > 0) {
       for(Map.Entry<String, List<StreamEntry>> entry : result) {
           List<StreamEntry> value = entry.getValue();
           for(StreamEntry streamEntry : value) {
               String val = streamEntry.getFields().get("message");
               System.out.println("message = " + val);
               // 退出消息接受
               if("exit".equalsIgnoreCase(val)) {
                   run = false;
               }
           }
       }
   }
}
// 演示,先运行上面代码,等待消费消息
// 然后,使用 redis-cli 连接到 redis 服务,执行下面命令添加消息到流
// xadd mystream * message value1
// xadd mystream * message value2
// xadd mystream * message exit   则退出 while 循环
            