跳转到主要内容

Postgres的LISTEN和NOTIFY功能的便捷Python接口。

项目描述

pgpubsub提供对PostgreSQL数据库中内置的事件通知系统的便捷访问。这提供了一个类似于Redis中的实时Pub/Sub系统。

用法

首先您需要建立一个连接

import pgpubsub
pubsub = pgpubsub.connect(user='postgres', database='test')

pgpubsub.connect()函数接受的参数与psycopg2.connect()函数支持的参数相同。

发送事件

要发送一个事件,请使用pubsub.notify()方法

pubsub.notify('test_channel', 'some message')

接收事件

要接收事件,您必须首先使用pubsub.listen()方法订阅特定的频道

pubsub.listen('test')

您可以多次调用pubsub.listen()来接收来自多个频道的事件

pubsub.listen('chan1')
pubsub.listen('chan2')

注意:频道名称必须是有效的SQL标识符。以下摘自Postgres文档:

SQL标识符和关键字必须以字母(a-z,但还包括带重音符号的字母和非拉丁字母)或下划线(_)开头。标识符或关键字后面的字符可以是字母、下划线、数字(0-9)或美元符号($)。

警告

由于频道是SQL标识符而不是字符串,它们不能像字符串那样被引号/转义。从不受信任的用户输入构建频道名称是不安全的。

不要这样做

channel = 'events_' + username
pubsub.listen(channel)

如果您这样做,那么整个数据库可能会被用户名为“; DROP TABLE users;”的人破坏。强制XKCD

一旦您已订阅了一个或多个频道,您可以选择通过迭代pubsub.events()或通过重复调用pubsub.get_event()方法来接收事件。

pubsub.events()

这是一个对pubsub上来的事件流的生成器。它允许您像列表一样遍历事件

for e in pubsub.events():
    print e.payload

在幕后,pubsub在等待标准库的函数时是阻塞的。您可以为pubsub.events()提供两个额外的参数来控制等待select.select时的超时处理

  • select_timeout: 在放弃并再次尝试之前等待select.select的秒数。默认为5。

  • yield_timeouts: 默认为False。如果设置为True,则pubsub.events()将在等待select_timeout秒后发送一个None。这在WebSockets等需要发送keepalive消息,即使没有接收到新数据也很有用。

    for e in pubsub.events(yield_timeouts=True):
        if e is None:
            send_websocket_ping()
        else:
            send_websocket_message(e.payload)

pubsub.get_event()

该方法始终立即返回。如果已接收到事件,则返回该事件。如果没有接收到事件,则返回None。

如果已接收到多个事件并排队等待,则重复的get_event()调用将一直返回下一个事件,直到没有事件为止并返回None。

>>> pubsub.listen('test')
>>> pubsub.get_event() # Nothing delivered yet, so returns None
>>> pubsub.notify('test', 'message 1')
>>> pubsub.notify('test', 'message 2')
>>> pubsub.get_event()
Notify(9425, 'test', 'message 1')
>>> pubsub.get_event()
Notify(9425, 'test', 'message 2')
>>> pubsub.get_event() # No more messages, so returns None

pubsub.get_event()方法旨在集成到事件循环中,如果在pubsub.events()上阻塞会导致问题。

取消订阅

如果您想停止接收您当前订阅的某个通道上的事件,可以调用pubsub.unlisten()。

pubsub.unlisten('channel2')

事件对象

pubsub.events()和pubsub.get_event()返回的事件对象是psycopg2的Notify类的实例。它们有三个可能有趣的属性

  • payload: 包含实际消息的字符串。

  • channel: 接收到事件的通道的名称。

  • pid: 在Postgres服务器上处理发送者连接的进程的pid。这可以防止在同时发送和接收事件的程序中产生无限循环。

    my_pid = pubsub.conn.get_backend_pid()
    pubsub.listen('echo')
    for e in pubsub.events():
        sender_pid = e.pid
        if sender_pid != my_pid:
            pubsub.notify('echo', e.payload)

Q & A

在线程之间传递pubsub对象是否安全?

不。

为什么使用动词“notify”和“listen”而不是“publish”和“subscribe”?

pgpubsub中的方法旨在尽可能类似于Postgres中的实际SQL命令,即NOTIFYLISTEN。Postgres文档也提到“通知事件”而不是“消息”,因此pgpubsub使用相同的术语。

为什么没有回调式接口?

如果有需求并且有一个合理的规范,将来可能会有。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

pgpubsub-0.0.5.tar.gz (5.2 kB 查看哈希值)

上传时间

支持者:

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面