Spring Data Redis 中,所有发送到流(Stream)中的任何记录都需要序列化为二进制格式。
由于流(Stream)与哈希(Hash)的数据结构非常接近,因此流键(Key)、字段名(Field Name)和值(Field Value)均使用了 RedisTemplate 上配置的相应序列化器。下面列出了对应的序列化和反序列化器:
key 使用 keySerializer 的序列化器,用于 Record.getStream() 方法
field 使用 hashKeySerializer 的序列化器,用于有效载荷 Map 中的每个键
value 使用 hashValueSerializer 的序列化器,用于有效载荷 Map 中的每个值
注意:请确保检查正在使用的RedisSerializers,并注意,如果您决定不使用任何序列化程序,则需要确保这些值已经是二进制的。
StreamOperations 允许通过 ObjectRecord 将简单的值直接追加到流中,而不需要将这些值放到 Map 结构中。然后,该值将被分配到一个有效载荷字段,在读回该值时可以提取。例如:
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
/**
* Spring Data Redis 的 Redis Stream 示例(简单值 Simple Value)
* @author hxstrive.com 2022/2/26
*/
@SpringBootTest
public class RedisStreamForSimpleValue {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Test
public void contextLoads() {
ObjectRecord<String, String> record = StreamRecords.newRecord()
.in("my-stream")
.ofObject("my-value");
// 向流添加一个记录,对应的 Redis 命令如下:
// XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"
redisTemplate.opsForStream().add(record);
List<ObjectRecord<String, String>> records = redisTemplate.opsForStream()
.read(String.class, StreamOffset.fromStart("my-stream"));
for(ObjectRecord<String, String> rec : records) {
System.out.println("id=" + rec.getId());
System.out.println("stream=" + rec.getStream());
System.out.println("value=" + rec.getValue());
}
}
}运行示例,输出结果如下:
id=1667279087680-0 stream=my-stream value=my-value
注意,ObjectRecord 通过与所有其他记录完全相同的序列化过程,因此也可以使用返回 MapRecord 的非类型化读取操作来获取 Record。
向流(Stream)中添加一个复杂值可以通过三种方式完成:
(1)将复杂值转换成简单值,如:使用 JSON 字符串表示
(2)使用合适的 RedisSerializer 序列化值
(3)使用 HashMapper 将值转换为适合序列化的 Map
第一个变体是最直接的变体,但忽略了流结构所提供的字段值功能,流中的值对其他消费者来说仍然是可读的。
第二种方案的好处与第一种方案相同,但可能会导致非常特殊的消费者限制,因为所有的消费者都必须实现非常相似的序列化机制。
第三种方案,HashMapper 方法是一种更复杂的方法,它使用了 Steams 的哈希结构,但对源代码进行了扁平化。只要选择了合适的序列化程序组合,其他用户仍然能够读取记录。
HashMappers 将有效载荷转换为具有特定类型的Map。请确保使用能够(去/反)序列化哈希的 Hash-Key 和 Hash-Value 序列化器。例如:
(1)用户实体类,代码如下:
/**
* 用户实体
* @author hxstrive.com 2022/7/5
*/
public class User {
/** 用户ID */
private int id;
/** 用户名 */
private String name;
public User() {}
public User(int id, String name) {
this.id = id;
this.name = name;
}
//...忽略 getter 和 setter 方法...
}(2)测试类,代码如下:
import com.hxstrive.redis.entity.User;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
/**
* Spring Data Redis 的 Redis Stream 示例(复杂值 Complex Value)
* @author hxstrive.com 2022/2/26
*/
@SpringBootTest
public class RedisStreamForComplexValue {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Test
public void contextLoads() {
ObjectRecord<String, User> record = StreamRecords.newRecord()
.in("user-logon")
.ofObject(new User(1000, "Tom"));
// 向流添加一个记录,对应的 Redis 命令如下:
// XADD user-logon * "_class" "com.hxstrive.redis.entity.User" "firstname" "night" "lastname" "angel"
redisTemplate.opsForStream()
.add(record);
List<ObjectRecord<String, User>> records = redisTemplate.opsForStream()
.read(User.class, StreamOffset.fromStart("user-logon"));
for(ObjectRecord<String, User> rec : records) {
System.out.println("id=" + rec.getId());
System.out.println("stream=" + rec.getStream());
System.out.println("value=" + rec.getValue());
}
}
}运行示例,输出结果如下:
id=1667279949643-0 stream=user-logon value=com.hxstrive.redis.entity.User@36224f93
默认情况下,StreamOperations 使用 ObjectHashMapper。获取 StreamOperations 时,您可以提供适合您要求的 HashMapper。例如:
redisTemplate.opsForStream(new Jackson2HashMapper(true)) // 对应的 Redis 命令如下: // XADD user-logon * "firstname" "night" "@class" "com.example.User" "lastname" "angel" .add(record);
StreamMessageListenerContainer 可能不知道域类型上使用的任何 @TypeAlias,因为这些类型需要通过 MappingContext 进行解析。确保使用 initialEntitySet() 初始化 RedisMappingContext。例如:
@Bean
RedisMappingContext redisMappingContext() {
RedisMappingContext ctx = new RedisMappingContext();
ctx.setInitialEntitySet(Collections.singleton(Person.class));
return ctx;
}
@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
return new MappingRedisConverter(mappingContext);
}
@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
return new ObjectHashMapper(converter);
}
@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
.objectMapper(hashMapper)
.build();
return StreamMessageListenerContainer.create(connectionFactory, options);
}