Spring Data Redis 教程

Redis 管道

Redis 提供了对管道(Pipeline)的支持,这涉及将多个命令发送到服务器,而无需等待服务器回复,然后在单个步骤中读取回复。当您需要在一行中发送多个命令时,管道可以提高性能,例如:向同一列表(List)中添加许多元素。

Spring Data Redis 提供了几种用于在管道中运行命令的 RedisTemplate 方法。如果您不关心管道操作的结果,可以使用标准的 execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) 方法,将 pipeline 参数设置为 true。

或者使用 executePipelined(RedisCallback<?> action) 或 executePipelined(SessionCallback<?> session) 方法,在回调接口的回调方法中进行管道操作。例如:

// 从队列(Queue)中弹出(Pop)指定数量的项
List<Object> results = stringRedisTemplate.executePipelined(
    new RedisCallback<Object>() {
        public Object doInRedis(RedisConnection connection) throws DataAccessException {
            StringRedisConnection stringRedisConn = (StringRedisConnection)connection;
            for(int i=0; i< batchSize; i++) {
                stringRedisConn.rPop("myqueue");
            }
            return null;
        }
    });

前面的示例从管道中的队列中运行大量右弹(rPop,right pop)出项,结果列表(results 变量)包含所有弹出的项目。RedisTemplate 使用 key、hash-key 和 hash-value 的序列化器在返回之前对所有结果进行反序列化,因此前面的示例中返回的项是 String。还有其他 executePipelined 方法,允许您为流水线结果传递自定义序列化程序。如下:

  • List<Object> executePipelined(RedisCallback<?> action, RedisSerializer<?> resultSerializer)

  • List<Object> executePipelined(SessionCallback<?> session, RedisSerializer<?> resultSerializer)

请注意,RedisCallback 返回的值必须为空,因为为了返回管道命令的结果,该值被丢弃。如果 RedisCallback 的返回值不为 null,则抛出如下错误信息 “org.springframework.dao.InvalidDataAccessApiUsageException: Callback cannot return a non-null value as it gets overwritten by the pipeline”。

Lettuce 驱动程序支持更细粒度的刷新控制,允许在命令出现时刷新命令、缓冲或在连接关闭时发送命令。例如:

// Lettuce 驱动链接
LettuceConnectionFactory factory = //...
// 设置管道刷新策略:本地缓冲区在每3个命令后刷新
factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3));

提示:

从 1.1 版开始,RedisConnection 和 RedisTemplate 的 exec 方法进行了重要更改。以前,这些方法直接从连接器返回事务的结果。这意味着数据类型通常与 RedisConnection 方法返回的数据类型不同。例如,zAdd 返回一个布尔值,指示元素是否已添加到排序集。大多数连接器将此值作为 long 值返回,Spring Data Redis 执行转换。另一个常见的区别是,大多数连接器都会为诸如 set 之类的操作返回状态回复(通常是字符串 OK)。这些回复通常被 Spring Data Redis 丢弃。

在 1.1 之前,这些转换不会对 exec() 的结果执行。此外,结果没有在 RedisTemplate 中反序列化,因此它们通常包括原始字节数组。如果此更改破坏了应用程序,请在 RedisConnectionFactory 上将 convertPipelineAndTxResults() 设置为 false 以禁用此行为。

示例

使用管道命令从 myqueue 列表中 rPop 多个元素,如下:

package com.hxstrive.redis.pipeline;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

/**
 * Spring Data Redis 管道
 * @author hxstrive.com 2022/2/26
 */
@SpringBootTest
public class PipelineDemo {

    @Autowired
    private RedisTemplate<String,String> redisTemplate;

    @Test
    public void contextLoads() {
        // 向列表写入 50 个数据
        ListOperations<String,String> ops = redisTemplate.opsForList();
        for(int i = 0; i < 50; i ++) {
            ops.leftPush("myqueue", "value-" + i);
        }

        // 使用管道读取数据
        List<Object> results = redisTemplate.executePipelined(
            new RedisCallback<Object>() {
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    for(int i=0; i< 10; i++) {
                        connection.rPop("myqueue".getBytes());
                    }
                    // 必须返回 null,否则抛出异常
                    return null;
                }
            }
        );
        System.out.println(Arrays.toString(results.toArray()));
    }

}

运行结果如下:

[value-0, value-1, value-2, value-3, value-4, value-5, value-6, value-7, value-8, value-9]
说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号