A consumer/producer/RPC library built on top of aioamqp
Installation
pip install aioamqp_consumer
Consumer/Producer usage
import asyncio

from aioamqp_consumer import Consumer, Producer


async def task(payload, properties):
    await asyncio.sleep(1)
    print(payload)


async def main():
    amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'
    amqp_queue = 'your-queue-here'
    queue_kwargs = {
        'durable': True,
    }
    # https://aioamqp.readthedocs.io/en/latest/api.html#aioamqp.connect
    amqp_kwargs = {}

    async with Producer(amqp_url, amqp_kwargs=amqp_kwargs) as producer:
        for _ in range(5):
            await producer.publish(
                b'hello',
                amqp_queue,
                queue_kwargs=queue_kwargs,
            )

    consumer = Consumer(
        amqp_url,
        task,
        amqp_queue,
        queue_kwargs=queue_kwargs,
        amqp_kwargs=amqp_kwargs,
    )
    await consumer.scale(20)  # scale up to 20 background coroutines
    await consumer.scale(5)  # downscale to 5 background coroutines
    # wait until the RabbitMQ queue is empty and all local messages are processed
    await consumer.join()
    consumer.close()
    await consumer.wait_closed()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
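
To make the scale() and join() calls in the example more concrete, here is a minimal sketch of a resizable pool of worker coroutines built only on asyncio.Queue. It is not the library's implementation: WorkerPool, put, and demo are hypothetical names introduced for illustration, and an in-memory queue stands in for RabbitMQ.

```python
import asyncio


class WorkerPool:
    """Toy pool of background coroutines draining one asyncio.Queue (hypothetical)."""

    def __init__(self, handler):
        self._handler = handler
        self._queue = asyncio.Queue()
        self._workers = []

    async def _worker(self):
        # Each worker pulls items forever until its task is cancelled.
        while True:
            item = await self._queue.get()
            try:
                await self._handler(item)
            finally:
                self._queue.task_done()

    async def scale(self, size):
        # Grow: start additional worker tasks.
        while len(self._workers) < size:
            self._workers.append(asyncio.create_task(self._worker()))
        # Shrink: cancel surplus workers and wait until they have stopped.
        surplus = self._workers[size:]
        del self._workers[size:]
        for worker in surplus:
            worker.cancel()
        await asyncio.gather(*surplus, return_exceptions=True)

    def put(self, item):
        self._queue.put_nowait(item)

    async def join(self):
        # Returns once every queued item has been marked as done.
        await self._queue.join()

    async def close(self):
        await self.scale(0)


async def demo():
    processed = []

    async def handler(item):
        await asyncio.sleep(0)
        processed.append(item)

    pool = WorkerPool(handler)
    for item in range(10):
        pool.put(item)
    await pool.scale(20)  # scale up to 20 background coroutines
    await pool.scale(5)   # downscale to 5 background coroutines
    await pool.join()     # wait until all queued items are processed
    await pool.close()
    return processed


processed = asyncio.run(demo())
print(sorted(processed))
```

In this sketch, cancelling a worker that is in the middle of a handler call drops that item; a real consumer would need to finish or requeue it.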
RPC usage
import asyncio

from aioamqp_consumer import RpcClient, RpcServer, rpc

payload = b'test'


@rpc(queue_name='random_queue')
async def method(payload):
    print(payload)
    return payload


async def main():
    amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'

    server = RpcServer(amqp_url, method=method)
    client = RpcClient(amqp_url)

    ret = await client.wait(method(payload))
    assert ret == payload

    await client.close()
    await server.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
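
RPC over a message broker is commonly built on a request queue plus a reply queue, with a correlation ID matching each reply to its pending call. The sketch below shows that general pattern with in-memory asyncio queues. Whether RpcClient and RpcServer work exactly this way is an assumption, and serve, ToyRpcClient, echo, and demo are hypothetical names used only for illustration.

```python
import asyncio
import uuid


async def serve(requests, handler):
    """Toy server: handle each request and send the result to its reply queue."""
    while True:
        correlation_id, reply_queue, payload = await requests.get()
        result = await handler(payload)
        await reply_queue.put((correlation_id, result))


class ToyRpcClient:
    """Toy client that matches replies to calls by correlation ID (hypothetical)."""

    def __init__(self, requests):
        self._requests = requests
        self._replies = asyncio.Queue()
        self._pending = {}  # correlation_id -> Future awaiting the reply
        self._reader = asyncio.create_task(self._read_replies())

    async def _read_replies(self):
        while True:
            correlation_id, result = await self._replies.get()
            future = self._pending.pop(correlation_id, None)
            if future is not None and not future.done():
                future.set_result(result)

    async def call(self, payload):
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._requests.put((correlation_id, self._replies, payload))
        return await future

    async def close(self):
        self._reader.cancel()
        await asyncio.gather(self._reader, return_exceptions=True)


async def echo(payload):
    return payload


async def demo():
    requests = asyncio.Queue()
    server = asyncio.create_task(serve(requests, echo))
    client = ToyRpcClient(requests)
    # Two concurrent calls; each reply is routed back by its correlation ID.
    replies = await asyncio.gather(client.call(b'a'), client.call(b'b'))
    await client.close()
    server.cancel()
    await asyncio.gather(server, return_exceptions=True)
    return replies


replies = asyncio.run(demo())
print(replies)
```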
For built-in JSON encoding/decoding, see aioamqp_consumer.json_rpc
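
As a rough illustration of what JSON encoding and decoding of an RPC call involves, the sketch below turns keyword arguments into a bytes payload and back using only the standard json module. The actual wire format of aioamqp_consumer.json_rpc is not described here, so encode_call and decode_call are hypothetical helpers, not part of the library.

```python
import json


def encode_call(**kwargs):
    """Serialize keyword arguments to a UTF-8 JSON payload (bytes)."""
    return json.dumps(kwargs, sort_keys=True).encode('utf-8')


def decode_call(payload):
    """Parse a JSON payload back into a dict of keyword arguments."""
    return json.loads(payload.decode('utf-8'))


def square(*, x):
    return x ** 2


payload = encode_call(x=3)       # bytes suitable for a message body
kwargs = decode_call(payload)    # back to {'x': 3} on the receiving side
result = square(**kwargs)
print(payload, kwargs, result)
```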
For production deployment of aioamqp_consumer.Consumer/aioamqp_consumer.RpcServer, there is a simple built-in runner:
from aioamqp_consumer import RpcServer, json_rpc

amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'


@json_rpc(queue_name='random_queue')
async def square(*, x):
    ret = x ** 2
    print(x, ret)
    return ret


if __name__ == '__main__':
    RpcServer(amqp_url, method=square).run()
Thanks
The library was donated by Ocean S.A.
Thanks to the company for the contribution.