一个简单的SQS任务队列。
项目描述
警告:此库仍在beta测试中。它有一个稳定的API,并且已在生产中部署,但我们尚未从大量用例中收到反馈,可能存在未知错误。
PyQS是一个简单的SQS任务管理器。其目标是提供一个简单、可靠的与SQS交互的celery兼容接口。它使用boto3在底层进行认证并与SQS通信。
安装
PyQS可以从PyPI获取,并且可以使用所有常规方式进行安装。要通过CLI安装
$ pip install pyqs
或者只需将其添加到您的requirements.txt文件中。
使用方法
PyQS使用一些非常简单的语义来创建和读取任务。这大部分来自于SQS拥有一个非常简单的API。
创建任务
向队列添加任务相当简单。
from pyqs import task
@task(queue='email')
def send_email(subject, message):
pass
send_email.delay(subject='Hi there')
注意:这假设您已经在合适的环境变量中设置了您的AWS密钥,或者正在使用IAM角色。PyQS在与AWS通信方面没有做任何事情特别之处,它只创建了适当的boto连接。
如果您没有传递队列,PyQS将使用函数路径作为队列名称。例如以下函数位于email/tasks.py。
@task()
def send_email(subject):
pass
这将在email.tasks.send_email队列中显示。
您还可以指定函数路径,如果您想引用不同项目中的函数
@task(custom_function_path="foo.bar.send_email")
# This references function send_email in foo/bar.py instead of email/tasks.py
def send_email(subject):
pass
读取任务
读取任务需要运行PyQS。如果任务已经在您的 PYTHON_PATH 中,可以直接运行。
$ pyqs email.tasks.send_email
如果我们想运行具有特定前缀的所有任务,这基于Python的fnmatch。
$ pyqs email.*
我们也可以通过逗号分隔来在一次调用中读取多个不同的队列。
$ pyqs send_email,read_email,write_email
如果您想运行更多的工作进程来处理任务,可以增加并发量。这将产生额外的进程来处理消息。
$ pyqs send_email --concurrency 10
简单进程工作进程
为了使用一个处理原始实现中一些边缘情况的简单版本PyQS,传递simple-worker标志。
$ pyqs send_email --simple-worker
简单进程工作进程与原始实现有以下不同。
不使用内部队列,并移除了对prefetch-multiplier标志的支持。这有助于简化所需的思维模型,因为消息既不在SQS队列中也不在内部队列中。
当传递simple-worker标志时,默认的batchsize为1而不是10。这是可配置的。
- 读取或处理从SQS的消息时不检查可见性超时。
允许工作进程在消息超过其可见性超时后继续处理消息,这意味着我们解决了在max_receives=1时,如果错误地设置了较短的可见性超时并超过了可见性超时,消息将永远不会被处理的问题。以前,如果配置了DLQ,此消息将最终进入DLQ,并且永远不会实际处理。
这增加了我们处理消息多次的概率,尤其是当batchsize > 1时,但开发人员可以通过检查消息是否已经处理过来自解决此问题。
钩子
PyQS有一个事件注册表,可以在每次任务运行前后运行一个函数。
from pyqs import task, events
def print_pre_process(context):
print({"pre_process": context})
def print_post_process(context):
print({"pre_process": context})
events.register_event("pre_process", print_pre_process)
events.register_event("post_process", print_post_process)
@task(queue="my_queue")
def send_email(subject):
pass
操作说明
死信队列
建议为PyQS管理的任何队列使用死信队列。这是因为当前的消息检索策略不会在初次接收时删除消息。只有当消息成功完成后,消息才会从SQS中删除。如果您来自Celery和SQS,这可能是意外的行为。Celery试图内部管理此行为,但效果各异。
如果在消息处理过程中出现错误,它将被丢弃,并在可见性超时后重新出现。这可能导致存在永远不会离开队列并持续抛出错误的消息。死信队列通过收集已经重试指定次数的消息来帮助解决这个问题。
工作进程自毁
每个进程工作进程在处理100个任务(或处理失败)后将自己关闭。这是为了防止陈旧连接长时间滞留并阻止任务永久。此外,它有助于防止内存泄漏,尽管方式相当粗暴。进程工作进程关闭后,管理进程应立即注意并快速重启它。当前的100是硬编码的,但可能是可配置的。
队列阻塞
尽管有多个工作进程用于从不同的队列读取,但它们都附加到同一个内部队列。这意味着如果有一个队列有很多快速任务,另一个队列有少量慢速任务,它们可以相互阻塞,快速任务可以积压在慢速任务之后。最简单的解决方案就是运行两个不同的PyQS命令,一个针对每个队列,并设置适当的并发设置。
可见性超时
我们注意不处理已超过其队列可见超时的消息。目标是为了防止任务重复处理。然而,这种情况仍然很有可能发生,因为我们没有在任务周围使用事务语义。因此,根据您任务预期的长度,正确设置队列的可见超时非常重要。如果超时时间太短,任务将被处理两次,或者非常缓慢。如果超时时间太长,临时失败将延迟消息并大大降低队列吞吐量。这与上面描述的队列阻塞有关。SQS队列是免费的,所以保持每个队列中存储的消息尽可能同质是一个好习惯。
兼容性
Celery
PyQS是为了在我们的基础设施中替换Celery而创建的。为了实现这个目标,我们想确保与基本的Celery API兼容。为此,您可以在基于Celery的系统上轻松尝试使用PyQS。PyQS可以读取Celery写入SQS的消息。它将读取
pickle
和json
序列化的SQS消息(尽管我们推荐JSON)。操作系统
UNIX。由于使用了
os.getppid
系统调用,如果有人真的想要Windows支持,这个功能可能可以被解决。Boto3
目前PyQS只支持将一些基本连接参数显式传递给连接。任何由
当从命令行运行PyQS时,您可以通过传递
--region
、--access-key-id
和--secret-access-key
来覆盖默认值。注意事项
持久性
当我们从SQS读取一批消息时,我们会尝试将它们添加到我们的内部队列中,直到我们超过队列的可见超时。一旦超过,我们将丢弃这些消息并获取新的批次。此外,当一个进程工作员从内部队列获取消息时,将检查从SQS获取消息的时间与队列的可见超时时间,如果超过了超时,则丢弃该消息。目标是减少重复处理。然而,这个系统不提供事务,存在可能处理超过可见超时的消息的情况。您需要确保您能够处理这种边缘情况。
任务导入
目前还没有高级逻辑来确定导入任务处理的模块位置。PyQS将尝试使用
importlib
获取模块,然后在模块中查找任务。目前我们将PyQS的使用包装在Django管理命令中,这简化了任务导入。我们直接调用**_main()**方法,跳过main()
,因为它只执行参数解析。在容器内运行
PyQS假设进程ID不是1。如果您在容器内运行PyQS,您应该用supervisor或类似dummy-init的东西包装它。
为什么不直接使用Celery呢?
我们喜欢Celery。我们(Yipit.com)甚至赞助了原始SQS实现。然而,SQS与其他Celery支持的备份有很大不同。此外,Celery团队没有资源在履行其他职责的同时创建一个健壮的SQS实现。这意味着SQS携带了很多额外的功能和复杂的代码库,这使调试变得困难。
我们亲自经历了Celery的一些非常棘手的资源泄露问题,难以追踪。对于我们的用例,切换到一个我们完全理解的简单库要简单得多。随着这个库的发展,这可能会改变,转换的成本可能不值得。然而,我们希望为使用python和SQS的其他人提供使用更简单设置的选项。
变更日志
1.0.1
为跟踪任务执行添加MessageId
1.0.0
放弃对Py2的支持
添加新的SimpleProcessWorker (https://github.com/spulec/PyQS/pull/76)
0.1.6
修复botocore客户端损坏的pickle问题。
0.1.5
为预处理和后处理添加事件钩子。
0.1.4
改进在启动工作器后创建队列时的行为。现在,工作器将每30秒刷新一次队列,以检查新的队列。
0.1.3
将PPID检查更改为检查实际的父ID,而不是PID 1。这解决了在docker容器中运行时,期望PPID为1的问题。
0.1.2
419ce2e 合并pull request #56 from orangain/honor-aws-region
7c793d0 合并pull request #55 from orangain/fix-indentation-error
0643fbb 尊重由 .aws/config 或环境变量配置的aws区域
f5c1db9 修复缩进错误
cdae257 合并pull request #52 from cedarai/master
a2ac378 合并pull request #53 from p1c2u/fix/nosetest-remove-stop-parameter
dbaa391 合并pull request #51 from p1c2u/fix/pep8-styles-fixes
1577382 Nosetest移除停止参数
b7420e3 将当前目录添加到PYTHONPATH
8d04b62 优雅关闭日志消息修复
796acbc PEP8样式修复
72dcb62 合并pull request #50 from hobbsh/add_example
d00d31f 更新readme
dfbf459 使用 .delay() 提交消息
612158f 合并pull request #49 from hobbsh/no_log_0_msg
09a649f 使用logger.debug记录成功的SQS日志行
dfd56c3 修复readme中的拼写错误
a774155 添加示例flask应用
17e7b7c 在没有消息时不要记录消息检索成功
14eb827 添加关闭信号日志。
0.1.1
修复没有队列的账户上的KeyError
0.1.0
升级到boto3
0.0.22
修复Python 3支持
允许在调用时覆盖 delay_seconds 属性
0.0.21
使用 --prefetch-multiplier 调整 PREFETCH_MULTIPLIER。
0.0.20
在ManagerWorker上调整内部队列大小时,尊重 --batch-size。
0.0.19
添加运行时可以调整BATCHSIZE和INTERVAL的能力。感谢@masayang
添加创建具有可见性延迟的任务的能力。感谢@joshbuddy
添加创建具有自定义函数位置的任务的能力,允许跨项目任务
0.0.18
将变更日志转换为.rst格式
将变更日志添加到Pypi上的长描述中。感谢@adamchainz
0.0.17
修复README中的拼写错误
在README中添加有关死信队列的说明
0.0.16
将README转换为reStructuredText (.rst) 格式,以便在PyPI上渲染
0.0.15
处理工作器在尝试处理100次请求后自杀,而不是检查内部队列100次。
如果我们发现内部队列中没有消息,则在重新检查之前暂停一会儿。
0.0.14
处理工作器在处理100次请求后自杀
处理工作器在处理之前检查消息的抓取时间和可见性超时,如果它已超过超时则丢弃它。
将处理任务所用的 process_time() 记录到INFO级别。
0.0.13
仅传递SQS队列ID到内部队列。这是尝试修复处理来自多个队列的消息时的错误。
0.0.12
删除多余的调试代码
0.0.11
添加额外的调试来调查消息删除错误
0.0.10
为每个处理工作器提供自己的boto连接,以避免在删除消息时出现多进程竞争条件
0.0.9
将长轮询间隔更改为有效值,0<=LPI<=20
0.0.8
当从SQS拉取消息时切换到长轮询。
将消息删除从SQS移动到消息处理之后。
0.0.7
增加了读取JSON编码的celery消息的能力。
0.0.6
将关闭日志切换到INFO级别。
在消息检索循环中添加了短暂的睡眠,这样我们就不显得在大量使用CPU。
0.0.5
将任务失败日志切换到ERROR(这次实际上是这样做)
将任务成功日志切换到INFO
为从SQS队列中检索的消息数量添加了INFO级别的日志。
将读取器和工作者进程计数移动到DEBUG
0.0.4
在创建连接时,增加了传递region、access_key_id和secret_access_key到Boto的能力。
将任务失败的日志记录从INFO切换到ERROR日志记录器。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
构建分布
pyqs-1.0.1.tar.gz的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 56a36bfa88020111f6dc58ef485f525d47d08e5d195fc45fc1f54ec24a91bad7 |
|
MD5 | f09c0bdc6406e607c830423bbc93c90a |
|
BLAKE2b-256 | f2dc3199101fb15909a89421cef68d7045f9759e7f0bfcd9a40c0cc3b5f3ec44 |
pyqs-1.0.1-py3-none-any.whl的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | fdd97625afeb4f675008a41667b926307bf248b19c9a32f566feea0af2ae7600 |
|
MD5 | dbeab5f463e730c3dc298001b143fce3 |
|
BLAKE2b-256 | ae3d53da6006926985c0c3346f431c4e3ed63d6045c6ad7e6ae12367e5d298cd |