Python NATS客户端
项目描述
Python NATS客户端
为什么?
我非常喜欢NATS。
由于对官方Python客户端的asyncio支持不够,所以我从未使用过它。
尽管我从nats-python中学到了很多东西,但它有一些我不愿意作为最终用户绕过的粗糙边缘。
我想自己学习足够的socket编程,并更深入地了解NATS协议。
一个符合我使用NATS方式的Python NATS客户端。
- 客户端必须有一个名称
- 客户端在幕后对PING进行PONG响应
- 鼓励将客户端仅用作上下文管理器
- 接收订阅消息时非阻塞
- 请求/响应时阻塞
它远非功能完整。
它实现了我使用的功能,这可能也是您需要的所有功能。
如何?
一个虚构的示例,说明了如何通过NATS让一个应用程序与一个发布者和一个响应者交互。
在您的开发机器上启动NATS并运行python -m goingnats以查看其运行。
import datetime as dt
import time
def publisher():
    """publish time.time() every second"""
    with Client(name="publisher") as client:
        while True:
            time.sleep(1)
            client.publish(subject=b"time.time", payload=f"{time.time()}".encode())
threading.Thread(target=publisher, daemon=True).start()
def responder():
    """respond to request for today with the date"""
    with Client(name="responder") as client:
        client.subscribe(subject=b"today")
        client.subscribe(subject=b"add")
        while True:
            for request in client.get():
                if request.subject == b"today":
                    # slow responder
                    time.sleep(2)
                    # will format the date according to payload or defaults to ...
                    format = request.payload.decode() if request.payload else "%Y-%m-%d"
                    response = f"{dt.date.today():{format}}".encode()
                elif request.subject == b"add":
                    response = _int_to_bytes(sum(json.loads(request.payload)))
                else:
                    continue
                client.publish(
                    subject=request.inbox,
                    payload=response,
                )
threading.Thread(target=responder, daemon=True).start()
# application
with Client(name="consumer") as client:
    print("--- one ---")
    print(one(subject=b"time.time"))
    print("--- client.subscribe + client.request ---")
    client.subscribe(subject=b"time.time")
    received = 0
    response = None
    while received < 5:
        # waits for at most 10 ms for messages
        for message in client.get(wait=10):
            print(message)
            received += 1
        if received == 3 and response is None:
            # publish
            publish(subject=b"time.time", payload=b"hijack")
            # request response are blocking
            response = client.request(subject=b"today", payload=b"%Y%m%d")
            print(response)
    print("--- request ---")
    print(request(subject=b"add", payload=b"[1, 2, 3]"))
    try:
        print(request(subject=b"today", wait=100))
    except TimeoutError as e:
        print(e)
# UserWarning: NOP - out of context manager
client.publish(subject=b"out.of.context.manager")
one还有一个...实际上是三个
>>> from goingnats import one
>>> one(subject=b">")
Message(...)
one是一个非常有用的辅助工具,它等待接收给定主题的消息并返回它。
>>> from goingnats import request
>>> request(subject=b"add", payload=b"[1, 2, 3]")
Message(payload=b"6")
request是当开发运行在NATS上的服务时的另一个有用的辅助工具。
>>> from goingnats import publish
>>> publish(subject=b"something.important", payload=b"OR_NOT")
publish与request类似,但不期望有响应...另一个方便的辅助工具。
项目详情
    
       关闭
    
      
        
    
    
  
goingnats-2022.4.0.tar.gz的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | f3a74c96f360a15cbd7c5a6cddffbb535562bb890d29975eda22c72522df532b | |
| MD5 | 979b248752568e87a43a092cf7444932 | |
| BLAKE2b-256 | 4a4cc6ea5d8ba733d3d200b2c0b02948606b766dda382c8879575b4dc11826d6 |