标题:【Redis08】Redis基础:发布订阅Pub/Sub相关操作
Redis基础学习:发布订阅Pub/Sub相关操作
发布订阅的概念不知道大家有没有接触过,其实呀,简单点理解,就像是我写的这篇文章,发布出来之后大家都看到了,我就是一个发布者(或生产者),而各位看客您呢,那就是订阅者(或消费者)。
概念
在之前我们已经学习过,List 和 Sorted Set 可以做队列,但是它们的队列通常是 N 个生产一个消费,发布订阅模式则是 N 个生产 N 个消费。不管哪个客户端,向一个队列中发送了一条消息,N 个监听相关队列的客户端都可以收到这条消息,并且进行相应的业务处理,这就是发布订阅模式的典型应用场景。这个队列有另一个名称,叫做频道。我们要监听订阅的可以是多个频道,一旦进入监听频道的模式,客户端就会进入类似于 BLPOP 这样的阻塞模式,一直等待生产者的消息数据的到来。
再换一种说法的话,其实发布订阅模式就是一种广播机制。将数据像广播一样发散出去,只要你调得频率对,就可以收听到我们的广播。不过这也引出了一个问题,那就是当前没有订阅的话,就不会收到消息。而对于一个标准队列来说,如果消费者未启动,消息会堆积,当有消费者之后,会继续进行消费,所以,Pub/Sub 也并不算是一个完全的队列方案,只能说它是一个标准的广播系统。
操作
理解了概念之后,我们马上就订阅几个频道试试。
// 消费者客户端1
127.0.0.1:6379> SUBSCRIBE a b
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "a"
3) (integer) 1
1) "subscribe"
2) "b"
3) (integer) 2
// 消费者客户端2
127.0.0.1:6379> SUBSCRIBE a b c
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "a"
3) (integer) 1
1) "subscribe"
2) "b"
3) (integer) 2
1) "subscribe"
2) "c"
3) (integer) 3
上面分别是使用两个客户端,通过 SUBSCRIBE 命令监听需要订阅的频道。第一个客户端我们订阅了 a b 两个频道,第二个客户端订阅了 a b c 三个频道。
订阅成功后,返回的内容是按频道分组的信息,1) 是返回值类型,这里显示的都是订阅操作、2)很明显是频道名称、3)是目前已有的频道数量。
现在,这两个客户端都进入阻塞状态了,你可以试试在上面敲啥都不管用了,只能通过 ctrl+c 来关闭。
接下来,我们就再开一个客户端用于发送消息,也就是生产者客户端。
// 生产者客户端
127.0.0.1:6379> publish a 1
(integer) 2
使用 PUBLISH 向 a 频道发送了一条消息后,之前阻塞的两个消费者客户端马上就会显示出下面的内容,表示接收到的数据。
// 消费者客户端1、2
1) "message"
2) "a"
3) "1"
然后我们再向 c 频道发送数据,会发现只有 2 号客户端有响应,因为客户端1我们并没有订阅 c 这个频道。
// 生产者客户端2
127.0.0.1:6379> publish c 3
(integer) 1
// 消费者客户端2
1) "message"
2) "c"
3) "3"
简单吗?非常简单,主要的操作就是这些了。不过还有一个 UNSUBSCRIBE 命令是用于退出订阅的,但是 redis-cli 这个命令行客户端并不支持,你可以自己试试。后面我们演示 PHP 中使用发布订阅模式的时候再看来这个功能在 PHP 的 Redis 扩展中是如何起作用的。
模式匹配
上面的 SUBSCRIBE 是订阅指定名称的频道,不过还有一种情况,那就是我们可以按照一定的规则名称去订阅,不必强制只指定订阅某个固定的名字。
// 客户端1
127.0.0.1:6379> PSUBSCRIBE http.* socket.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "http.*"
3) (integer) 1
1) "psubscribe"
2) "socket.*"
3) (integer) 2
// 客户端2
127.0.0.1:6379> PSUBSCRIBE http.* ftp.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "http.*"
3) (integer) 1
1) "psubscribe"
2) "ftp.*"
3) (integer) 2
没错,使用的命令就是在 SUBSCRIBE 前加了个 P ,PSUBSCRIBE 后面的名字可以使用一些模式匹配符号,比如 *、?、[ab] 之类的,* 表示无限多个任意字符,? 表示一个任意字符,[ab]表示只能是 a 或者 b 。
然后我们就随便发了呗。
// 生产者客户端
127.0.0.1:6379> PUBLISH http.bar httpbar
(integer) 2
127.0.0.1:6379> PUBLISH http.dududu httpdududu
(integer) 2
127.0.0.1:6379> PUBLISH ftp.fff ftpfff
(integer) 1 // 只有客户端2响应
只要是符合 http.* 的频道名,两个客户端都能接收到信息。同样的,ftp.* 只有客户端2订阅了,所以也只有它会响应。
其它命令
看出来没有,其实发布订阅模式的主要命令就是三个,SUBSCRIBE、PSUBSCRIBE 和 PUBLISH 。除此之外,还提供了一个统计查询相关的命令 PUBSUB 。首先我们通过 PUBSUB HELP 看看它的使用方式。
127.0.0.1:6379> PUBSUB help
1) PUBSUB <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CHANNELS [<pattern>]
3) Return the currently active channels matching a <pattern> (default: '*').
4) NUMPAT
5) Return number of subscriptions to patterns.
6) NUMSUB [<channel> ...]
7) Return the number of subscribers for the specified channels, excluding
8) pattern subscriptions(default: no channels).
9) HELP
10) Prints this help.
PUBSUB CHANNELS 是返回当前已经被订阅的频道信息,后面也可以选择指定频道名,如果不指定,就是返回全部的。
127.0.0.1:6379> PUBSUB channels
1) "b"
2) "a"
注意,PUBSUB CHANNELS 只返回 SUBSCRIBE 订阅的频道信息,模式匹配订阅的频道信息这里是看不到的。接下来看看频道的监听客户端数量,使用的是 PUBSUB NUMSUB 。
127.0.0.1:6379> PUBSUB NUMSUB
(empty array)
127.0.0.1:6379> PUBSUB NUMSUB a
1) "a"
2) (integer) 2
127.0.0.1:6379> PUBSUB NUMSUB b
1) "b"
2) (integer) 2
127.0.0.1:6379> PUBSUB NUMSUB c
1) "c"
2) (integer) 1
127.0.0.1:6379> PUBSUB NUMSUB d
1) "d"
2) (integer) 0
PUBSUB NUMSUB 命令必须要指定频道名,如果不指定返回的就是一个空数组。我们可以看到 a 和 b 频道都有 2 个客户端在订阅,而 c 频道只有一个,d 频道没有人在监听。它同样也是仅支持 SUBSCRIBE 订阅的频道信息。那么模式频道就没有什么工具可以使用吗?有的,就一个 PUBSUB NUMPAT 。
127.0.0.1:6379> PUBSUB NUMPAT
(integer) 3
PUBSUB NUMPAT 只是返回模式订阅中订阅的频道数量,我们上面订阅的是 http.*、socket.*和 ftp.* ,因此,它返回的就是 3 ,不是客户端数量哦,就是去重的频道数量。
在 PHP 中使用
最后,我们再来看看 PUBSUB 模式在 PHP 中应该如何使用。
<?php
$redis = new Redis();
$redis->connect("localhost");
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // 要设置连接超时时间,要不一会就断了
$redis->subscribe(['a', 'b', 'c'], function($redis, $chan, $msg){
var_dump($redis);
if($chan == 'a'){
echo "a msg:" . $msg, PHP_EOL;
}
if($chan == 'b'){
echo "b msg:" . $msg, PHP_EOL;
}
if($chan == 'c'){
echo "c msg:" . $msg, PHP_EOL;
$redis->unsubscribe(['b']);
}
});
上面就是一个最简单的订阅操作。一般来说,这种操作都会放在命令行中通过脚本来运行,像下面这样。
php redis-pubsub.php
如果你想放在网页上运行的话,由于 http 的响应是需要程序运行完成才会输出的,但订阅是阻塞模式的,所以可能没法让你如愿实现这样的功能。当然,我们可以考虑别的方案,比如配合 websocket 进行输出。
好了,运行上面的命令之后我们就启动了一个客户端,接下来直接去 PUBLISH 一些数据吧,用 PHP 也行,直接 redis-cli 也行。
127.0.0.1:6379> PUBLISH a 1
(integer) 1
127.0.0.1:6379> PUBLISH b 2
(integer) 1
127.0.0.1:6379> PUBLISH c 3 // 退订了
(integer) 1
127.0.0.1:6379> PUBLISH b 22
(integer) 0 // 没有客户端处理了
客户端这边阻塞运行的脚本马上就显示出了内容。前面说过,在 redis-cli 中,UNSUBSCRIBE 没法用,但在 PHP 客户端这边是可以用的。我们在 PHP 中处理频道 c 的地方做了一个操作,那就是 UNSUBSCRIBE 掉对于频道 b 的订阅,之后再 PUBLISH b 的话客户端就不会再接收到任何消息了。
➜ learn-laravel git:(main) ✗ php redis-pubsub.php
object(Redis)#1 (0) {
}
a msg:1
object(Redis)#1 (0) {
}
b msg:2
object(Redis)#1 (0) {
}
c msg:3
总结
简单吗?这还不简单,总共加起来才几个命令,最核心的其实就三个。不过,要找应用场景才是最主要的,这里我就举一个最简单的场景。电商系统,当用户下订单之后,要干嘛?是不是要通知商家、要通知平台,还有可能还要给这个用户自己也发一条消息,同时一些后台脚本可能也要开始跟进处理这个订单。想想这个时候,有发布订阅这套机制,大家同时订阅监听一个订单频道,下订单成功后直接将订单信息发布到这个频道,然后不同的系统去做不同的后续操作就好啦!当然,这个功能用队列会更好一些,因为怕广播出现问题有的客户端没有操作,未订阅的消息就丢失了。而且它需要一直保持连接,需要单独占用一个 Redis 连接,因此,它在日常使用中其实并不常见,大家了解一下就好。如果有合适的应用场景(可以考虑的:聊天、消息推送、视频弹幕等),记得咱们 Redis 有这么个功能就好了。
视频链接
微信文章地址:https://mp.weixin.qq.com/s/npuMXiZdPM52ZPf5jmckPw