跳转到主要内容

用于ZMQ通信的包装器。

项目描述

PyPI PyPI - Python Version License

ZMQ Tubes

ZMQ Tubes是一个ZMQ通信的管理系统。它可以通过一个接口管理多个ZMQ套接字。整个系统是分层的,基于主题(参见MQTT主题)。

  • TubeMessage - 此类表示请求/响应消息。一些类型的管道需要以这种格式响应。
  • Tube - 此类包装ZMQ套接字。它代表客户端和服务器之间的连接。
  • TubeMonitor - 该类可以嗅探ZMQTube通信。
  • TubeNode - 这代表通过管道进行通信的应用程序接口。

Asyncio / Threading

该库支持异步方法。Python 3.7的Asyncio。

from zmq_tubes import TubeNode, Tube            # Asyncio classes
from zmq_tubes.threads import TubeNode, Tube    # Threads classes

用法

yaml文件中的节点定义

我们可以通过yaml文件定义一个TubeNode的所有管道。以下示例需要安装这些包 PyYAMLpyzmqzmq_tubes

客户端服务(asyncio示例)

# client.yml
tubes:
  - name: Client REQ
    addr:  ipc:///tmp/req.pipe
    tube_type: REQ
    topics:
      - foo/bar
  
  - name: Client PUB
    addr:  ipc:///tmp/pub.pipe
    tube_type: PUB
    topics:
      - foo/pub/#
# client.py
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def run():
  with open('client.yml', 'r+') as fd:
    schema = yaml.safe_load(fd)
  node = TubeNode(schema=schema)  
  async with node:
      print(await node.request('foo/bar', 'message 1'))
      await node.publish('foo/pub/test', 'message 2')

if __name__ == "__main__":
    asyncio.run(run())
> python client.py
topic: foo/bar,  payload: response

服务器服务(线程示例)

# server.yml
tubes:
  - name: server ROUTER
    addr:  ipc:///tmp/req.pipe
    tube_type: ROUTER
    server: True
    topics:
      - foo/bar
  
  - name: server SUB
    addr:  ipc:///tmp/pub.pipe
    tube_type: SUB
    server: True
    topics:
      - foo/pub/#
# server.py
import yaml
from zmq_tubes.threads import TubeNode, TubeMessage


def handler(request: TubeMessage):
  print(request.payload)
  if request.tube.tube_type_name == 'ROUTER': 
    return request.create_response('response')


def run():
  with open('server.yml', 'r+') as fd:
    schema = yaml.safe_load(fd)
  node = TubeNode(schema=schema)
  node.register_handler('foo/#', handler)
  with node:
    node.start().join()

if __name__ == "__main__":
    run()
> python server.py
message 1
message 2

YAML定义

yaml文件以根元素 tubes 开始,其中包含所有管道定义的列表。

  • name - 字符串 - 管道名称。
  • addr - 字符串 - 格式为 transport://address 的连接或绑定地址(参见更多https://libzmq.zeromq.cn/2-1:zmq-connect
  • server - 布尔型 - 这个管子是位于服务器端(绑定到 addr)还是客户端(连接到 addr
  • tube_type - 字符串 - 这个管子的类型(更多内容请见 https://zguide.zeromq.cn/docs/chapter2/#Messaging-Patterns
  • identity - 字符串 - (可选)我们可以设置自定义的管子标识
  • utf8_decoding - 布尔型 - (默认 = True),如果设置为 True,则自动对负载进行 UTF8 解码。
  • sockopts - 字典 - (可选)我们可以为这个管子设置 sockopts(更多内容请见 https://libzmq.zeromq.cn/4-2:zmq-setsockopt
  • monitor - 字符串 - (可选)管子监控的绑定地址(更多内容请见 调试 / 监控

请求 / 响应

这是一个简单场景,服务器顺序处理请求。

服务器

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'answer'
  # or return request.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()

# output: 'question'

客户端

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_resp.pipe',
  tube_type='REQ'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'

方法 request 接受可选参数 utf8_decoding。在之前的示例中,当我们将此参数设置为 False 时,返回的负载不会自动解码,我们得到字节。

订阅 / 发布者

服务器

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/sub_pub.pipe',
  server=True,
  tube_type='SUB'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

客户端

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/sub_pub.pipe',
  tube_type='PUB'
)
# In the case of publishing, the first message is very often
# lost. The workaround is to connect the tube manually as soon as possible.
tube.connect()

node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')        

请求 / 路由器

服务器是异步的。这意味着它可以同时处理多个请求。

服务器

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

客户端

import asyncio
from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_router.pipe',
  tube_type='REQ'
)


async def task(node, text):
  print(await node.request('test/xxx', text))


node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'

经销商 / 响应

客户端是异步的。这意味着它可以同时发送多个请求。

服务器

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'response'
  # or return requset.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

客户端

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_resp.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

await node.send('test/xxx', 'message')

# output: 'response'

经销商 / 路由器

客户端和服务器都是异步的。这意味着它可以同时发送和接收更多请求/响应。

服务器

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

客户端

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_router.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

await node.send('test/xxx', 'wait')
await node.send('test/xxx', 'message')

# output: 'message'
# output: 'wait'

经销商 / 经销商

客户端和服务器都是异步的。这意味着它可以同时发送和接收更多请求/响应。

服务器

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_dealer.pipe',
  server=True,
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

await node.send('test/xxx', 'message from server')
# output: 'message from client'

客户端

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_dealer.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

await node.send('test/xxx', 'message from client')
# output: 'message from server'

调试 / 监控

我们可以为我们的 zmq 管子分配一个监控套接字。通过这个监控套接字,我们可以嗅探 zmq 通信或获取 zmq 管子的配置。

tubes:
  - name: ServerRouter
    addr:  ipc:///tmp/router.pipe
    monitor: ipc:///tmp/test.monitor 
    tube_type: ROUTER
    server: yes    
    topics:
      - foo/#       

这是一个 yaml 定义示例。我们可以为同一 tubeNode 中的更多管子使用相同的监控套接字。当我们将监控属性添加到我们的管子定义中时,应用程序会自动创建一个新的套接字监控器:/tmp/test.monitor。您的应用程序作为服务器端运行。日志仅在监控工具运行时发送到套接字。

监控工具

在应用程序中启用监控后,我们可以使用监控工具进行嗅探。

# get the server tube configuration
> zmqtube-monitor get_schema ipc:///tmp/display.monitor
    tubes:
      - addr: ipc:///tmp/router.pipe
        monitor: ipc:///tmp/test.monitor 
        name: ServerRouter
        server: 'yes'
        tube_type: ROUTER

# the log tube communication. Logs will be saved to dump.rec as well. 
> zmqtube-monitor logs -d ./dump.rec ipc:///tmp/display.monitor
 0.28026580810546875 ServerRouter < foo/test Request
 0.0901789665222168 ServerRouter > foo/test Response

# The format of output
# <relative time> <tube name> <direction> <topic> <message>` 

客户端仿真

当我们有一个转储文件(例如 dump.rec)时,我们可以模拟与我们的应用程序的通信。第一步是准备模拟客户端架构文件。为此,我们可以从我们的应用程序中获取管子节点配置,然后进行编辑。

> zmqtube-monitor get_schema ipc:///tmp/display.monitor > mock_schema.yaml
> vim mock_schema.yaml
...   
# Now, we have to update the file mock_schema.yaml. 
# We change configuration to the mock client configuration. 
# The names of the tubes must be the same as are in your app. 
# We can remove monitoring attribute and change server and 
# tube_type attributes. In this mock file, the topics are not 
# required, they are ignored. 

> cat mock_schema.yaml
tubes:
- addr: ipc:///tmp/router.pipe
  name: ServerRouter
  tube_type: REQ

现在,我们可以开始模拟客户端通信。

> zmqtube-monitor simulate mock_schema.yaml dump.rec

如果我们的应用程序的响应与工具预期的不同(存储在转储文件中的响应),那么监控工具会警告我们。
我们可以通过参数 --speed 修改仿真速度。

在默认配置中,仿真以与原始通信相同的速度运行(参数 --speed=1)。

速度 描述
0 非阻塞仿真
0.5 比原始速度快两倍
1 原始速度
2 比原始速度慢两倍

监控编程声明的示例。

import zmq
from zmq_tubes.threads import Tube, TubeNode, TubeMessage, TubeMonitor


def handler(request: TubeMessage):
  print(request.payload)
  return request.create_response('response')

resp_tube = Tube(
  name='REP',
  addr='ipc:///tmp/rep.pipe',
  server='yes',
  tube_type=zmq.REP
)

req_tube = Tube(
  name='REQ',
  addr='ipc:///tmp/rep.pipe',  
  tube_type=zmq.REQ
)

node = TubeNode()
node.register_tube(resp_tube, f"foo/#")
node.register_tube(req_tube, f"foo/#")
node.register_handler(f"foo/#", handler)

node.register_monitor(resp_tube, TubeMonitor(addr='ipc:///tmp/test.monitor'))
  
with node:
    print(node.request('foo/xxx', 'message 2'))

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。

源分发

zmq_tubes-1.14.0.tar.gz (19.1 kB 查看哈希值)

上传时间

构建分发

zmq_tubes-1.14.0-py3-none-any.whl (18.9 kB 查看哈希值)

上传时间 Python 3

支持者