跳转到主要内容

Python NATS客户端

项目描述

Python NATS客户端

为什么?

我非常喜欢NATS

由于对官方Python客户端asyncio支持不够,所以我从未使用过它。

尽管我从nats-python中学到了很多东西,但它有一些我不愿意作为最终用户绕过的粗糙边缘。

我想自己学习足够的socket编程,并更深入地了解NATS协议

一个符合我使用NATS方式的Python NATS客户端。

  • 客户端必须有一个名称
  • 客户端在幕后对PING进行PONG响应
  • 鼓励将客户端仅用作上下文管理器
  • 接收订阅消息时非阻塞
  • 请求/响应时阻塞

它远非功能完整。

它实现了我使用的功能,这可能也是您需要的所有功能。

如何?

一个虚构的示例,说明了如何通过NATS让一个应用程序与一个发布者和一个响应者交互。

在您的开发机器上启动NATS并运行python -m goingnats以查看其运行。

import datetime as dt
import time

def publisher():
    """publish time.time() every second"""
    with Client(name="publisher") as client:
        while True:
            time.sleep(1)
            client.publish(subject=b"time.time", payload=f"{time.time()}".encode())

threading.Thread(target=publisher, daemon=True).start()

def responder():
    """respond to request for today with the date"""
    with Client(name="responder") as client:
        client.subscribe(subject=b"today")
        client.subscribe(subject=b"add")
        while True:
            for request in client.get():
                if request.subject == b"today":
                    # slow responder
                    time.sleep(2)
                    # will format the date according to payload or defaults to ...
                    format = request.payload.decode() if request.payload else "%Y-%m-%d"
                    response = f"{dt.date.today():{format}}".encode()
                elif request.subject == b"add":
                    response = _int_to_bytes(sum(json.loads(request.payload)))
                else:
                    continue
                client.publish(
                    subject=request.inbox,
                    payload=response,
                )

threading.Thread(target=responder, daemon=True).start()

# application
with Client(name="consumer") as client:
    print("--- one ---")
    print(one(subject=b"time.time"))
    print("--- client.subscribe + client.request ---")
    client.subscribe(subject=b"time.time")
    received = 0
    response = None
    while received < 5:
        # waits for at most 10 ms for messages
        for message in client.get(wait=10):
            print(message)
            received += 1
        if received == 3 and response is None:
            # publish
            publish(subject=b"time.time", payload=b"hijack")
            # request response are blocking
            response = client.request(subject=b"today", payload=b"%Y%m%d")
            print(response)
    print("--- request ---")
    print(request(subject=b"add", payload=b"[1, 2, 3]"))
    try:
        print(request(subject=b"today", wait=100))
    except TimeoutError as e:
        print(e)
# UserWarning: NOP - out of context manager
client.publish(subject=b"out.of.context.manager")

one还有一个...实际上是三个

>>> from goingnats import one
>>> one(subject=b">")
Message(...)

one是一个非常有用的辅助工具,它等待接收给定主题的消息并返回它。

>>> from goingnats import request
>>> request(subject=b"add", payload=b"[1, 2, 3]")
Message(payload=b"6")

request是当开发运行在NATS上的服务时的另一个有用的辅助工具。

>>> from goingnats import publish
>>> publish(subject=b"something.important", payload=b"OR_NOT")

publishrequest类似,但不期望有响应...另一个方便的辅助工具。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

goingnats-2022.4.0.tar.gz (6.3 kB 查看哈希值)

上传时间

支持