跳转到主要内容

基于aioamqp构建的消费者/生产者/rpc库

项目描述

info:

基于aioamqp构建的消费者/生产者/rpc库

https://img.shields.io/travis/aio-libs/aioamqp_consumer.svg https://img.shields.io/pypi/v/aioamqp_consumer.svg

安装

pip install aioamqp_consumer

消费者/生产者使用

import asyncio

from aioamqp_consumer import Consumer, Producer


async def task(payload, properties):
    await asyncio.sleep(1)
    print(payload)


async def main():
    amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'
    amqp_queue = 'your-queue-here'
    queue_kwargs = {
        'durable': True,
    }
    # https://aioamqp.readthedocs.io/en/latest/api.html#aioamqp.connect
    amqp_kwargs = {}

    async with Producer(amqp_url, amqp_kwargs=amqp_kwargs) as producer:
        for _ in range(5):
            await producer.publish(
                b'hello',
                amqp_queue,
                queue_kwargs=queue_kwargs,
            )

    consumer = Consumer(
        amqp_url,
        task,
        amqp_queue,
        queue_kwargs=queue_kwargs,
        amqp_kwargs=amqp_kwargs,
    )
    await consumer.scale(20)  # scale up to 20 background coroutines
    await consumer.scale(5)  # downscale to 5 background coroutines
    # wait for rabbitmq queue is empty and all local messages are processed
    await consumer.join()
    consumer.close()
    await consumer.wait_closed()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

RPC使用

import asyncio

from aioamqp_consumer import RpcClient, RpcServer, rpc

payload = b'test'


@rpc(queue_name='random_queue')
async def method(payload):
    print(payload)
    return payload


async def main():
    amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'

    server = RpcServer(amqp_url, method=method)

    client = RpcClient(amqp_url)

    ret = await client.wait(method(payload))

    assert ret == payload

    await client.close()

    await server.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

关于内置的json编码/解码,请参阅aioamqp_consumer.json_rpc

在生产部署aioamqp_consumer.Consumer/aioamqp_consumer.RpcServer时,有内置的简单运行器

from aioamqp_consumer import RpcServer, json_rpc

amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'


@json_rpc(queue_name='random_queue')
async def square(*, x):
    ret = x ** 2

    print(x, ret)

    return ret

if __name__ == '__main__':
    RpcServer(amqp_url, method=square).run()

感谢

此库由Ocean S.A.捐赠

感谢公司做出贡献。

项目详情


下载文件

下载您平台对应的文件。如果您不确定该选择哪一个,请了解更多关于安装包的信息。

源分布

aioamqp_consumer-0.3.4.tar.gz (14.0 kB 查看哈希值)

上传时间

由以下组织支持

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误日志StatusPageStatusPage状态页面