跳转到主要内容

一个简单的SQS任务队列。

项目描述

警告:此库仍在beta测试中。它有一个稳定的API,并且已在生产中部署,但我们尚未从大量用例中收到反馈,可能存在未知错误。

PyQS是一个简单的SQS任务管理器。其目标是提供一个简单、可靠的与SQS交互的celery兼容接口。它使用boto3在底层进行认证并与SQS通信。

安装

PyQS可以从PyPI获取,并且可以使用所有常规方式进行安装。要通过CLI安装

$ pip install pyqs

或者只需将其添加到您的requirements.txt文件中。

使用方法

PyQS使用一些非常简单的语义来创建和读取任务。这大部分来自于SQS拥有一个非常简单的API。

创建任务

向队列添加任务相当简单。

from pyqs import task

@task(queue='email')
def send_email(subject, message):
    pass

send_email.delay(subject='Hi there')

注意:这假设您已经在合适的环境变量中设置了您的AWS密钥,或者正在使用IAM角色。PyQS在与AWS通信方面没有做任何事情特别之处,它只创建了适当的boto连接。

如果您没有传递队列,PyQS将使用函数路径作为队列名称。例如以下函数位于email/tasks.py

@task()
def send_email(subject):
    pass

这将在email.tasks.send_email队列中显示。

您还可以指定函数路径,如果您想引用不同项目中的函数

@task(custom_function_path="foo.bar.send_email")
# This references function send_email in foo/bar.py instead of email/tasks.py
def send_email(subject):
    pass

读取任务

读取任务需要运行PyQS。如果任务已经在您的 PYTHON_PATH 中,可以直接运行。

$ pyqs email.tasks.send_email

如果我们想运行具有特定前缀的所有任务,这基于Python的fnmatch

$ pyqs email.*

我们也可以通过逗号分隔来在一次调用中读取多个不同的队列。

$ pyqs send_email,read_email,write_email

如果您想运行更多的工作进程来处理任务,可以增加并发量。这将产生额外的进程来处理消息。

$ pyqs send_email --concurrency 10

简单进程工作进程

为了使用一个处理原始实现中一些边缘情况的简单版本PyQS,传递simple-worker标志。

$ pyqs send_email --simple-worker

简单进程工作进程与原始实现有以下不同。

  • 不使用内部队列,并移除了对prefetch-multiplier标志的支持。这有助于简化所需的思维模型,因为消息既不在SQS队列中也不在内部队列中。

  • 当传递simple-worker标志时,默认的batchsize为1而不是10。这是可配置的。

  • 读取或处理从SQS的消息时不检查可见性超时。
    • 允许工作进程在消息超过其可见性超时后继续处理消息,这意味着我们解决了在max_receives=1时,如果错误地设置了较短的可见性超时并超过了可见性超时,消息将永远不会被处理的问题。以前,如果配置了DLQ,此消息将最终进入DLQ,并且永远不会实际处理。

    • 这增加了我们处理消息多次的概率,尤其是当batchsize > 1时,但开发人员可以通过检查消息是否已经处理过来自解决此问题。

钩子

PyQS有一个事件注册表,可以在每次任务运行前后运行一个函数。

from pyqs import task, events

def print_pre_process(context):
    print({"pre_process": context})

def print_post_process(context):
    print({"pre_process": context})

events.register_event("pre_process", print_pre_process)
events.register_event("post_process", print_post_process)

@task(queue="my_queue")
def send_email(subject):
    pass

操作说明

死信队列

建议为PyQS管理的任何队列使用死信队列。这是因为当前的消息检索策略不会在初次接收时删除消息。只有当消息成功完成后,消息才会从SQS中删除。如果您来自Celery和SQS,这可能是意外的行为。Celery试图内部管理此行为,但效果各异。

如果在消息处理过程中出现错误,它将被丢弃,并在可见性超时后重新出现。这可能导致存在永远不会离开队列并持续抛出错误的消息。死信队列通过收集已经重试指定次数的消息来帮助解决这个问题。

工作进程自毁

每个进程工作进程在处理100个任务(或处理失败)后将自己关闭。这是为了防止陈旧连接长时间滞留并阻止任务永久。此外,它有助于防止内存泄漏,尽管方式相当粗暴。进程工作进程关闭后,管理进程应立即注意并快速重启它。当前的100是硬编码的,但可能是可配置的。

队列阻塞

尽管有多个工作进程用于从不同的队列读取,但它们都附加到同一个内部队列。这意味着如果有一个队列有很多快速任务,另一个队列有少量慢速任务,它们可以相互阻塞,快速任务可以积压在慢速任务之后。最简单的解决方案就是运行两个不同的PyQS命令,一个针对每个队列,并设置适当的并发设置。

可见性超时

我们注意不处理已超过其队列可见超时的消息。目标是为了防止任务重复处理。然而,这种情况仍然很有可能发生,因为我们没有在任务周围使用事务语义。因此,根据您任务预期的长度,正确设置队列的可见超时非常重要。如果超时时间太短,任务将被处理两次,或者非常缓慢。如果超时时间太长,临时失败将延迟消息并大大降低队列吞吐量。这与上面描述的队列阻塞有关。SQS队列是免费的,所以保持每个队列中存储的消息尽可能同质是一个好习惯。

兼容性

Celery

PyQS是为了在我们的基础设施中替换Celery而创建的。为了实现这个目标,我们想确保与基本的Celery API兼容。为此,您可以在基于Celery的系统上轻松尝试使用PyQS。PyQS可以读取Celery写入SQS的消息。它将读取

pickle

json

序列化的SQS消息(尽管我们推荐JSON)。

操作系统

UNIX。由于使用了

os.getppid

系统调用,如果有人真的想要Windows支持,这个功能可能可以被解决。

Boto3

目前PyQS只支持将一些基本连接参数显式传递给连接。任何由执行以透明地查找连接凭证的工作,如IAM角色,仍然可以正常工作。

当从命令行运行PyQS时,您可以通过传递

--region

--access-key-id

--secret-access-key

来覆盖默认值。

注意事项

持久性

当我们从SQS读取一批消息时,我们会尝试将它们添加到我们的内部队列中,直到我们超过队列的可见超时。一旦超过,我们将丢弃这些消息并获取新的批次。此外,当一个进程工作员从内部队列获取消息时,将检查从SQS获取消息的时间与队列的可见超时时间,如果超过了超时,则丢弃该消息。目标是减少重复处理。然而,这个系统不提供事务,存在可能处理超过可见超时的消息的情况。您需要确保您能够处理这种边缘情况。

任务导入

目前还没有高级逻辑来确定导入任务处理的模块位置。PyQS将尝试使用

importlib

获取模块,然后在模块中查找任务。目前我们将PyQS的使用包装在Django管理命令中,这简化了任务导入。我们直接调用**_main()**方法,跳过

main()

,因为它只执行参数解析。

在容器内运行

PyQS假设进程ID不是1。如果您在容器内运行PyQS,您应该用supervisor或类似dummy-init的东西包装它。

为什么不直接使用Celery呢?

我们喜欢Celery。我们(Yipit.com)甚至赞助了原始SQS实现。然而,SQS与其他Celery支持的备份有很大不同。此外,Celery团队没有资源在履行其他职责的同时创建一个健壮的SQS实现。这意味着SQS携带了很多额外的功能和复杂的代码库,这使调试变得困难。

我们亲自经历了Celery的一些非常棘手的资源泄露问题,难以追踪。对于我们的用例,切换到一个我们完全理解的简单库要简单得多。随着这个库的发展,这可能会改变,转换的成本可能不值得。然而,我们希望为使用python和SQS的其他人提供使用更简单设置的选项。

变更日志

1.0.1

  • 为跟踪任务执行添加MessageId

1.0.0

0.1.6

  • 修复botocore客户端损坏的pickle问题。

0.1.5

  • 为预处理和后处理添加事件钩子。

0.1.4

  • 改进在启动工作器后创建队列时的行为。现在,工作器将每30秒刷新一次队列,以检查新的队列。

0.1.3

  • 将PPID检查更改为检查实际的父ID,而不是PID 1。这解决了在docker容器中运行时,期望PPID为1的问题。

0.1.2

  • 419ce2e 合并pull request #56 from orangain/honor-aws-region

  • 7c793d0 合并pull request #55 from orangain/fix-indentation-error

  • 0643fbb 尊重由 .aws/config 或环境变量配置的aws区域

  • f5c1db9 修复缩进错误

  • cdae257 合并pull request #52 from cedarai/master

  • a2ac378 合并pull request #53 from p1c2u/fix/nosetest-remove-stop-parameter

  • dbaa391 合并pull request #51 from p1c2u/fix/pep8-styles-fixes

  • 1577382 Nosetest移除停止参数

  • b7420e3 将当前目录添加到PYTHONPATH

  • 8d04b62 优雅关闭日志消息修复

  • 796acbc PEP8样式修复

  • 72dcb62 合并pull request #50 from hobbsh/add_example

  • d00d31f 更新readme

  • dfbf459 使用 .delay() 提交消息

  • 612158f 合并pull request #49 from hobbsh/no_log_0_msg

  • 09a649f 使用logger.debug记录成功的SQS日志行

  • dfd56c3 修复readme中的拼写错误

  • a774155 添加示例flask应用

  • 17e7b7c 在没有消息时不要记录消息检索成功

  • 14eb827 添加关闭信号日志。

0.1.1

  • 修复没有队列的账户上的KeyError

0.1.0

  • 升级到boto3

0.0.22

  • 修复Python 3支持

  • 允许在调用时覆盖 delay_seconds 属性

0.0.21

  • 使用 --prefetch-multiplier 调整 PREFETCH_MULTIPLIER

0.0.20

  • 在ManagerWorker上调整内部队列大小时,尊重 --batch-size

0.0.19

  • 添加运行时可以调整BATCHSIZE和INTERVAL的能力。感谢@masayang

  • 添加创建具有可见性延迟的任务的能力。感谢@joshbuddy

  • 添加创建具有自定义函数位置的任务的能力,允许跨项目任务

0.0.18

  • 将变更日志转换为.rst格式

  • 将变更日志添加到Pypi上的长描述中。感谢@adamchainz

0.0.17

  • 修复README中的拼写错误

  • 在README中添加有关死信队列的说明

0.0.16

  • 将README转换为reStructuredText (.rst) 格式,以便在PyPI上渲染

0.0.15

  • 处理工作器在尝试处理100次请求后自杀,而不是检查内部队列100次。

  • 如果我们发现内部队列中没有消息,则在重新检查之前暂停一会儿。

0.0.14

  • 处理工作器在处理100次请求后自杀

  • 处理工作器在处理之前检查消息的抓取时间和可见性超时,如果它已超过超时则丢弃它。

  • 将处理任务所用的 process_time() 记录到INFO级别。

0.0.13

  • 仅传递SQS队列ID到内部队列。这是尝试修复处理来自多个队列的消息时的错误。

0.0.12

  • 删除多余的调试代码

0.0.11

  • 添加额外的调试来调查消息删除错误

0.0.10

  • 为每个处理工作器提供自己的boto连接,以避免在删除消息时出现多进程竞争条件

0.0.9

  • 将长轮询间隔更改为有效值,0<=LPI<=20

0.0.8

  • 当从SQS拉取消息时切换到长轮询。

  • 将消息删除从SQS移动到消息处理之后。

0.0.7

  • 增加了读取JSON编码的celery消息的能力。

0.0.6

  • 将关闭日志切换到INFO级别。

  • 在消息检索循环中添加了短暂的睡眠,这样我们就不显得在大量使用CPU。

0.0.5

  • 将任务失败日志切换到ERROR(这次实际上是这样做)

  • 将任务成功日志切换到INFO

  • 为从SQS队列中检索的消息数量添加了INFO级别的日志。

  • 将读取器和工作者进程计数移动到DEBUG

0.0.4

  • 在创建连接时,增加了传递regionaccess_key_idsecret_access_key到Boto的能力。

  • 将任务失败的日志记录从INFO切换到ERROR日志记录器。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

pyqs-1.0.1.tar.gz (21.6 kB 查看哈希)

上传时间

构建分布

pyqs-1.0.1-py3-none-any.whl (16.1 kB 查看哈希)

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页