Django应用程序,用于在AWS SQS队列上调度和运行函数。
项目描述
beanstalk-dispatch
beanstalk-dispatch
是一个Django应用程序,它运行已经安排在 AWS SQS 队列上运行的函数,并在监听该队列的 Elastic Beanstalk 工作机上执行它们。
此应用程序最初由 @marcua 为 @b12io 的开源应用程序 orchestra 编写。
该库支持Django 3到Django 4,Python版本从3.6到3.10。如果您想看到一个功能或找到一个错误,请通过打开一个 问题 或 拉取请求 来通知我。
5分钟内开始使用
要安装
pip install beanstalk-dispatch
- 为具有以下两个参数的应用程序创建一个Elastic Beanstalk环境:在
settings.py
BEANSTALK_DISPATCH_SQS_KEY = 'your AWS key for accessing SQS'
BEANSTALK_DISPATCH_SQS_SECRET = 'your AWS secret for accessing SQS'
- 将
beanstalk_dispatch
添加到 settings.py 的INSTALLED_APPS
INSTALLED_APPS = (
# ...other installed applications...
'beanstalk_dispatch',
)
-
将
url(r'^beanstalk_dispatch/', include('beanstalk_dispatch.urls')),
添加到您的主urls.py
-
将
/beanstalk_dispatch/dispatcher
添加为 HTTP 端点或 AWS 控制台中您的 beanstalk 工作人员配置。 -
添加一个调度表。调度器通过创建一个 HTTP 端点,本地 SQS/Beanstalk 守护进程将请求 POST 到该端点。该端点会查询一个
BEANSTALK_DISPATCH_TABLE
,它将函数名映射到要运行的函数。以下是一个示例
if os.environ.get('BEANSTALK_WORKER') == 'True':
BEANSTALK_DISPATCH_TABLE = {
'a_function_to_dispatch': ('some_package.beanstalk_tasks.'
'the_name_of_the_function_in_the_module')
}
第一行是一个检查,确保此类机器应该是 beanstalk 工作人员。我们仅在工作人员机器的环境配置中设置 BEANSTALK_WORKER
环境变量为 'True'
。这避免了其他环境(例如,我们的 Web 服务器)作为运行任意代码的开放代理。
第二行是调度表。它将路径映射到要执行的函数。
安排函数运行
beanstalk_dispatch.client.schedule_function
在给定的 SQS 队列上安排函数运行。您传递给它的函数名必须是 BEANSTALK_DISPATCH_TABLE
中的一个键,您传递给它的 queue_name
必须是已配置 beanstalk 工作人员的队列。
from beanstalk_dispatch.client import schedule_function
schedule_function('a-queue', 'a_function_to_dispatch',
'1', '2', kwarg1=1, kwarg2=2)
SafeTasks
默认情况下,beanstalk_dispatch
运行的每个函数都被 SafeTask
类包装,该类在函数上设置 @timeout
装饰器并捕获任何异常进行日志记录。如果您想自定义 SafeTask
的行为,创建一个子类并在 BEANSTALK_DISPATCH_TABLE
中引用此对象。
以下参数/函数可以在 SafeTask
上进行配置
timeout_timedelta
:任务可以运行的最大秒数,默认为 2
分钟。 verbose
:指定是否记录失败的布尔值,默认为 False
。 run
:要填充任务工作的抽象方法。 on_error
:如果任务因任何原因失败而运行的函数。 on_success
:任务成功完成后运行的函数。 on_completion
:在每次任务(在 on_error
或 on_success
之后)后运行的函数。
例如
# beanstalk_tasks.py
from datetime import timedelta
from beanstalk_dispatch.client import schedule_function
from beanstalk_dispatch.safe_task import SafeTask
class MySafeTask(SafeTask):
timeout_timedelta = timedelta(seconds=1000)
verbose = True
def run(self, *args, **kwargs):
# Run the task
print('Running task')
def on_error(self, e, *args, **kwargs):
print('There was an error {}'.format(e))
def on_success(self, *args, **kwargs):
print('Success!')
def on_completion(self, *args, **kwargs):
print('Task completed')
schedule_function('a-queue', 'mysafetask',
'1', '2', kwarg1=1, kwarg2=2)
# settings.py
if os.environ.get('BEANSTALK_WORKER') == 'True':
BEANSTALK_DISPATCH_TABLE = {
'mysafetask': 'beanstalk_tasks.MySafeTask'
}
项目详情
beanstalk-dispatch-0.1.1.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 1b600e2f44dc471c67991d2de25e42b34e372ba7fd9bdd422127d3c0b7fb6e32 |
|
MD5 | 6bbbd51c1caf76be3aab9cace39d0965 |
|
BLAKE2b-256 | 1b8e15c935bdbc78fa65ddc2eb50c9359faec01277b4192d5eaab0629239b4f5 |
哈希值 for beanstalk_dispatch-0.1.1-py2.py3-none-any.whl
算法 | 散列摘要 | |
---|---|---|
SHA256 | 3a45208c6e226b57407389ade2983e3cbccfcc8eff5949db3f8b484464ebc13d |
|
MD5 | 77585594caf51ee1a21d51d7f5b1c622 |
|
BLAKE2b-256 | d87b46015db4560ac6e0ba7202b037e3aed83a5cb51c9642f9237c92813ff4de |