Python的NATS客户端
项目描述
NATS - Asyncio的Python3客户端
面向NATS消息系统的asyncio Python客户端。
支持的平台
应与Python 3.8及以上版本兼容。
安装
pip install nats-py
入门指南
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError
async def main():
    """Walk through the basic NATS client features against the demo server.

    Demonstrates, in order: a callback-based subscription limited to two
    messages, an iterator-style subscription, and a request/reply exchange
    answered by a queue subscriber. The connection is drained at the end.
    """
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        """Print the subject, reply subject and decoded payload of ``msg``."""
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print(f"Received a message on '{subject} {reply}': {data}")

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            # NOTE(review): unsubscribing inside the loop is presumably what
            # ends the iteration; otherwise the loop would wait for messages
            # indefinitely. Confirm against the nats-py Subscription docs.
            await sub.unsubscribe()
    except Exception as e:
        # Report the failure instead of silently discarding it, but keep the
        # example running so the request/reply part below still executes.
        print(f"Error while iterating over messages on 'bar': {e}")

    async def help_request(msg):
        """Print the incoming request and answer it on its reply subject."""
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print(f"Received response: {response.data.decode()}")
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()


if __name__ == '__main__':
    asyncio.run(main())
JetStream
从v2.0.0版本开始,客户端现在支持JetStream
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    """Walk through the JetStream features of the client against a local server.

    Demonstrates, in order: creating a stream, publishing to it, a pull
    consumer, ephemeral and durable push subscribers, a deliver group with
    two callbacks, and an ordered consumer that reads until a timeout.
    The connection is closed at the end.
    """
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on 'foo's subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer, one message per fetch.
    for _ in range(10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create deliver group that will have load balanced messages.
    async def qsub_a(msg):
        """Print and acknowledge a message delivered to the first group member."""
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        """Print and acknowledge a message delivered to the second group member."""
        print("QSUB B:", msg)
        await msg.ack()

    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    # Accumulate payloads until next_msg() times out, which this example
    # treats as the end of the stream.
    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
TLS
可以使用ssl上下文配置TLS连接
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations('ca.pem')
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
keyfile='client-key.pem')
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")
在连接URL中将方案设置为tls,将使客户端自动创建一个默认的ssl上下文:
import asyncio
import ssl
from nats.aio.client import Client as NATS
async def run():
    """Connect a NATS client to the demo server using a ``tls://`` URL."""
    nc = NATS()
    # No ssl context is passed here; with the "tls" scheme the client is
    # expected to create a default one on its own.
    await nc.connect("tls://demo.nats.io:4443")
注意:如果在OS X中遇到SSL证书错误,请首先尝试安装certifi证书包。例如,如果使用Python 3.7,则运行:
$ /Applications/Python\ 3.7/Install\ Certificates.command
-- pip install --upgrade certifi
Collecting certifi
...
-- removing any existing file or link
-- creating symlink to certifi certificate bundle
-- setting permissions
-- update complete
NKEYS和JWT用户凭据
自v0.9.0版本起,您还可以选择安装NKEYS,以使用NATS v2.0的新认证功能:
pip install nats-py[nkeys]
用法
await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")
开发
- 安装nats服务器。
- 确保服务器已在您的PATH中可用:
nats-server -v
- 安装依赖项:
python3 -m pipenv install --dev
- 运行测试:
python3 -m pytest
许可协议
除非另有说明,否则NATS源文件是在LICENSE文件中找到的Apache Version 2.0许可证下分发的。
项目详情
关闭
nats_py-2.9.0.tar.gz的哈希值
算法 | 哈希摘要
---|---
SHA256 | 01886eb9e0a87f0ec630652cf1fae65d2a8556378a609bc6cc07d2ea60c8d0dd
MD5 | c8f3db602047c0cb6fe985ab61b080eb
BLAKE2b-256 | badc9a01dd9561b736622c0aa3a19f6b40f3eeb22051eaea1475ceb81d5da48d