跳转到主要内容

TaskIQ的SQS代理

项目描述

TaskIQ SQS代理

主要适用于TaskIQ的通用SQS异步代理。

过期

如果您将sqs_expiry标签设置为Unix时间戳,则如果工作进程在该时间之后收到消息,则消息将被丢弃。

import asyncio
from taskiq_sqs import SQSBroker

broker = SQSBroker("http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/my-queue")

@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    # Never forget to call startup in the beginning.
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result. (result backend must be configured)
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源代码分发

taskiq_sqs-0.0.11.tar.gz (17.8 kB 查看哈希值)

上传时间 源代码

构建分发

taskiq_sqs-0.0.11-py3-none-any.whl (13.8 kB 查看哈希值)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面