Python的NATS客户端
项目描述
NATS - Asyncio的Python3客户端
适用于NATS消息系统的asyncio Python客户端。
支持的平台
应兼容Python 3.8及以上版本。
安装
pip install nats-py
入门指南
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError
async def main():
    """Walk through the core NATS client features against the demo server.

    Connects, then demonstrates in order: a callback subscription with an
    auto-unsubscribe limit, an iterator-style subscription, and a
    request/reply exchange served by a queue-group subscriber. Finally
    drains the connection.
    """
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        """Print the subject, reply subject and decoded payload of *msg*."""
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            # Unsubscribing here is what ends the iteration.
            await sub.unsubscribe()
    except Exception as e:
        # Keep the example running, but report the failure instead of
        # silently discarding it.
        print(f"Error while reading from 'bar': {e}")

    async def help_request(msg):
        """Print the incoming request and answer on its reply subject."""
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", queue="workers", cb=help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print(f"Received response: {response.data.decode()}")
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()


if __name__ == '__main__':
    asyncio.run(main())
JetStream
自v2.0.0版本起,客户端支持JetStream:
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    """Demonstrate JetStream publishing and the supported consumer styles.

    Creates a stream on subject 'foo', publishes to it, then reads it back
    with a pull consumer, ephemeral and durable push subscribers, a queue
    (deliver group) pair, and an ordered consumer, before closing the
    connection.
    """
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on the 'foo' subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print(ack)

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", durable="psub")

    # Fetch and ack messages from consumer.
    for _ in range(10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()
            print(msg)

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create deliver group whose members receive load balanced messages.
    async def qsub_a(msg):
        """Print and acknowledge a message delivered to group member A."""
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        """Print and acknowledge a message delivered to group member B."""
        print("QSUB B:", msg)
        await msg.ack()

    await js.subscribe("foo", queue="workers", cb=qsub_a)
    await js.subscribe("foo", queue="workers", cb=qsub_b)

    for i in range(10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    # Accumulate payloads until next_msg() times out, which is treated
    # as the end of the stream.
    while True:
        try:
            msg = await osub.next_msg()
            data.extend(msg.data)
        except TimeoutError:
            break
    print("All data in stream:", len(data))

    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
TLS
可以使用ssl上下文配置TLS连接
# Build a client-side SSL context intended for authenticating servers.
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
# Trust the CA certificate(s) in 'ca.pem' when verifying the server.
ssl_ctx.load_verify_locations('ca.pem')
# Load the client certificate and its private key into the context.
ssl_ctx.load_cert_chain(certfile='client-cert.pem',
                        keyfile='client-key.pem')
# Hand the context to the client via `tls`.
# NOTE(review): `tls_hostname` presumably sets the name the server
# certificate is checked against -- confirm in the nats-py docs.
await nats.connect(servers=["tls://127.0.0.1:4443"], tls=ssl_ctx, tls_hostname="localhost")
将连接URL的协议(scheme)设置为tls时,客户端会自动创建一个默认的ssl上下文:
import asyncio
import ssl
from nats.aio.client import Client as NATS
async def run():
    """Connect a new client to the demo server using a ``tls://`` URL."""
    client = NATS()
    await client.connect("tls://demo.nats.io:4443")
注意:如果在OS X中遇到SSL证书错误,请首先尝试安装certifi证书包。例如,如果使用Python 3.7,则运行:
$ /Applications/Python\ 3.7/Install\ Certificates.command
 -- pip install --upgrade certifi
Collecting certifi
...
 -- removing any existing file or link
 -- creating symlink to certifi certificate bundle
 -- setting permissions
 -- update complete
NKEYS和JWT用户凭据
从v0.9.0版本发布以来,您还可以选择安装NKEYS以使用新的NATS v2.0认证功能
pip install nats-py[nkeys]
用法
await nats.connect("tls://connect.ngs.global:4222", user_credentials="/path/to/secret.creds")
开发
- 安装nats服务器。
- 确保服务器已在您的PATH中可用:nats-server -v。
- 安装依赖项:python3 -m pipenv install --dev。
- 运行测试:python3 -m pytest。
许可协议
除非另有说明,NATS源文件均依据Apache 2.0许可证分发,许可证全文见LICENSE文件。
项目详情
    
       关闭
    
      
        
    
    
  
nats_py-2.9.0.tar.gz的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | 01886eb9e0a87f0ec630652cf1fae65d2a8556378a609bc6cc07d2ea60c8d0dd | |
| MD5 | c8f3db602047c0cb6fe985ab61b080eb | |
| BLAKE2b-256 | badc9a01dd9561b736622c0aa3a19f6b40f3eeb22051eaea1475ceb81d5da48d | |