场景

  • 不同进程间的指令传递
  • 不同编程语言间的消息传递;如 PHPPython 之间的服务互调

原理

  • 利用 Redisblpop 的阻塞特性等待消息然后消费
  • 用唯一的 SessionID 去标识某次消息,以应对并发场景

Python 版简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import redis
import uuid


class FCMessenger:

__redisDB = None
__sessionID = None

def __init__(self, host, port):
self.__redisDB = redis.StrictRedis(host=host, port=port, db=0)

def __send_api(self, api):
return 'fc:messenger:api:%s' % api

def __generate_session_id(self):
return 'fc:messenger:session:id:%s' % uuid.uuid1()

def __inner_wait_for_message(self, api, timeout=10):
item = self.__redisDB.blpop([api], timeout=timeout)
if item:
return item[1].decode()
return None

def wait_for_message(self, api, timeout=10, receipt=True):
uid = self.__inner_wait_for_message(self.__send_api(api), timeout)
content = self.__redisDB.get(uid)
if content:
content = content.decode()

self.__sessionID = uid

if receipt:
self.__redisDB.rpush(self.__send_api(self.__sessionID), 'OK')

return content

def send_receipt(self, content):
if self.__sessionID:
self.__redisDB.rpush(self.__send_api(self.__sessionID), content)
self.__sessionID = None

def send_message(self, api, content, waiting=True, timeout=10):
session_id = self.__generate_session_id()
self.__redisDB.set(session_id, content)
self.__redisDB.expire(session_id, 60)
self.__redisDB.rpush(self.__send_api(api), session_id)
if not waiting:
return
return self.__inner_wait_for_message(self.__send_api(session_id), timeout=timeout)

使用

1
2
3
4
5
6
# 接收者
while True:
messenger = FCMessenger(MyConfig.MSG_HOST, MyConfig.MSG_PORT)
msg = messenger.wait_for_message('test', 0, receipt=False)
# ...
messenger.send_receipt('Accepted')
1
2
3
# 发送者
messenger = FCMessenger(MyConfig.MSG_HOST, MyConfig.MSG_PORT)
content = messenger.send_message('test', 'xxxx')

Tips

  • 接收者通常为一个循环队列
  • 发送者发送消息后将充当一个临时的监听者等待回复
  • 对于一些耗时任务,不妨建立多个接收者监听并处理任务
  • 根据实际场景,选择收到消息后立即回显或自定义回显