用于ZMQ通信的包装器。
项目描述
ZMQ Tubes
ZMQ Tubes是一个ZMQ通信的管理系统。它可以通过一个接口管理多个ZMQ套接字。整个系统是分层的,基于主题(参见MQTT主题)。
类
- TubeMessage - 此类表示请求/响应消息。一些类型的管道需要以这种格式响应。
- Tube - 此类包装ZMQ套接字。它代表客户端和服务器之间的连接。
- TubeMonitor - 该类可以嗅探ZMQTube通信。
- TubeNode - 这代表通过管道进行通信的应用程序接口。
Asyncio / Threading
该库支持异步方法。Python 3.7的Asyncio。
from zmq_tubes import TubeNode, Tube # Asyncio classes
from zmq_tubes.threads import TubeNode, Tube # Threads classes
用法
yaml文件中的节点定义
我们可以通过yaml文件定义一个TubeNode的所有管道。以下示例需要安装这些包 PyYAML
,pyzmq
和 zmq_tubes
。
客户端服务(asyncio示例)
# client.yml
tubes:
- name: Client REQ
addr: ipc:///tmp/req.pipe
tube_type: REQ
topics:
- foo/bar
- name: Client PUB
addr: ipc:///tmp/pub.pipe
tube_type: PUB
topics:
- foo/pub/#
# client.py
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage
async def run():
with open('client.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
async with node:
print(await node.request('foo/bar', 'message 1'))
await node.publish('foo/pub/test', 'message 2')
if __name__ == "__main__":
asyncio.run(run())
> python client.py
topic: foo/bar, payload: response
服务器服务(线程示例)
# server.yml
tubes:
- name: server ROUTER
addr: ipc:///tmp/req.pipe
tube_type: ROUTER
server: True
topics:
- foo/bar
- name: server SUB
addr: ipc:///tmp/pub.pipe
tube_type: SUB
server: True
topics:
- foo/pub/#
# server.py
import yaml
from zmq_tubes.threads import TubeNode, TubeMessage
def handler(request: TubeMessage):
print(request.payload)
if request.tube.tube_type_name == 'ROUTER':
return request.create_response('response')
def run():
with open('server.yml', 'r+') as fd:
schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
node.register_handler('foo/#', handler)
with node:
node.start().join()
if __name__ == "__main__":
run()
> python server.py
message 1
message 2
YAML定义
yaml文件以根元素 tubes
开始,其中包含所有管道定义的列表。
name
- 字符串 - 管道名称。addr
- 字符串 - 格式为transport://address
的连接或绑定地址(参见更多https://libzmq.zeromq.cn/2-1:zmq-connect)server
- 布尔型 - 这个管子是位于服务器端(绑定到addr
)还是客户端(连接到addr
)tube_type
- 字符串 - 这个管子的类型(更多内容请见 https://zguide.zeromq.cn/docs/chapter2/#Messaging-Patterns)identity
- 字符串 - (可选)我们可以设置自定义的管子标识utf8_decoding
- 布尔型 - (默认 = True),如果设置为 True,则自动对负载进行 UTF8 解码。sockopts
- 字典 - (可选)我们可以为这个管子设置 sockopts(更多内容请见 https://libzmq.zeromq.cn/4-2:zmq-setsockopt)monitor
- 字符串 - (可选)管子监控的绑定地址(更多内容请见 调试 / 监控)
请求 / 响应
这是一个简单场景,服务器顺序处理请求。
服务器
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
return 'answer'
# or return request.create_response('response')
tube = Tube(
name='Server',
addr='ipc:///tmp/req_resp.pipe',
server=True,
tube_type='REP'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'question'
客户端
from zmq_tubes import Tube, TubeNode
tube = Tube(
name='Client',
addr='ipc:///tmp/req_resp.pipe',
tube_type='REQ'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'
方法 request
接受可选参数 utf8_decoding
。在之前的示例中,当我们将此参数设置为 False
时,返回的负载不会自动解码,我们得到字节。
订阅 / 发布者
服务器
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
tube = Tube(
name='Server',
addr='ipc:///tmp/sub_pub.pipe',
server=True,
tube_type='SUB'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'
客户端
from zmq_tubes import Tube, TubeNode
tube = Tube(
name='Client',
addr='ipc:///tmp/sub_pub.pipe',
tube_type='PUB'
)
# In the case of publishing, the first message is very often
# lost. The workaround is to connect the tube manually as soon as possible.
tube.connect()
node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')
请求 / 路由器
服务器是异步的。这意味着它可以同时处理多个请求。
服务器
import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
if request.payload == 'wait':
await asyncio.sleep(10)
return request.create_response(request.payload)
tube = Tube(
name='Server',
addr='ipc:///tmp/req_router.pipe',
server=True,
tube_type='ROUTER'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'
客户端
import asyncio
from zmq_tubes import Tube, TubeNode
tube = Tube(
name='Client',
addr='ipc:///tmp/req_router.pipe',
tube_type='REQ'
)
async def task(node, text):
print(await node.request('test/xxx', text))
node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'
经销商 / 响应
客户端是异步的。这意味着它可以同时发送多个请求。
服务器
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
return 'response'
# or return requset.create_response('response')
tube = Tube(
name='Server',
addr='ipc:///tmp/dealer_resp.pipe',
server=True,
tube_type='REP'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'
客户端
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Client',
addr='ipc:///tmp/dealer_resp.pipe',
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'message')
# output: 'response'
经销商 / 路由器
客户端和服务器都是异步的。这意味着它可以同时发送和接收更多请求/响应。
服务器
import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage
async def handler(request: TubeMessage):
print(request.payload)
if request.payload == 'wait':
await asyncio.sleep(10)
return request.create_response(request.payload)
tube = Tube(
name='Server',
addr='ipc:///tmp/dealer_router.pipe',
server=True,
tube_type='ROUTER'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'
客户端
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Client',
addr='ipc:///tmp/dealer_router.pipe',
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'wait')
await node.send('test/xxx', 'message')
# output: 'message'
# output: 'wait'
经销商 / 经销商
客户端和服务器都是异步的。这意味着它可以同时发送和接收更多请求/响应。
服务器
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Server',
addr='ipc:///tmp/dealer_dealer.pipe',
server=True,
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'message from server')
# output: 'message from client'
客户端
from zmq_tubes import Tube, TubeNode, TubeMessage
tube = Tube(
name='Client',
addr='ipc:///tmp/dealer_dealer.pipe',
tube_type='DEALER'
)
async def handler(response: TubeMessage):
print(response.payload)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.send('test/xxx', 'message from client')
# output: 'message from server'
调试 / 监控
我们可以为我们的 zmq 管子分配一个监控套接字。通过这个监控套接字,我们可以嗅探 zmq 通信或获取 zmq 管子的配置。
tubes:
- name: ServerRouter
addr: ipc:///tmp/router.pipe
monitor: ipc:///tmp/test.monitor
tube_type: ROUTER
server: yes
topics:
- foo/#
这是一个 yaml 定义示例。我们可以为同一 tubeNode 中的更多管子使用相同的监控套接字。当我们将监控属性添加到我们的管子定义中时,应用程序会自动创建一个新的套接字监控器:/tmp/test.monitor
。您的应用程序作为服务器端运行。日志仅在监控工具运行时发送到套接字。
监控工具
在应用程序中启用监控后,我们可以使用监控工具进行嗅探。
# get the server tube configuration
> zmqtube-monitor get_schema ipc:///tmp/display.monitor
tubes:
- addr: ipc:///tmp/router.pipe
monitor: ipc:///tmp/test.monitor
name: ServerRouter
server: 'yes'
tube_type: ROUTER
# the log tube communication. Logs will be saved to dump.rec as well.
> zmqtube-monitor logs -d ./dump.rec ipc:///tmp/display.monitor
0.28026580810546875 ServerRouter < foo/test Request
0.0901789665222168 ServerRouter > foo/test Response
# The format of output
# <relative time> <tube name> <direction> <topic> <message>`
客户端仿真
当我们有一个转储文件(例如 dump.rec
)时,我们可以模拟与我们的应用程序的通信。第一步是准备模拟客户端架构文件。为此,我们可以从我们的应用程序中获取管子节点配置,然后进行编辑。
> zmqtube-monitor get_schema ipc:///tmp/display.monitor > mock_schema.yaml
> vim mock_schema.yaml
...
# Now, we have to update the file mock_schema.yaml.
# We change configuration to the mock client configuration.
# The names of the tubes must be the same as are in your app.
# We can remove monitoring attribute and change server and
# tube_type attributes. In this mock file, the topics are not
# required, they are ignored.
> cat mock_schema.yaml
tubes:
- addr: ipc:///tmp/router.pipe
name: ServerRouter
tube_type: REQ
现在,我们可以开始模拟客户端通信。
> zmqtube-monitor simulate mock_schema.yaml dump.rec
如果我们的应用程序的响应与工具预期的不同(存储在转储文件中的响应),那么监控工具会警告我们。
我们可以通过参数 --speed
修改仿真速度。
在默认配置中,仿真以与原始通信相同的速度运行(参数 --speed=1
)。
速度 | 描述 |
---|---|
0 | 非阻塞仿真 |
0.5 | 比原始速度快两倍 |
1 | 原始速度 |
2 | 比原始速度慢两倍 |
监控编程声明的示例。
import zmq
from zmq_tubes.threads import Tube, TubeNode, TubeMessage, TubeMonitor
def handler(request: TubeMessage):
print(request.payload)
return request.create_response('response')
resp_tube = Tube(
name='REP',
addr='ipc:///tmp/rep.pipe',
server='yes',
tube_type=zmq.REP
)
req_tube = Tube(
name='REQ',
addr='ipc:///tmp/rep.pipe',
tube_type=zmq.REQ
)
node = TubeNode()
node.register_tube(resp_tube, f"foo/#")
node.register_tube(req_tube, f"foo/#")
node.register_handler(f"foo/#", handler)
node.register_monitor(resp_tube, TubeMonitor(addr='ipc:///tmp/test.monitor'))
with node:
print(node.request('foo/xxx', 'message 2'))
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。