跳转到主要内容

为zeromq提供持久性。

项目描述

persizmq

persizmq为zeromq提供持久性。消息在后台接收并在进一步处理之前存储在磁盘上。

目前,我们只支持zeromq订阅者。添加对其他类的支持可以很容易完成;我们只是到目前为止还没有需要它们。

用法

订阅者

持久订阅者包装了一个zeromq订阅者。我们将持久订阅拆分为两个组件:一个在后台监听消息的线程化订阅者和一个将消息存储在磁盘上的持久化组件。

线程化订阅者

线程化订阅者实现为persizmq.ThreadedSubscriber。您需要指定一个回调函数,该函数在接收到每条消息时被调用。

您还需要指定异常回调,以便在监听线程中处理抛出的异常。

示例

import time

import zmq

import persizmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

def callback(msg: bytes)->None:
    print("received a message: {}".format(msg))

def on_exception(exception: Exception)->None:
    print("an exception was raised in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(callback=callback, subscriber=subscriber, on_exception=on_exception):
    # do something while we are listening on messages...
    time.sleep(10)

存储

我们为接收到的消息提供两种存储模式

  1. persizmq.PersistentStorage:在磁盘上以FIFO队列的形式存储消息。

  2. persizmq.PersistentLatestStorage:仅存储最新的消息在磁盘上。

存储组件直接作为回调传递给线程化订阅者。

示例

import pathlib

import zmq

import persizmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)

def on_exception(exception: Exception)->None:
    print("an exception was raised in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(callback=storage.add_message, subscriber=subscriber, on_exception=on_exception):
    msg = storage.front()  # non-blocking
    if msg is not None:
        print("Received a persistent message: {}".format(msg))
        storage.pop_front()

过滤

我们还提供可以链式添加到线程化订阅者的过滤组件。如果您只想持久化少量消息并忽略其余部分,过滤链尤其有用。

过滤器实现于persizmq.filter模块。

示例

import pathlib

import zmq

import persizmq
import persizmq.filter

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)

def on_exception(exception: Exception)->None:
    print("an exception was raised in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(
    lambda msg: storage.add_message(persizmq.filter.MaxSize(max_size=1000)(msg)),
    subscriber=subscriber,
    on_exception=on_exception):

    msg = storage.front()  # non-blocking
    if msg is not None:
        print("Received a persistent message: {}".format(msg))
        storage.pop_front()

安装

  • 创建一个虚拟环境

python3 -m venv venv3
  • 激活它

source venv3/bin/activate
  • 使用pip安装persizmq

pip3 install persizmq

开发

  • 查看仓库。

  • 在仓库根目录下创建虚拟环境

python3 -m venv venv3
  • 激活虚拟环境

source venv3/bin/activate
  • 安装开发依赖

pip3 install -e .[dev]
  • 我们使用tox进行测试和打包发行版。假设虚拟环境已经激活且开发依赖已经安装,运行

tox
  • 我们还提供了一套pre-commit检查,用于检查代码格式。在激活的虚拟环境中运行这些检查,其中包含开发依赖

./precommit.py
  • pre-commit脚本还可以自动格式化代码

./precommit.py  --overwrite

版本控制

我们遵循语义版本控制。版本X.Y.Z表示

  • X是主版本(不向后兼容),

  • Y是次版本(向后兼容),

  • Z是补丁版本(向后兼容的错误修复)。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源发行版

persizmq-1.0.3.tar.gz (6.7 kB 查看哈希值)

由以下机构支持