跳转到主要内容

为Python跨机器边界实现IPC而构建的简单面向消息的中间件库

项目描述

PyMQ

Build Status PyPI Version PyPI - Downloads PyPI License Coverage Status Codestyle

PyMQ是一个简单的面向消息的中间件库,用于在Python中实现跨机器边界的IPC。API通过发布/订阅、队列和同步RPC实现不同的远程调用方式。

使用PyMQ,开发者可以以松散耦合的方式通过现有传输机制将运行在不同机器上的Python应用程序集成在一起。PyMQ目前提供Redis后端、使用SQS和SNS的AWS后端、用于单机IPC的POSIX IPC后端以及用于测试的内存后端。API是可扩展的,可以插入其他传输。

使用PyMQ

安装

从pymq 0.5开始,所有提供程序都分别作为setuptools extras。如果您想使用redis后端,则运行

pip install pymq[redis]

可用的提供程序

  • pymq[redis] Redis提供程序
  • pymq[aws] 使用SNS和SQS的AWS提供程序
  • pymq[ipc] Linux IPC提供程序
  • pymq[full] 安装所有提供程序

初始化PyMQ

核心模块管理一个全局事件总线实例,它提供了远程调用原语。默认的Redis实现使用一个事件循环来处理发布/订阅对象。全局事件总线通过pymq.init和一个提供程序工厂来初始化。

import pymq
from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

这将在本地Redis服务器之上创建一个事件总线实例。

发布/订阅

发布/订阅允许基于事件的异步通信。事件类用于传输状态和标识通道。

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

队列

队列是直接的,因为它们与Python queue.Queue规范兼容。

import pymq

queue = pymq.queue('my_queue') 
queue.put('obj')
print(queue.get()) # outputs 'obj'

RPC

服务器代码

import pymq

@pymq.remote('product_remote')
def product(a: int, b: int) -> int: # pymq relies on type hints for marshalling
    return a * b

客户端代码

import pymq

product = pymq.stub('product_remote')
product(2, 4) # 8

具有共享代码库,方法也可以通过传递可调用对象来公开和调用。例如,

import pymq

# common code
class Remote:
    def echo(self, param) -> None:
        return 'echo: ' + param

# server
obj = Remote()
pymq.expose(obj.echo)

# client
echo = pymq.stub(Remote.echo)
echo('pymq') # "echo: pymq"

如果有多个相同对象的提供者,则可以使用 multi=True 初始化存根以获取结果列表。在这种情况下使用超时可能很有用。

remote = pymq.stub('remote_method', multi=True, timeout=2)

result = remote() # result will be a list containing the results of all invocations of available remote objects

提供者

  • SimpleEventBus 用于测试和基本的单线程调度
  • RedisEventBus 可以跨越网络和进程边界,但需要运行中的 Redis 实例
  • AwsEventBus 使用 AWS 简单通知服务 (SNS) 和简单队列服务 (SQS)
  • IpcEventBus 使用 posix_ipc 消息队列作为事件循环,并在 /run/shm 中维护一个主题订阅树。在没有附加服务器组件的情况下,用于跨进程边界的事件很有用。

兼容性

Python 3.8+

已知限制

  • JSON 序列化严重依赖于类型提示。发送没有类型提示的复杂类型将导致类型错误。
  • 当前对 JSON 序列化不支持多态性
  • 基于模式的主题匹配对内存事件总线或 IPC 事件总线不起作用
  • 在使用 IPC 提供者时,您只能拥有有限数量的队列,因为内核限制了每个进程的文件描述符数量
  • 外部组件对 RPC 通道的订阅将在多调用场景中引起问题
  • 在多进程场景中,使用 pymq 单例可能不会按预期工作,因为该模块在全局变量中保持一个线程。一种解决方案是在分叉进程中通过调用 shutdown()init() 重新启动总线。
  • IPC 提供者仅适用于 Linux
  • 多 RPC 对 AWS 提供者不起作用
  • AWS 提供者不能自动删除所有队列和主题。

背景

最初是 Symmetry 项目的部分,后来被提取为一个独立的库。

项目详情


下载文件

下载您平台的文件。如果您不确定要选择哪个,请了解更多关于 安装包 的信息。

源代码分发

pymq-0.6.2.tar.gz (22.4 kB 查看散列)

上传时间

构建分发

pymq-0.6.2-py3-none-any.whl (23.8 kB 查看散列)

上传时间 Python 3

由以下支持