Jedis 发布订阅

发布订阅(pub/sub)是一种消息范式,消息的发送者(称为发布者 publish)不会将消息直接发送给特定的接收者(称为订阅者 subscribe)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。本文将介绍怎样通过 jedis 实现消息发布订阅。

JedisPubSub

redis.clients.jedis.JedisPubSub 是一个抽象类,该类用来作为订阅的参数进行传递,当某类事件触发时,将触发该类的某个方法。方法定义如下:

public void onMessage(String channel, String message)  基于频道,当收到消息时,触发该方法

public void onSubscribe(String channel, int subscribedChannels)  基于频道,初始化订阅时触发

public void onUnsubscribe(String channel, int subscribedChannels)  基于频道,取消订阅时触发

public void onPMessage(String pattern, String channel, String message)  基于模式,当收到消息时,触发该方法

public void onPSubscribe(String pattern, int subscribedChannels)  基于模式,初始化订阅时触发

public void onPUnsubscribe(String pattern, int subscribedChannels)  基于模式,取消订阅时触发

public void onPong(String pattern)  收到 ping 消息时触发

基于频道

“发布/订阅”包含两种角色:发布者和订阅者。发布者可以向指定的频道(channel)发送消息。订阅者可以订阅一个或者多个频道(channel),所有订阅此频道的订阅者都会收到此消息。

下面是订阅和发布消息方法定义:

  • void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels)  订阅一个或多个频道,回调需要实现 BinaryJedisPubSub 抽象类

  • void subscribe(JedisPubSub jedisPubSub, String... channels) 订阅一个或多个频道,回调需实现 JedisPubSub 抽象类

  • long publish(byte[] channel, byte[] message)  发布字节数组消息到给定的频道

  • long publish(String channel, String message)  发布字符串消息到给定频道

示例:

package com.hxstrive.redis.demo.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

/**
* Redis 发布/订阅测试
* @author hxstrive.com
*/
public class PubSubTest {

   public static void main(String[] args) {
       server(); // 启动服务端,订阅频道
       client(); // 启动客户端,发布消息
   }

   private static void server() {
       new Thread(new Runnable() {
           @Override
           public void run() {
               JedisPool jedisPool = new JedisPool("127.0.01", 6379);
               Jedis jedis = jedisPool.getResource();
               // 订阅名为 myChannel 频道
               jedis.subscribe(new JedisPubSub() {
                   @Override
                   public void onMessage(String channel, String message) {
                       // 输出频道和消息
                       System.out.println("channel = " + channel + ", message = " + message);
                   }
               }, "myChannel");
           }
       }).start();
   }

   private static void client() {
       try {
           JedisPool jedisPool = new JedisPool("127.0.01", 6379);
           Jedis jedis = jedisPool.getResource();
           for(int i = 0; i < 5; i++) {
               Thread.sleep(500);
               // 发布一个消息
               jedis.publish("myChannel", "hello! jedis");
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

}

运行程序,输出如下:

channel = myChannel, message = hello! jedis
channel = myChannel, message = hello! jedis
channel = myChannel, message = hello! jedis
channel = myChannel, message = hello! jedis
channel = myChannel, message = hello! jedis

基于模式

如果有某个/某些模式和该频道匹配,所有订阅这个/这些频道的客户端也同样会收到信息。方法定义如下:

  • void psubscribe(BinaryJedisPubSub jedisPubSub, byte[]... patterns)  订阅一个或多个模式,接收所有和这些模式匹配频道中的消息,回调需要实现 BinaryJedisPubSub 抽象类

  • void psubscribe(JedisPubSub jedisPubSub, String... patterns)  订阅一个或多个模式,接收所有和这些模式匹配频道中的消息,回调需实现 JedisPubSub 抽象类

示例:

package com.hxstrive.redis.demo.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

/**
* Redis 发布/订阅测试,基于模式订阅
* @author hxstrive.com 2023/3/27
*/
public class PubSubTest2 {

   public static void main(String[] args) {
       server(); // 启动服务端,订阅频道
       client(); // 启动客户端,发布消息
   }

   private static void server() {
       new Thread(new Runnable() {
           @Override
           public void run() {
               JedisPool jedisPool = new JedisPool("127.0.01", 6379);
               Jedis jedis = jedisPool.getResource();
               // 订阅频道名称匹配 *.hxstrive.com 模式的频道
               jedis.psubscribe(new JedisPubSub() {
                   @Override
                   public void onPMessage(String pattern, String channel, String message) {
                       // 输出频道和消息
                       System.out.println("pattern = " + pattern + ", channel = " + channel + ", message = " + message);
                   }
               }, "*.hxstrive.com");
           }
       }).start();
   }

   private static void client() {
       JedisPool jedisPool = new JedisPool("127.0.01", 6379);
       Jedis jedis = jedisPool.getResource();

       // 发布多个消息
       try {
           Thread.sleep(100);
           jedis.publish("www.hxstrive.com", "hi, www.hxstrive.com");

           Thread.sleep(100);
           jedis.publish("doc.hxstrive.com", "hi, doc.hxstrive.com");

           Thread.sleep(100);
           jedis.publish("sms.hxstrive.com", "hi, sms.hxstrive.com");

           Thread.sleep(100);
           jedis.publish("www.inlive365.com", "hi, www.inlive365.com");
       } catch (Exception e) {
           e.printStackTrace();
       }
   }

}

运行程序,输出如下:

pattern = *.hxstrive.com, channel = www.hxstrive.com, message = hi, www.hxstrive.com
pattern = *.hxstrive.com, channel = doc.hxstrive.com, message = hi, doc.hxstrive.com
pattern = *.hxstrive.com, channel = sms.hxstrive.com, message = hi, sms.hxstrive.com
说说我的看法
全部评论(
没有评论
关于
本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,请来信告知:hxstrive@outlook.com
公众号