跳转到主要内容

Django应用程序,用于在AWS SQS队列上调度和运行函数。

项目描述

beanstalk-dispatch

PyPI version PyPI Github Actions

beanstalk-dispatch 是一个Django应用程序,它运行已经安排在 AWS SQS 队列上运行的函数,并在监听该队列的 Elastic Beanstalk 工作机上执行它们。

此应用程序最初由 @marcua@b12io 的开源应用程序 orchestra 编写。

该库支持Django 3到Django 4,Python版本从3.6到3.10。如果您想看到一个功能或找到一个错误,请通过打开一个 问题拉取请求 来通知我。

5分钟内开始使用

要安装

pip install beanstalk-dispatch
  • 为具有以下两个参数的应用程序创建一个Elastic Beanstalk环境:在 settings.py
     BEANSTALK_DISPATCH_SQS_KEY = 'your AWS key for accessing SQS'
     BEANSTALK_DISPATCH_SQS_SECRET = 'your AWS secret for accessing SQS'
  • beanstalk_dispatch 添加到 settings.py 的 INSTALLED_APPS
INSTALLED_APPS = (
    # ...other installed applications...
    'beanstalk_dispatch',
)
  • url(r'^beanstalk_dispatch/', include('beanstalk_dispatch.urls')), 添加到您的主 urls.py

  • /beanstalk_dispatch/dispatcher 添加为 HTTP 端点或 AWS 控制台中您的 beanstalk 工作人员配置。

  • 添加一个调度表。调度器通过创建一个 HTTP 端点,本地 SQS/Beanstalk 守护进程将请求 POST 到该端点。该端点会查询一个 BEANSTALK_DISPATCH_TABLE,它将函数名映射到要运行的函数。以下是一个示例

      if os.environ.get('BEANSTALK_WORKER') == 'True':
        BEANSTALK_DISPATCH_TABLE = {
            'a_function_to_dispatch': ('some_package.beanstalk_tasks.'
                                      'the_name_of_the_function_in_the_module')
        }

第一行是一个检查,确保此类机器应该是 beanstalk 工作人员。我们仅在工作人员机器的环境配置中设置 BEANSTALK_WORKER 环境变量为 'True'。这避免了其他环境(例如,我们的 Web 服务器)作为运行任意代码的开放代理。

第二行是调度表。它将路径映射到要执行的函数。

安排函数运行

beanstalk_dispatch.client.schedule_function 在给定的 SQS 队列上安排函数运行。您传递给它的函数名必须是 BEANSTALK_DISPATCH_TABLE 中的一个键,您传递给它的 queue_name 必须是已配置 beanstalk 工作人员的队列。

from beanstalk_dispatch.client import schedule_function

schedule_function('a-queue', 'a_function_to_dispatch',
    '1', '2', kwarg1=1, kwarg2=2)

SafeTasks

默认情况下,beanstalk_dispatch 运行的每个函数都被 SafeTask 类包装,该类在函数上设置 @timeout 装饰器并捕获任何异常进行日志记录。如果您想自定义 SafeTask 的行为,创建一个子类并在 BEANSTALK_DISPATCH_TABLE 中引用此对象。

以下参数/函数可以在 SafeTask 上进行配置

timeout_timedelta:任务可以运行的最大秒数,默认为 2 分钟。 verbose:指定是否记录失败的布尔值,默认为 Falserun:要填充任务工作的抽象方法。 on_error:如果任务因任何原因失败而运行的函数。 on_success:任务成功完成后运行的函数。 on_completion:在每次任务(在 on_erroron_success 之后)后运行的函数。

例如

# beanstalk_tasks.py
from datetime import timedelta

from beanstalk_dispatch.client import schedule_function
from beanstalk_dispatch.safe_task import SafeTask

class MySafeTask(SafeTask):

    timeout_timedelta = timedelta(seconds=1000)
    verbose = True

    def run(self, *args, **kwargs):
        # Run the task
        print('Running task')

    def on_error(self, e, *args, **kwargs):
        print('There was an error {}'.format(e))

    def on_success(self, *args, **kwargs):
        print('Success!')

    def on_completion(self, *args, **kwargs):
        print('Task completed')

schedule_function('a-queue', 'mysafetask',
    '1', '2', kwarg1=1, kwarg2=2)
# settings.py
  if os.environ.get('BEANSTALK_WORKER') == 'True':
    BEANSTALK_DISPATCH_TABLE = {
        'mysafetask': 'beanstalk_tasks.MySafeTask'
    }

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分发

beanstalk-dispatch-0.1.1.tar.gz (14.1 kB 查看散列)

上传

构建分发

beanstalk_dispatch-0.1.1-py2.py3-none-any.whl (15.0 kB 查看散列)

上传 Python 2 Python 3

由以下机构支持