Amazon SQS的具有观点的队列处理器
项目描述
SQS Workers
Amazon SQS的具有观点的队列处理器。
用法
使用以下命令安装软件包
pip install sqs-workers
使用以下配置您的boto3库以提供安装访问要求 例如这样
aws configure
不要忘记设置您首选的AWS区域。
然后,您将开始管理两个系统(最可能是从相同的代码库):一个将消息添加到队列,另一个执行它们。
from sqs_workers import SQSEnv, create_standard_queue
# This environment will use AWS requisites from ~/.aws/ directory
sqs = SQSEnv()
# Create a new queue.
# Note that you can use the AWS web interface for the same action as well, the
# web interface provides more options. You only need to do it once.
create_standard_queue(sqs, "emails")
# Get the queue object
queue = sqs.queue("emails")
# Register a queue processor
@queue.processor("send_email")
def send_email(to, subject, body):
print(f"Sending email {subject} to {to}")
然后有两种方法向队列添加任务。经典(又称“显式”)
queue.add_job("send_email", to="user@example.com", subject="Hello world", body="hello world")
以及“Celery方式”(我们在一定程度上模拟了Celery API)
send_email.delay(to="user@example.com", subject="Hello world", body="hello world")
要处理队列,您需要手动运行工作者。创建一个新文件,其中将包含sqs对象的定义并注册所有处理器(很可能通过从您的项目中导入必要的模块),然后运行SQS
from sqs_workers import SQSEnv
sqs = SQSEnv()
...
sqs.queue('emails').process_queue()
在生产环境中,我们通常不在一个进程中处理多个队列,但在开发环境中,使用以下方法一次性处理所有队列会更简单
sqs.process_queues()
序列化
有两种序列化器:JSON和pickle。
预烘焙任务
您可以创建所谓的“预烘焙异步任务”,这些任务除了任务本身外,还包含用于调用任务的参数。
将预烘焙任务视为Celery签名的轻量级版本
task = send_email.bake(to='user@example.com', subject='Hello world', body='hello world')
task.delay()
等同于
send_email.delay(to='user@example.com', subject='Hello world', body='hello world')
批量写入
如果您有很多任务要入队,则在添加时使用批量处理可能更有效
# Classic ("explicit") API
with queue.add_batch():
queue.add_job("send_email", to="user1@example.com", subject="Hello world", body="hello world")
queue.add_job("send_email", to="user2@example.com", subject="Hello world", body="hello world")
# "Celery way"
with send_email.batch():
send_email.delay(to="user1@example.com", subject="Hello world", body="hello world")
send_email.delay(to="user2@example.com", subject="Hello world", body="hello world")
当启用批量处理时
- 任务以10个一批添加到SQS中,减少了AWS操作的数量
- 无法获取任务
MessageId
,因为它是在批量发送后才知的 - 需要注意消息大小,因为SQS限制了单个消息大小和最大总有效负载大小为256 kB。
批量读取
如果您希望一次性消费和处理队列中的消息批量(例如,加快摄取),您可以在队列级别设置batching_policy
。
期望基础函数有一个单个参数,该参数将接收消息列表。
from sqs_workers.batching import BatchMessages
from sqs_workers import SQSEnv, create_standard_queue
sqs = SQSEnv()
create_standard_queue(sqs, "emails")
queue = sqs.queue("emails", batching_policy=BatchMessages(batch_size=10))
@queue.processor("send_email")
def send_email(messages: list):
for email in messages:
print(f"Sending email {email['subject']} to {email['to']}")
此函数将根据以下内容一次性接收最多10条消息
- 在正在消费的队列上有多少可用
- 在
emails
队列上,有多少消息是为send_email
作业的
同步任务执行
在Celery中,您只需将任务作为函数调用并传递参数,就可以同步运行任何任务。我们的AsyncTask对于这种情况会引发RuntimeError。
send_email(to='user@example.com', subject='Hello world', body='hello world')
...
RuntimeError: Async task email.send_email called synchronously (probably,
by mistake). Use either AsyncTask.run(...) to run the task synchronously
or AsyncTask.delay(...) to add it to the queue.
如果您想同步运行任务,请使用任务的run()
方法。
send_email.run(to='user@example.com', subject='Hello world', body='hello world')
FIFO队列
可以使用create_fifo_queue
创建FIFO队列,其名称必须以".fifo"结尾。
from sqs_workers import SQSEnv, create_fifo_queue
sqs = SQSEnv()
create_fifo_queue(sqs, 'emails_dead.fifo')
create_fifo_queue(sqs, 'emails.fifo',
redrive_policy=sqs.redrive_policy('emails_dead.fifo', 3)
)
除非设置了content_based_deduplication
标志,否则每个消息都必须使用属性_deduplication_id
发送。默认情况下,所有消息都具有相同的消息组default
,但您可以使用_group_id
更改它。
sqs.queue("emails.fifo").add_job(
'send_email', _deduplication_id=subject, _group_id=email, **kwargs)
异常处理
如果任务处理最终导致异常,则错误会被记录,任务会在一段时间后返回到队列。确切的行为由队列设置定义。
自定义处理器
如果您需要在执行特定任务之前或之后执行一些特定操作,可以定义自己的处理器。
自定义处理器的示例
from sqs_workers import SQSEnv
from sqs_workers.processors import Processor
class CustomProcessor(Processor):
def process(self, job_kwargs):
print(f'Processing {self.queue_name}.{self.job_name} with {job_kwargs}')
super(CustomProcessor, self).process(job_kwargs)
sqs = SQSEnv(processor_maker=CustomProcessor)
与上下文一起工作
上下文通过作业消息隐式传递给工作者,可用于日志记录或分析目的。
使用示例。
queue = sqs.queue("q1")
@queue.processor('q1', 'hello_world', pass_context=True)
def hello_world(username=None, context=None):
print(f'Hello {username} from {context["remote_addr"]}')
with sqs.context(remote_addr='127.0.0.1'):
hello_world.delay(username='Foo')
queue.process_batch()
或者,您可以这样设置上下文。
sqs.context['remote_addr'] = '127.0.0.1'
hello_world.delay(username='Foo')
然后,当需要清除上下文时
sqs.context.clear()
在Web应用程序中,您通常在处理Web请求的末尾调用它。
对所有任务自动应用上下文
您可以在通过子类化它们来处理处理器的处理器中执行此操作,而不是在处理函数内部处理上下文。
import os
from sqs_workers import SQSEnv
from sqs_workers.processors import Processor
class CustomProcessor(Processor):
def process(self, job_kwargs, job_context):
os.environ['REMOTE_ADDR'] = job_context['remote_addr']
super(CustomProcessor, self).process(job_kwargs, job_context)
os.unsetenv('REMOTE_ADDR')
sqs = SQSEnv(
processor_maker=CustomProcessor,
)
原始队列
原始队列只能有一个处理器,该处理器应是一个函数,它只接受消息作为其唯一参数。
原始队列有助于处理从外部源(如CloudWatch事件)添加到SQS的消息。
您以相同的方式启动处理器,如果需要,创建一个新的标准队列。
from sqs_workers import SQSEnv, create_standard_queue
sqs = SQSEnv()
create_standard_queue(sqs, 'cron')
然后你得到一个队列,但给它提供一个 queue_maker 参数,以创建所需类型的队列,并为其定义一个处理器。
from sqs_workers import RawQueue
cron = sqs.queue('cron', RawQueue)
@cron.raw_processor()
def processor(message):
print(message.body)
然后像往常一样开始处理你的队列
cron.process_queue()
你也可以向队列发送原始消息,但这可能不太有用
cron.add_raw_job("Hello world")
处理 CloudWatch 的消息
默认情况下,CloudWatch 调度器的消息体具有以下 JSON 结构。
{
"version": "0",
"id": "a9a10406-9a1f-0ddc-51ae-08db551fac42",
"detail-type": "Scheduled Event",
"source": "aws.events",
"account": "NNNNNNNNNN",
"time": "2019-09-20T09:19:56Z",
"region": "eu-west-1",
"resources": ["arn:aws:events:eu-west-1:NNNNNNNNNN:rule/Playground"],
"detail": {}
}
消息头
{
'SenderId': 'AIDAJ2E....',
'ApproximateFirstReceiveTimestamp': '1568971264367',
'ApproximateReceiveCount': '1',
'SentTimestamp': '1568971244845',
}
尽管如此,你可以传递任何有效的 JSON 作为消息,并且它将原样传递到消息体中。例如
{ "message": "Hello world" }
死信队列和重试
在创建队列时,你可以设置回退死信队列和重试策略,其可能如下所示。
from sqs_workers import SQSEnv, create_standard_queue
sqs = SQSEnv()
create_standard_queue(sqs, 'emails_dead')
create_standard_queue(sqs, 'emails',
redrive_policy=sqs.redrive_policy('emails_dead', 3)
)
这意味着“在四次(3 + 1)尝试发送给收件人失败后,将消息移动到 email_deadletters 队列”。
回退策略
你可以为整个环境或特定队列定义回退策略。
queue = sqs.queue("emails", backoff_policy=DEFAULT_BACKOFF)
@queue.processor('send_email')
def send_email(to, subject, body):
print(f"Sending email {subject} to {to}")
默认策略是指数回退。我们建议设置回退策略和死信队列以限制最大执行尝试次数。
或者,你可以将回退设置为 IMMEDIATE_RETURN 以立即重新执行失败的任务。
queue = sqs.queue("emails", backoff_policy=IMMEDIATE_RETURN)
@queue.processor('send_email')
def send_email(to, subject, body):
print(f"Sending email {subject} to {to}")
关闭策略
在用 process_queue() 启动队列处理器时,可以可选地设置何时停止。
支持以下关闭策略
-
IdleShutdown(idle_seconds):在特定时间段内没有新任务发送时从函数返回。
-
MaxTasksShutdown(max_tasks):在处理至少 max_task 个作业后从函数返回。这有助于防止内存泄漏。
默认策略是 NeverShutdown。还可以使用 OrShutdown 或 AndShutdown 策略结合两个先前策略,或为特定行为创建自定义类。
在 5 分钟不活动后停止处理队列的示例
from sqs_workers import SQSEnv
from sqs_workers.shutdown_policies import IdleShutdown
sqs = SQSEnv()
sqs.queue("foo").process_queue(shutdown_policy=IdleShutdown(idle_seconds=300))
通过将失败消息推回处理死信队列
处理死信队列最常见的方法是修复导致消息最初出现在那里的主要错误,然后重新处理这些消息。
使用 sqs-workers,可以通过将死信队列中的所有消息放回主队列来完成。在处理队列时,处理器会取每个消息,并将其以硬编码的 1 秒延迟推回到上游队列。
使用示例
>>> from sqs_workers import JobQueue
>>> from sqs_workers.shutdown_policies IdleShutdown
>>> from sqs_workers.deadletter_queue import DeadLetterQueue
>>> env = SQSEnv()
>>> foo = env.queue("foo")
>>> foo_dead = env.queue("foo_dead", DeadLetterQueue.maker(foo))
>>> foo_dead.process_queue(shutdown_policy=IdleShutdown(10))
此代码将 foo_dead 队列中的所有消息推回 foo 队列。然后它等待 10 秒以确保没有新消息出现,然后退出。
通过在原地执行任务处理死信队列
除了将任务推回主队列外,你还可以在原地执行它们。为此,你需要将主队列中的队列处理器复制到 deadletter。
使用示例
>>> env = SQSEnv()
>>> foo = env.queue("foo")
>>> foo_dead = env.queue("foo_dead")
>>> foo.copy_processors(foo_dead)
>>>
>>> from sqs_workers.shutdown_policies IdleShutdown
>>> foo_dead.process_queue(shutdown_policy=IdleShutdown(10))
此代码将 foo_dead 队列中的所有消息使用 "foo" 队列的处理器执行。然后它等待 10 秒以确保没有新消息出现,然后退出。
编解码器
在添加到 SQS 之前,使用 Codec
对任务参数进行编码。目前有 3 个编解码器可用
pickle
:使用默认协议版本使用 Pickle 进行序列化;pickle_compat
:使用 Pickle 进行序列化,使用协议版本 2,这与 Python 2 兼容;json
:使用 JSON 进行序列化。
默认情况下,使用 pickle_compat
,以与 sqs-workers 的先前版本保持向后兼容。
如果使用不受信任的数据(请参阅有关安全的说明 pickle 文档),或为了与其他系统兼容,建议使用 JSON。在其他所有情况下,应使用 pickle
而不是 pickle_compat
,因为它更紧凑、更高效。
>>> env = SQSEnv(codec="pickle")
使用 MemorySession 在单元测试中使用
存在一个特殊的MemorySession,它可以作为单元测试中真实队列的快速且简单的替代品。如果您有一个名为create_task
的函数,它向队列中添加一些任务,并且您想测试其功能,从技术上讲,您可以编写如下测试
from sqs_workers import SQSEnv
env = SQSEnv()
def test_task_creation_side_effects():
create_task()
env.process_batch('foo')
...
问题是您的测试开始依赖于AWS(或localstack)基础设施,这并不是您总是需要的。您可以选择将MemorySession传递给您的SQSEnv实例。
from sqs_workers import SQSEnv, MemorySession
env = SQSEnv(MemorySession())
请注意,MemorySession有一些严重的限制,可能不适合您的用例。具体来说,当您使用MemorySession时
- 重发策略不起作用
- 标准队列和FIFO队列之间没有区别
- FIFO队列不支持基于内容的去重
- 延迟任务执行效率低下:任务从队列中取出,如果时间还没到,任务会被放回。
- API可能返回略有不同的结果
贡献
请参阅我们的指南这里
本地开发
我们使用Poetry进行依赖管理和打包。请参阅此处的安装说明。
安装Poetry后,您可以在虚拟环境中运行以下命令来安装依赖项
poetry install
测试
我们使用pytest运行unittests,并使用tox在所有支持的Python版本上运行它们。
如果您只运行pytest
或tox
,所有测试都将针对AWS、localstack和MemorySession运行。您可以使用pytest的-k
标志禁用您不希望使用的测试,例如使用-k localstack
或-k 'not aws'
。
使用AWS进行测试
确保您已配置boto3客户端(参考),然后运行
poetry run pytest -k aws
或者,要测试所有支持的版本,运行
poetry run tox -- -k aws
使用localstack进行测试
与AWS相比,localstack测试应该运行得更快,而且它们在离线时也能很好地工作。
运行ElasticMQ并确保SQS端点通过地址localhost:4566可用
docker run -p 4566:9324 --rm -it softwaremill/elasticmq-native
然后运行
poetry run pytest -k localstack
或
poetry run tox -- -k localstack
使用MemorySession进行测试
MemorySession应该更快,但具有上面文档中记录的所有限制。但仍然可以用来测试一些逻辑更改。
只需运行
poetry run pytest -k memory
或
poetry run tox -- -k memory
发布新版本
- 在
pyproject.toml
中提升版本 - 更新CHANGELOG
- 使用提交消息“版本X.X.X”提交更改
- 用
vX.X.X
标记当前提交 - 在GitHub上创建名为
vX.X.X
的新版本 - GitHub Actions将为您将新版本发布到PIP
项目详细信息
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分发
sqs_workers-0.5.19.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f705c011285ae2d0e3b99262a982d89137abc6afbbb503488297a6c085c82304 |
|
MD5 | 4cd6916616557940d535ae72fc491421 |
|
BLAKE2b-256 | 2c14c0ca08235e290f5047c0fbb9fea671869b77d51059c8b76fd731cef1b975 |
sqs_workers-0.5.19-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e2ae1fed4f55967a5d2ccabfc27109d9b786a57a5286407a6d127fbff281a4d4 |
|
MD5 | 62f429ccd3e22fcf9b4669ec62cb37e3 |
|
BLAKE2b-256 | c6a5b11d99898b3121ffb93758d287863ce44c9a32da4bdbfaed82d1e9266923 |