ZeroMQ与asyncio的集成。
项目描述
asyncio (PEP 3156) 对ZeroMQ的支持。
与普通 pyzmq (zmq.asyncio) 相比,aiozmq 的不同之处在于
zmq.asyncio 只是通过替换 基础事件循环 为自定义循环来工作。这种方法可行,但有两个缺点
zmq.asyncio.ZMQEventLoop 不能与其他循环实现(最著名的是超快的 uvloop)结合使用。
它使用内部ZMQ Poller,具有快速的ZMQ套接字支持,但并不打算与许多(数千个)常规TCP套接字快速工作。
实际上这意味着不建议使用 zmq.asyncio 与 aiohttp 等网络服务器。
文档
参见 http://aiozmq.readthedocs.org
简单的客户端-服务器RPC示例
import asyncio
import aiozmq.rpc
class ServerHandler(aiozmq.rpc.AttrHandler):
@aiozmq.rpc.method
def remote_func(self, a:int, b:int) -> int:
return a + b
async def go():
server = await aiozmq.rpc.serve_rpc(
ServerHandler(), bind='tcp://127.0.0.1:5555')
client = await aiozmq.rpc.connect_rpc(
connect='tcp://127.0.0.1:5555')
ret = await client.call.remote_func(1, 2)
assert 3 == ret
server.close()
client.close()
asyncio.run(go())
低级请求/回复示例
import asyncio
import aiozmq
import zmq
async def go():
router = await aiozmq.create_zmq_stream(
zmq.ROUTER,
bind='tcp://127.0.0.1:*')
addr = list(router.transport.bindings())[0]
dealer = await aiozmq.create_zmq_stream(
zmq.DEALER,
connect=addr)
for i in range(10):
msg = (b'data', b'ask', str(i).encode('utf-8'))
dealer.write(msg)
data = await router.read()
router.write(data)
answer = await dealer.read()
print(answer)
dealer.close()
router.close()
asyncio.run(go())
与pyzmq的比较
zmq.asyncio 提供了一个与 asyncio 兼容的循环 实现。
但它基于 zmq.Poller,与大量非ZMQ套接字的使用配合不佳。
例如,如果您构建一个至少可以处理数千个并发网络请求(1000-5000)的Web服务器,pyzmq的内部轮询器将会很慢。
aiozmq 与 epoll 原生兼容,无需自定义循环实现,并且与例如 uvloop 等合作得很好。
需求
许可证
aiozmq 在BSD许可证下提供。
变更记录
1.0.0 (2022-11-02)
支持Python 3.9、3.10和3.11(部分归功于Esben Sonne)
取消对Python 3.5的支持
移除对使用注解作为转换函数的支持
0.9.0 (2020-01-25)
支持Python 3.7和3.8
0.8.0 (2016-12-07)
在创建zmq流时尊重 events_backlog 参数 #86
0.7.1 (2015-09-20)
修复监控事件实现
使库与Python 3.5兼容
0.7.0 (2015-07-31)
实现监控ZMQ事件 #50
对继承的类进行更深入的查找 #54
放宽端点检查 #56
在流API中实现监控事件 #52
0.6.1 (2015-05-19)
动态获取pyzmq套接字类型的列表
0.6.0 (2015-02-14)
将asyncio特定异常作为内置异常处理。
如果有的话,在rpc服务器调用日志中添加repr(exception)
添加transport.get_write_buffer_limits()方法
为transport添加__repr__
为tr.get_extra_info()添加zmq_type
添加zmq流
0.5.2 (2014-10-09)
在发送zmq消息后轮询事件,用于无事件传输
0.5.1 (2014-09-27)
修复无循环传输实现。
0.5.0 (2014-08-23)
在aiozmq.rpc.serve_rpc()中支持zmq设备
添加无循环0MQ传输
0.4.1 (2014-07-03)
为rpc服务器添加exclude_log_exceptions参数
0.4.0 (2014-05-28)
在ZmqTransport中实现pause_reading/resume_reading方法
0.3.0 (2014-05-17)
添加对Windows的有限支持。
修复不稳定的测试执行,将ZmqEventLoop默认更改为使用全局共享的zmq.Context。
在rpc服务器和客户端上处理取消。
0.2.0 (2014-04-18)
msg in msg_received现在是一个列表,而不是元组
允许通过transport.write()发送空msg
添加基准测试
从aiozmq.rpc.Error派生ServiceClosedError,而不是Exception
在服务器端实现远程调用日志记录(log_exceptions参数)。
在ZmqTransport中优化字节计数。
0.1.3 (2014-04-10)
函数默认值不传递给注解。添加对libzmq版本的检查(应该是>=3.0)
0.1.2 (2014-04-01)
函数默认值不传递给注解。
0.1.1 (2014-03-31)
将复数模块名称重命名为单数。
0.1.0 (2014-03-30)
实现具有 create_zmq_connection 方法的ZmqEventLoop,该方法在zmq传输和协议上操作。
实现ZmqEventLoopPolicy。
引入ZmqTransport和ZmqProtocol。
实现zmq.rpc,具有RPC、PUSHPULL和PUBSUB协议。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。