Spring Data Redis 教程

ClusterOperations 操作接口

Spring Data Redis 中的 ClusterOperations 操作接口针对特定于群集的操作的 Redis 操作。RedisClusterNode 可以从连接中获得,也可以使用 host 和 RedisNode.getPort() 或节点 ID 来构造。

准备工作

由于 ClusterOperations 操作接口用来操作 Redis 集群,在正式介绍 ClusterOperations 具体用法前,我们需要先搭建一个 Redis 集群环境,为了测试方便,直接在本地 Windows 11 中搭建伪集群。集群如下:

127.0.0.1:6379
127.0.0.1:6378
127.0.0.1:6377
127.0.0.1:6376
127.0.0.1:6375
127.0.0.1:6374

该 Redis 集群总共有6个节点,3 个 Master,3 个 Slave。

修改配置

Redis 集群模式和单机模式的配置文件有一定的区别,将简单的集群配置写入到 application.yml 文件,配置内容如下:

# Redis 集群配置
spring:
  redis:
    database: 1
    cluster:
      nodes: 127.0.0.1:6379,127.0.0.1:6378,127.0.0.1:6377,127.0.0.1:6376,127.0.0.1:6375,127.0.0.1:6374

后台持久化

使用 bgSave() 方法可以对集群中指定的节点手动触发持久化,方法定义如下:

  • void bgSave(RedisClusterNode node) 在给定节点上开始后台保存数据。

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374);
ops.bgSave(node);

执行上面代码后,观察 127.0.0.1:6374 节点控制台,控制台会输出如下信息:

...
[11384] 24 Aug 13:05:00.304 # fork operation complete
[11384] 24 Aug 13:05:00.326 * Background saving terminated with success

同步保存数据库快照

  • void save(RedisClusterNode node) 在服务器上同步保存当前数据库快照。

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374);
ops.save(node);

执行上面代码后,观察 127.0.0.1:6374 节点控制台,控制台会输出如下信息:

...
[11384] 24 Aug 13:26:45.044 * DB saved on disk

数据库刷新

使用 flushDb() 方法可以手动刷新数据库,方法定义如下:

  • void flushDb(RedisClusterNode node) 在节点上刷新数据库。

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374);
ops.flushDb(node);

上面代码手动刷新了 127.0.0.1:6374 节点。

获取指定所有Key

使用 keys() 方法可以从指定节点获取键集合,方法定义如下:

  • Set<K> keys(RedisClusterNode node, K pattern) 获取位于给定节点中匹配 pattern 的所有键。

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
// 写入数据
ValueOperations<String,String> valueOps = redisTemplate.opsForValue();
valueOps.set("key1", "value1");
valueOps.set("key2", "value2");
valueOps.set("key3", "value3");
valueOps.set("key4", "value4");

// 获取每个节点所有的缓存 key
for(int i = 6374; i <= 6379; i++) {
    RedisClusterNode node = new RedisClusterNode("127.0.0.1", i);
    Set<String> keys = ops.keys(node, "*");
    System.out.println(node + " ==> " + Arrays.toString(keys.toArray()));
}

运行示例,输出结果如下:

127.0.0.1:6374 ==> [key3, key2]
127.0.0.1:6375 ==> [key1]
127.0.0.1:6376 ==> [key4]
127.0.0.1:6377 ==> [key4]
127.0.0.1:6378 ==> [key2, key3]
127.0.0.1:6379 ==> [key1]

Ping 节点

使用 ping() 方法可以 Ping 指定的节点,查看节点状态,方法定义如下:

  • String ping(RedisClusterNode node) Ping 给定的节点

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
// ping 每一个节点
for(int i = 6374; i <= 6379; i++) {
    RedisClusterNode node = new RedisClusterNode("127.0.0.1", i);
    String ping = ops.ping(node);
    System.out.println(node + " ==> " + ping);
}

运行示例,输出结果如下:

127.0.0.1:6374 ==> PONG
127.0.0.1:6375 ==> PONG
127.0.0.1:6376 ==> PONG
127.0.0.1:6377 ==> PONG
127.0.0.1:6378 ==> PONG
127.0.0.1:6379 ==> PONG

注意:如果能够正常 Ping 通,则返回 “PONG” 字符串。如果不能 Ping 通,则抛出 “Unable to connect to 127.0.0.1:6378” 错误。

随机返回键

使用 randomKey() 方法我们可以从指定节点中随机返回一个键,方法定义如下:

  • K randomKey(RedisClusterNode node) 从给定节点所服务的范围中获取随机密钥。

示例:

// 准备数据
ValueOperations<String,String> valueOps = redisTemplate.opsForValue();
for(int i = 0; i < 20; i++) {
    valueOps.set("key" + i, "value" + i);
}

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
// 从每个节点随机返回一个键
for(int i = 6374; i <= 6379; i++) {
    RedisClusterNode node = new RedisClusterNode("127.0.0.1", i);
    String key = ops.randomKey(node);
    System.out.println(node + " ==> " + key);
}

运行示例,输出结果如下:

127.0.0.1:6374 ==> key7
127.0.0.1:6375 ==> key10
127.0.0.1:6376 ==> key16
127.0.0.1:6377 ==> key0
127.0.0.1:6378 ==> key3
127.0.0.1:6379 ==> key14

关闭节点

使用 shutdown() 方法可以手动关闭指定节点,方法定义如下:

  • void shutdown(RedisClusterNode node) 关闭给定的节点

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
// 从每个节点随机返回一个键
for(int i = 6374; i <= 6379; i++) {
    RedisClusterNode node = new RedisClusterNode("127.0.0.1", i);
    ops.shutdown(node);
    System.out.println(node + " ==> shutdown");
}

运行示例,输出结果如下:

127.0.0.1:6374 ==> shutdown
127.0.0.1:6375 ==> shutdown
127.0.0.1:6376 ==> shutdown
127.0.0.1:6377 ==> shutdown
127.0.0.1:6378 ==> shutdown
127.0.0.1:6379 ==> shutdown

执行上面代码后,观察所有节点控制台,控制台会输出如下信息:

...
[12752] 24 Aug 13:40:39.312 # User requested shutdown...
[12752] 24 Aug 13:40:39.312 # There is a child saving an .rdb. Killing it!
[12752] 24 Aug 13:40:39.359 * Saving the final RDB snapshot before exiting.
[12752] 24 Aug 13:40:39.437 * DB saved on disk
[12752] 24 Aug 13:40:39.453 # Redis is now ready to exit, bye bye...

移除和添加节点

使用 meet() 方法可以将一个节点添加到 Redis 集群,forget() 方法将指定节点从 Redis 集群中移除。方法定义如下:

  • void forget(RedisClusterNode node) 从集群中移除指定的节点

  • void meet(RedisClusterNode node) 将指定的节点添加到集群中

  • Collection<RedisClusterNode> getSlaves(RedisClusterNode node) 获取节点对应的所有 Slave 节点

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
// 选择一个 Slave 节点
RedisClusterNode slaveNode = null;
for(int i = 6374; i <= 6379; i++) {
    try {
        RedisClusterNode node = new RedisClusterNode("127.0.0.1", i);
        Collection<RedisClusterNode> slaves = ops.getSlaves(node);
        for (RedisClusterNode slave : slaves) {
            System.out.println("slave ==> " + slave);
            if (null == slaveNode) {
                slaveNode = slave;
            }
        }
    } catch (RedisSystemException e) {}
}

if(null == slaveNode) {
    System.err.println("很抱歉,没有 Slave 节点!");
    return;
}

// 将节点从集群移除
ops.forget(slaveNode);
System.out.println("从集群移除节点:" + slaveNode);

// 将节点添加到集群
ops.meet(slaveNode);
System.out.println("添加节点到集群:" + slaveNode);

运行示例,输出结果如下:

slave ==> 127.0.0.1:6378
slave ==> 127.0.0.1:6379
slave ==> 127.0.0.1:6377
从集群移除节点:127.0.0.1:6378
添加节点到集群:127.0.0.1:6378

注意:如果我们调用 forget() 或者 meet() 方法时,传递的节点不是 slave 节点,则会抛出如下错误信息:

Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR Can't forget my master!; nested exception is org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR Can't forget my master! 不能移除 master 类型节点

添加插槽(Slot)

一个 Redis 集群包含 16384 个插槽(hash slot),数据库中的每个键都属于这 16384 个插槽的其中一个,集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽,其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和,集群中的每个节点负责处理一部分插槽。

Spring Data Redis 中,可以使用 addSlots() 方法添加指定插槽到某个节点,方法定义如下:

  • void addSlots(RedisClusterNode node, int... slots) 将给定的插槽添加到节点

  • void addSlots(RedisClusterNode node, RedisClusterNode.SlotRange range) 将 RedisClusterNode.SlotRange 中的插槽添加到给定节点

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
RedisClusterNode node = new RedisClusterNode("127.0.0.1", 6374);
// 添加一个或多个卡槽到 Redis
ops.addSlots(node, 1);

// 将一个范围内的卡槽添加到 Redis
RedisClusterNode.SlotRange slotRange = 
        new RedisClusterNode.SlotRange(100, 200);
ops.addSlots(node, slotRange);

移动插槽

使用 reshard() 方法可以将指定插槽以及插槽下面的键移动到指定的节点。方法定义如下:

  • void reshard(RedisClusterNode source, int slot, RedisClusterNode target) 将插槽分配从一个源移到目标节点,并复制与该插槽关联的关键字

示例:

ClusterOperations<String,String> ops = redisTemplate.opsForCluster();
RedisClusterNode node1 = new RedisClusterNode("127.0.0.1", 6374);
RedisClusterNode node2 = new RedisClusterNode("127.0.0.1", 6379);
ops.reshard(node1, 1, node2);

注意:进行卡槽移动必须在 Master 节点上面进行,否则抛出如下错误:

org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR Please use SETSLOT only with masters.

手动开始 AOF(Append-Only-File)

AOF 是 Redis 的另一种持久化的方式。它是基于文本形式的,追加写入命令的方式来做持久化的。这个有些类似 Mysql 中的 binlog,它的持久化记录的粒度比 RDB 要更好,所以生产环境中一般会开启 AOF 持久化的。当然,在 Redis4 之后的版本,rdb 和 aof 是可以开启混合模式的。

在 Spring Data Redis 中,使用 bgReWriteAof() 方法手动执行 AOF 写入。方法定义如下:

  • void bgReWriteAof(RedisClusterNode node) 在给定节点上启动仅附加文件重写过程。


说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号