Postgres的LISTEN和NOTIFY功能的便捷Python接口。
项目描述
pgpubsub提供对PostgreSQL数据库中内置的事件通知系统的便捷访问。这提供了一个类似于Redis中的实时Pub/Sub系统。
用法
首先您需要建立一个连接
import pgpubsub pubsub = pgpubsub.connect(user='postgres', database='test')
pgpubsub.connect()函数接受的参数与psycopg2.connect()函数支持的参数相同。
发送事件
要发送一个事件,请使用pubsub.notify()方法
pubsub.notify('test_channel', 'some message')
接收事件
要接收事件,您必须首先使用pubsub.listen()方法订阅特定的频道
pubsub.listen('test')
您可以多次调用pubsub.listen()来接收来自多个频道的事件
pubsub.listen('chan1') pubsub.listen('chan2')
注意:频道名称必须是有效的SQL标识符。以下摘自Postgres文档:
SQL标识符和关键字必须以字母(a-z,但还包括带重音符号的字母和非拉丁字母)或下划线(_)开头。标识符或关键字后面的字符可以是字母、下划线、数字(0-9)或美元符号($)。
警告
由于频道是SQL标识符而不是字符串,它们不能像字符串那样被引号/转义。从不受信任的用户输入构建频道名称是不安全的。
不要这样做
channel = 'events_' + username pubsub.listen(channel)
如果您这样做,那么整个数据库可能会被用户名为“; DROP TABLE users;”的人破坏。强制XKCD。
一旦您已订阅了一个或多个频道,您可以选择通过迭代pubsub.events()或通过重复调用pubsub.get_event()方法来接收事件。
pubsub.events()
这是一个对pubsub上来的事件流的生成器。它允许您像列表一样遍历事件
for e in pubsub.events(): print e.payload
在幕后,pubsub在等待标准库的
select_timeout: 在放弃并再次尝试之前等待select.select的秒数。默认为5。
yield_timeouts: 默认为False。如果设置为True,则pubsub.events()将在等待select_timeout秒后发送一个None。这在WebSockets等需要发送keepalive消息,即使没有接收到新数据也很有用。
for e in pubsub.events(yield_timeouts=True): if e is None: send_websocket_ping() else: send_websocket_message(e.payload)
pubsub.get_event()
该方法始终立即返回。如果已接收到事件,则返回该事件。如果没有接收到事件,则返回None。
如果已接收到多个事件并排队等待,则重复的get_event()调用将一直返回下一个事件,直到没有事件为止并返回None。
>>> pubsub.listen('test') >>> pubsub.get_event() # Nothing delivered yet, so returns None >>> pubsub.notify('test', 'message 1') >>> pubsub.notify('test', 'message 2') >>> pubsub.get_event() Notify(9425, 'test', 'message 1') >>> pubsub.get_event() Notify(9425, 'test', 'message 2') >>> pubsub.get_event() # No more messages, so returns None
pubsub.get_event()方法旨在集成到事件循环中,如果在pubsub.events()上阻塞会导致问题。
取消订阅
如果您想停止接收您当前订阅的某个通道上的事件,可以调用pubsub.unlisten()。
pubsub.unlisten('channel2')
事件对象
pubsub.events()和pubsub.get_event()返回的事件对象是psycopg2的Notify类的实例。它们有三个可能有趣的属性
payload: 包含实际消息的字符串。
channel: 接收到事件的通道的名称。
pid: 在Postgres服务器上处理发送者连接的进程的pid。这可以防止在同时发送和接收事件的程序中产生无限循环。
my_pid = pubsub.conn.get_backend_pid() pubsub.listen('echo') for e in pubsub.events(): sender_pid = e.pid if sender_pid != my_pid: pubsub.notify('echo', e.payload)
Q & A
在线程之间传递pubsub对象是否安全?
不。
为什么使用动词“notify”和“listen”而不是“publish”和“subscribe”?
pgpubsub中的方法旨在尽可能类似于Postgres中的实际SQL命令,即NOTIFY和LISTEN。Postgres文档也提到“通知事件”而不是“消息”,因此pgpubsub使用相同的术语。
为什么没有回调式接口?
如果有需求并且有一个合理的规范,将来可能会有。
项目详情
pgpubsub-0.0.5.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 7f34ba700f2802537564a5329f20723772b99dfbf28d94856228b21e0af3c9a6 |
|
MD5 | 8a1e57fdcd50cd17fa970a921a057cc6 |
|
BLAKE2b-256 | ba80b36c9275c7197fa66733761a411f66d92086d524acbab08f56858f561fa8 |