redis发布与订阅

redis可以发布与订阅频道,有两种订阅方式,一种是具体频道,一种是模式匹配频道。
对于具体频道,redis采用了字典来保存具体频道与订阅客户端的数据,key为频道,value为订阅的客户端,采用链表的形式保存。对于模式频道,redis直接采用链表的方式来保存模式客户端。每当有消息发布,redis根据消息所属的频道来通过字典找到客户端链表,遍历链表,发送消息。接着遍历模式链表,查找匹配的模式频道,发送消息给客户端。
具体的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/* Publish a message
*
* 将 message 发送到所有订阅频道 channel 的客户端,
* 以及所有订阅了和 channel 频道匹配的模式的客户端。
*/

int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;

/* Send to clients listening for that channel */
// 取出包含所有订阅频道 channel 的客户端的链表
// 并将消息发送给它们
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;

// 遍历客户端链表,将 message 发送给它们
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;

// 回复客户端。
// 示例:
// 1) "message"
// 2) "xxx"
// 3) "hello"
addReply(c,shared.mbulkhdr[3]);
// "message" 字符串
addReply(c,shared.messagebulk);
// 消息的来源频道
addReplyBulk(c,channel);
// 消息内容
addReplyBulk(c,message);

// 接收客户端计数
receivers++;
}
}

/* Send to clients listening to matching channels */
// 将消息也发送给那些和频道匹配的模式
if (listLength(server.pubsub_patterns)) {

// 遍历模式链表
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {

// 取出 pubsubPattern
pubsubPattern *pat = ln->value;

// 如果 channel 和 pattern 匹配
// 就给所有订阅该 pattern 的客户端发送消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {

// 回复客户端
// 示例:
// 1) "pmessage"
// 2) "*"
// 3) "xxx"
// 4) "hello"
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);

// 对接收消息的客户端进行计数
receivers++;
}
}

decrRefCount(channel);
}

// 返回计数
return receivers;
}