Python NATS客户端
项目描述
Python NATS客户端
为什么?
我非常喜欢NATS。
由于对官方Python客户端的asyncio支持不够,所以我从未使用过它。
尽管我从nats-python中学到了很多东西,但它有一些我不愿意作为最终用户绕过的粗糙边缘。
我想自己学习足够的socket编程,并更深入地了解NATS协议。
一个符合我使用NATS方式的Python NATS客户端。
- 客户端必须有一个名称
- 客户端在幕后对PING进行PONG响应
- 鼓励将客户端仅用作上下文管理器
- 接收订阅消息时非阻塞
- 请求/响应时阻塞
它远非功能完整。
它实现了我使用的功能,这可能也是您需要的所有功能。
如何?
一个虚构的示例,说明了如何通过NATS让一个应用程序与一个发布者和一个响应者交互。
在您的开发机器上启动NATS并运行python -m goingnats
以查看其运行。
import datetime as dt
import time
def publisher():
"""publish time.time() every second"""
with Client(name="publisher") as client:
while True:
time.sleep(1)
client.publish(subject=b"time.time", payload=f"{time.time()}".encode())
threading.Thread(target=publisher, daemon=True).start()
def responder():
"""respond to request for today with the date"""
with Client(name="responder") as client:
client.subscribe(subject=b"today")
client.subscribe(subject=b"add")
while True:
for request in client.get():
if request.subject == b"today":
# slow responder
time.sleep(2)
# will format the date according to payload or defaults to ...
format = request.payload.decode() if request.payload else "%Y-%m-%d"
response = f"{dt.date.today():{format}}".encode()
elif request.subject == b"add":
response = _int_to_bytes(sum(json.loads(request.payload)))
else:
continue
client.publish(
subject=request.inbox,
payload=response,
)
threading.Thread(target=responder, daemon=True).start()
# application
with Client(name="consumer") as client:
print("--- one ---")
print(one(subject=b"time.time"))
print("--- client.subscribe + client.request ---")
client.subscribe(subject=b"time.time")
received = 0
response = None
while received < 5:
# waits for at most 10 ms for messages
for message in client.get(wait=10):
print(message)
received += 1
if received == 3 and response is None:
# publish
publish(subject=b"time.time", payload=b"hijack")
# request response are blocking
response = client.request(subject=b"today", payload=b"%Y%m%d")
print(response)
print("--- request ---")
print(request(subject=b"add", payload=b"[1, 2, 3]"))
try:
print(request(subject=b"today", wait=100))
except TimeoutError as e:
print(e)
# UserWarning: NOP - out of context manager
client.publish(subject=b"out.of.context.manager")
one
还有一个...实际上是三个
>>> from goingnats import one
>>> one(subject=b">")
Message(...)
one
是一个非常有用的辅助工具,它等待接收给定主题的消息并返回它。
>>> from goingnats import request
>>> request(subject=b"add", payload=b"[1, 2, 3]")
Message(payload=b"6")
request
是当开发运行在NATS上的服务时的另一个有用的辅助工具。
>>> from goingnats import publish
>>> publish(subject=b"something.important", payload=b"OR_NOT")
publish
与request
类似,但不期望有响应...另一个方便的辅助工具。
项目详情
关闭
goingnats-2022.4.0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f3a74c96f360a15cbd7c5a6cddffbb535562bb890d29975eda22c72522df532b |
|
MD5 | 979b248752568e87a43a092cf7444932 |
|
BLAKE2b-256 | 4a4cc6ea5d8ba733d3d200b2c0b02948606b766dda382c8879575b4dc11826d6 |