为zeromq提供持久性。
项目描述
persizmq
persizmq为zeromq提供持久性。消息在后台接收并在进一步处理之前存储在磁盘上。
目前,我们只支持zeromq订阅者。添加对其他类的支持可以很容易完成;我们只是到目前为止还没有需要它们。
用法
订阅者
持久订阅者包装了一个zeromq订阅者。我们将持久订阅拆分为两个组件:一个在后台监听消息的线程化订阅者和一个将消息存储在磁盘上的持久化组件。
线程化订阅者
线程化订阅者实现为persizmq.ThreadedSubscriber。您需要指定一个回调函数,该函数在接收到每条消息时被调用。
您还需要指定异常回调,以便在监听线程中处理抛出的异常。
示例
import time
import zmq
import persizmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")
def callback(msg: bytes)->None:
print("received a message: {}".format(msg))
def on_exception(exception: Exception)->None:
print("an exception was raised in the listening thread: {}".format(exception))
with persizmq.ThreadedSubscriber(callback=callback, subscriber=subscriber, on_exception=on_exception):
# do something while we are listening on messages...
time.sleep(10)
存储
我们为接收到的消息提供两种存储模式
persizmq.PersistentStorage:在磁盘上以FIFO队列的形式存储消息。
persizmq.PersistentLatestStorage:仅存储最新的消息在磁盘上。
存储组件直接作为回调传递给线程化订阅者。
示例
import pathlib
import zmq
import persizmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")
persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)
def on_exception(exception: Exception)->None:
print("an exception was raised in the listening thread: {}".format(exception))
with persizmq.ThreadedSubscriber(callback=storage.add_message, subscriber=subscriber, on_exception=on_exception):
msg = storage.front() # non-blocking
if msg is not None:
print("Received a persistent message: {}".format(msg))
storage.pop_front()
过滤
我们还提供可以链式添加到线程化订阅者的过滤组件。如果您只想持久化少量消息并忽略其余部分,过滤链尤其有用。
过滤器实现于persizmq.filter模块。
示例
import pathlib
import zmq
import persizmq
import persizmq.filter
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")
persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)
def on_exception(exception: Exception)->None:
print("an exception was raised in the listening thread: {}".format(exception))
with persizmq.ThreadedSubscriber(
lambda msg: storage.add_message(persizmq.filter.MaxSize(max_size=1000)(msg)),
subscriber=subscriber,
on_exception=on_exception):
msg = storage.front() # non-blocking
if msg is not None:
print("Received a persistent message: {}".format(msg))
storage.pop_front()
安装
创建一个虚拟环境
python3 -m venv venv3
激活它
source venv3/bin/activate
使用pip安装persizmq
pip3 install persizmq
开发
查看仓库。
在仓库根目录下创建虚拟环境
python3 -m venv venv3
激活虚拟环境
source venv3/bin/activate
安装开发依赖
pip3 install -e .[dev]
我们使用tox进行测试和打包发行版。假设虚拟环境已经激活且开发依赖已经安装,运行
tox
我们还提供了一套pre-commit检查,用于检查代码格式。在激活的虚拟环境中运行这些检查,其中包含开发依赖
./precommit.py
pre-commit脚本还可以自动格式化代码
./precommit.py --overwrite
版本控制
我们遵循语义版本控制。版本X.Y.Z表示
X是主版本(不向后兼容),
Y是次版本(向后兼容),
Z是补丁版本(向后兼容的错误修复)。
项目详情
关闭
persizmq-1.0.3.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 80967dfffd2a699a5cbf12bef481dde54e69ca7347e4c36f8ac04cd1421850fa |
|
MD5 | 75d8df058ed2138b0b7e01b305f8329b |
|
BLAKE2b-256 | 1426123c1bde346544ce70d0d28a1bd1fd823f10b5845ea2fabd0efbedffceaf |