跳转到主要内容

Python任务队列

项目描述

https://github.com/closeio/tasktiger/actions/workflows/test.yaml/badge.svg?event=push

TaskTiger 是一个使用Redis的Python任务队列。

(对类似项目感兴趣吗? Close 正在寻找 优秀的工程师 加入我们的团队)

功能

  • 每个任务的fork或同步工作进程

    默认情况下,TaskTiger为每个任务启动一个子进程,这带来了一些好处:任务造成的内存泄漏被避免,因为子进程在任务完成后会被终止。可以为每个任务设置一个硬时间限制,如果任务未在此时间之前完成,则会被杀死。为了确保性能,可以在父进程中预先加载任何必要的Python模块。

    TaskTiger 还支持同步工作者,这由于没有分叉开销而提高了性能,并且任务具有重用网络连接的能力。为了防止内存泄漏累积,可以将工作者设置在一段时间后关闭,此时管理器可以重新启动它们。工作者在超时时也会自动退出,以防止进程状态不一致。

  • 独特队列

    TaskTiger 选项可以避免任务队列中的重复任务。在某些情况下,将多个类似任务组合起来是可取的。例如,想象一个索引对象的任务(例如,使它们可搜索)。如果对象已经存在于任务队列中且尚未处理,则唯一队列将确保索引任务不必做重复工作。然而,如果任务在排队时已经在运行,则任务将再次执行以确保索引任务总是获取最新的状态。

  • 任务锁

    TaskTiger 可以通过获取锁来确保不会执行具有相似参数的任务的多个实例。如果任务遇到锁,它将被重新入队,并在可配置的间隔后安排稍后执行。

  • 任务重试

    TaskTiger 允许重试异常(所有异常或特定列表中的异常),并提供了可配置的重试间隔(固定、线性、指数、自定义)。

  • 灵活队列

    任务可以轻松地分别排队。工作者从随机选择的队列中选取任务,并且可以配置为只处理特定的队列,确保所有队列都得到平等的处理。TaskTiger 还支持子队列,它们由点号分隔。例如,您可以拥有形式为 process_emails.CUSTOMER_ID 的按客户队列,并启动一个工作者来处理 process_emails 及其任何子队列。由于任务是随机选择的,因此所有客户都得到平等对待:如果一个客户正在排队许多任务,它不会阻止其他客户的任务被处理。还可以强制执行最大队列大小。

  • 批量队列

    批量队列可以用来将多个排队的任务合并为一个。这样,您的任务函数可以同时处理多组参数,从而提高性能。批量大小是可配置的。

  • 计划性和周期性任务

    可以计划任务在特定时间执行。任务也可以定期执行(例如,每五秒执行一次)。

  • 结构化日志记录

    TaskTiger 支持通过 structlog 以 JSON 风格进行日志记录,这使得工具分析日志更加灵活。例如,您可以使用 TaskTiger 与 Logstash、Elasticsearch 和 Kibana 一起使用。

    structlog 处理器 tasktiger.logging.tasktiger_processor 可以用于将当前任务 ID 注入到所有日志消息中。

  • 可靠性

    TaskTiger 以原子方式在队列状态之间移动任务,如果工作者崩溃,将在超时后重新执行任务。

  • 错误处理

    如果在任务执行期间发生异常并且任务没有设置为重试,TaskTiger 将将执行跟踪记录存储在错误队列中。然后可以手动重试或删除任务。TaskTiger 可以轻松集成到错误报告服务,如 Rollbar。

  • 管理界面

    存在一个简单的使用 flask-admin 的管理界面,作为一个独立的项目(tasktiger-admin)。

快速开始

使用 TaskTiger 很容易入门。

创建一个包含任务(的)的文件。

# tasks.py
def my_task():
    print('Hello')

使用 delay 方法排队任务。

In [1]: import tasktiger, tasks
In [2]: tiger = tasktiger.TaskTiger()
In [3]: tiger.delay(tasks.my_task)

运行一个工作者(确保可以找到任务代码,例如使用 PYTHONPATH)。

% PYTHONPATH=. tasktiger
{"timestamp": "2015-08-27T21:00:09.135344Z", "queues": null, "pid": 69840, "event": "ready", "level": "info"}
{"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.727051Z", "pid": 69840, "queue": "default", "child_pid": 70171, "event": "processing"}
Hello
{"task_id": "6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79", "level": "debug", "timestamp": "2015-08-27T21:03:56.732457Z", "pid": 69840, "queue": "default", "event": "done"}

配置

TaskTiger 对象跟踪 TaskTiger 的设置,并用于装饰和排队任务。构造函数接受以下参数

  • 连接

    Redis连接对象。连接应该使用 decode_responses=True 初始化,以避免在Python 3中发生编码问题。

  • 配置

    包含配置选项的字典。大多数配置选项不需要更改,完整的列表可以在 TaskTiger__init__ 方法中查看。

    以下是一些常用选项

    • ALWAYS_EAGER

      如果设置为 True,则所有任务(除了未来任务,即 when 是未来时间)都将在本地上阻塞执行,直到任务返回。这对于测试目的很有用。

    • BATCH_QUEUES

      设置批处理队列,即同时从队列中取出多个作业并作为列表传递给工作方法。它接受一个字典,其中键代表队列名称,值代表批处理大小。请注意,任务需要声明为 batch=True。此外,请注意,任何子队列都将自动被视为批处理队列,并且最具体的子队列名称的批处理值具有优先权。

    • ONLY_QUEUES

      如果设置为非空队列名称列表,则工作器仅处理给定的队列(及其子队列),除非命令行中明确传递队列。

  • setup_structlog

    如果设置为 True,则在初始化 TaskTiger 时设置结构化日志记录,使用 structlog。这使得编写自定义工作脚本更容易,因为不需要用户事先设置 structlog

示例

import tasktiger
from redis import Redis
conn = Redis(db=1, decode_responses=True)
tiger = tasktiger.TaskTiger(connection=conn, config={
    'BATCH_QUEUES': {
        # Batch up to 50 tasks that are queued in the my_batch_queue or any
        # of its subqueues, except for the send_email subqueue which only
        # processes up to 10 tasks at a time.
        'my_batch_queue': 50,
        'my_batch_queue.send_email': 10,
    },
})

任务装饰器

TaskTiger 提供了一个任务装饰器来指定任务选项。请注意,简单的任务不需要装饰。但是,装饰任务允许您使用与 Celery 兼容的替代语法来排队任务。

# tasks.py

import tasktiger
tiger = tasktiger.TaskTiger()

@tiger.task()
def my_task(name, n=None):
    print('Hello', name)
In [1]: import tasks
# The following are equivalent. However, the second syntax can only be used
# if the task is decorated.
In [2]: tasks.tiger.delay(my_task, args=('John',), kwargs={'n': 1})
In [3]: tasks.my_task.delay('John', n=1)

任务选项

任务支持多种选项,可以在任务装饰器中指定,也可以在排队任务时指定。对于后者,必须在 TaskTiger 对象上调用 delay 方法,并且任务装饰器中的任何选项都将被覆盖。

@tiger.task(queue='myqueue', unique=True)
def my_task():
    print('Hello')
# The task will be queued in "otherqueue", even though the task decorator
# says "myqueue".
tiger.delay(my_task, queue='otherqueue')

在排队任务时,任务需要在执行中的 Python 文件之外的定义。换句话说,任务不能在 __main__ 模块中。否则,TaskTiger 会返回错误。

以下选项都由 delay 和任务装饰器支持

  • 队列

    任务将被排队的队列名称。

  • hard_timeout

    如果任务运行时间超过指定的秒数,则将其终止并标记为失败。

  • unique

    布尔值,表示任务是否只有在没有相同函数、参数和关键字参数的类似任务时才会被排队。请注意,由于任务仍将被插入队列以处理另一个任务,因此即使存在类似的任务,也可能会同时执行多个类似任务。重新排队已安排的唯一任务不会更改它最初计划执行的原始时间。

  • unique_key

    如果设置,则意味着 unique=True,并指定用于构造唯一键的 kwargs 列表。默认情况下,所有参数和关键字参数都序列化和散列。

  • lock

    布尔值,表示在任务执行时(对于给定的参数和关键字参数)是否持有锁。如果带有相似参数/关键字参数的任务被排队并尝试获取锁,它将在稍后重试。

  • lock_key

    如果设置,则表示 lock=True 并指定用于构建锁键的kwargs列表。默认情况下,所有参数和kwargs都会被序列化并哈希。

  • 最大队列大小

    可以将此设置为整数值以强制执行最大队列大小。当达到此限制时,如果队列任务,将引发 QueueFullException 异常。处于 activescheduledqueued 状态的任务都会计入此限制。

  • when

    接受一个datetime(绝对日期)或timedelta(相对于现在)。如果提供,则任务将被安排在给定的时间。

  • retry

    布尔值,表示在任务失败时(无论是因为异常还是超时)是否重试任务。要限制失败列表,请使用 retry_on。除非提供 retry_method,否则使用配置的 DEFAULT_RETRY_METHOD

  • retry_on

    如果给出列表,则表示 retry=True。任务只有在给定的异常(或其子类)上才会重试。要使用 JobTimeoutException 在硬超时发生时重试任务。

  • retry_method

    如果提供,则表示 retry=True。传递以下之一:

    • 一个接受重试次数作为参数的函数,或者,

    • 一个元组 (f, args),其中 f 以重试次数作为第一个参数,后面跟着其他参数。

    函数需要返回所需的重试间隔(以秒为单位),或抛出 StopRetry 以停止重试。以下内置函数可以用于常见场景并返回适当的元组

    • fixed(delay, max_retries)

      返回一个方法,该方法返回给定的 delay(以秒为单位)或如果重试次数超过 max_retries,则抛出 StopRetry

    • linear(delay, increment, max_retries)

      fixed 相似,但以给定的 delay 开始,并在每次重试后增加给定的 increment

    • exponential(delay, factor, max_retries)

      fixed 相似,但以给定的 delay 开始,并在每次重试后乘以给定的 factor

    例如,要重试任务3次(总共执行4次),并在每次执行之间等待60秒,请传递 retry_method=fixed(60, 3)

  • runner_class

    如果提供,可以指定一个Python类以影响任务运行行为。运行类应该继承 tasktiger.runner.BaseRunner 并实现任务执行行为。默认实现可在 tasktiger.runner.DefaultRunner 中找到。可以实现以下行为

    • 在任务执行前后(在分叉的子进程中)执行特定代码,或自定义在单处理或批量处理中调用任务函数的方式。

      请注意,如果您想为所有任务执行特定代码,应使用 CHILD_CONTEXT_MANAGERS 配置选项。

    • 控制任务的硬超时行为。

    • 在任务永久失败后,在主工作进程中执行特定代码。

    这是一个高级功能,且在未来的 TaskTiger 版本中,运行类的接口和需求可能会更改。

以下选项只能在任务装饰器中指定

  • batch

    如果设置为 True,任务将接收包含 args 和 kwargs 的字典列表,并可以一次处理多个相同类型的任务。例如:[{"args": [1], "kwargs": {}}, {"args": [2], "kwargs": {}}] 注意,只有当工作进程为特定队列设置了 BATCH_QUEUES 时,列表中才会包含多个项目(请参阅 配置 部分)。

  • 计划

    如果提供了,会使任务定期执行。传递以下之一

    • 一个接受当前日期时间为参数的函数。

    • 一个元组 (f, args),其中 f 以当前日期时间作为第一个参数,后面跟着额外的参数。

    计划函数必须返回下一个任务执行的日期时间,或返回 None 以阻止周期性执行。该函数在初始化工作进程时用于确定初始任务执行日期,在任务即将执行时用于确定下一个执行日期。

    对于大多数常见场景,可以传递以下内置函数

    • periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None, end_date=None)

      使用从 start_date(默认为 2000-01-01T00:00Z,如果没有提供,则为周六)开始的相等间隔,直到 end_date(如果没有提供,则永远不变)。例如,要无限期地每五分钟运行一个任务,请使用 schedule=periodic(minutes=5)。要每周日 UTC 早上 4 点运行任务,可以使用 schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4))

    • cron_expr(expr, start_date=None, end_date=None)

      start_date,指定周期性任务的开始日期。如果没有提供,默认为 2000-01-01T00:00Z,周六。 end_date,指定周期性任务的结束日期。如果没有提供 end_date,任务将无限期重复。例如,要无限期每小时运行任务,请使用 schedule=cron_expr("0 * * * *")。要每周日 UTC 早上 4 点运行任务,可以使用 schedule=cron_expr("0 4 * * 0")

自定义重试

在某些情况下,任务重试选项可能不够灵活。例如,您可能希望根据异常类型使用不同的重试方法,或者您可能希望抑制在重试后失败的日志记录。在这些情况下,可以在任务函数中抛出 RetryException。支持以下选项

  • method

    指定此重试的自定义重试方法。如果没有提供,则使用任务的默认重试方法,或者如果未指定,则使用配置的 DEFAULT_RETRY_METHOD。请注意,传递给重试方法的重试次数始终是此方法已执行的总次数,而不管使用了哪种重试方法。

  • original_traceback

    如果从 except 块中抛出 RetryExceptionoriginal_traceback 为 True,则将记录原始跟踪信息(即捕获异常处的地方的堆栈跟踪)。默认为 False。

  • log_error

    如果设置为 False 且任务永久失败,则记录一个警告而不是错误,并在任务完成后将其从 Redis 中删除。默认为 True。

示例用法

from tasktiger.exceptions import RetryException
from tasktiger.retry import exponential, fixed

def my_task():
    if not ready():
        # Retry every minute up to 3 times if we're not ready. An error will
        # be logged if we're out of retries.
        raise RetryException(method=fixed(60, 3))

    try:
        some_code()
    except NetworkException:
        # Back off exponentially up to 5 times in case of a network failure.
        # Log the original traceback (as a warning) and don't log an error if
        # we still fail after 5 times.
        raise RetryException(method=exponential(60, 2, 5),
                             original_traceback=True,
                             log_error=False)

工作进程

在命令行中使用 tasktiger 命令来调用工作进程。若要调用多个工作进程,需要启动多个实例。这可以通过例如 Supervisor 很容易完成。以下 Supervisor 配置文件可以放置在 /etc/supervisor/tasktiger.ini 中,并以 ubuntu 用户运行 4 个 TaskTiger 工作进程。有关更多信息,请参阅 Supervisor 的文档。

[program:tasktiger]
command=/usr/local/bin/tasktiger
process_name=%(program_name)s_%(process_num)02d
numprocs=4
numprocs_start=0
priority=999
autostart=true
autorestart=true
startsecs=10
startretries=3
exitcodes=0,2
stopsignal=TERM
stopwaitsecs=600
killasgroup=false
user=ubuntu
redirect_stderr=false
stdout_logfile=/var/log/tasktiger.out.log
stdout_logfile_maxbytes=250MB
stdout_logfile_backups=10
stderr_logfile=/var/log/tasktiger.err.log
stderr_logfile_maxbytes=250MB
stderr_logfile_backups=10

工作进程支持以下选项

  • -q--queues

    如果指定,则仅处理给定的队列。多个队列可以用逗号分隔。也将处理给定队列的任何子队列。例如,-q first,second 将处理来自 firstsecond 以及如 first.CUSTOMER1first.CUSTOMER2 这样的子队列。

  • -e--exclude-queues

    如果指定,则从处理中排除给定的队列。多个队列可以用逗号分隔。除非使用 -q 选项指定更具体的队列,否则给定队列的任何子队列也将被排除。例如,-q email,email.incoming.CUSTOMER1 -e email.incoming 将处理来自 email 队列及其子队列如 email.outgoing.CUSTOMER1email.incoming.CUSTOMER1 的项目,但不处理 email.incomingemail.incoming.CUSTOMER2

  • -m--module

    启动工作进程时导入的模块。这提高了任务性能,因为模块不需要在每次任务分叉时重新导入。多个模块可以用逗号分隔。

    预载模块的另一种方式是设置自定义 TaskTiger 启动脚本,如下所述。

  • -h--host

    Redis 服务器主机名(如果不同于 localhost)。

  • -p--port

    Redis 服务器端口(如果不同于 6379)。

  • -a--password

    Redis 服务器密码(如果需要)。

  • -n--db

    Redis 服务器数据库编号(如果不同于 0)。

  • -M--max-workers-per-queue

    允许处理队列的最大工作进程数。

  • –store-tracebacks/–no-store-tracebacks

    存储与执行历史记录相关的跟踪信息(默认值为 True)。

  • –executor

    可以是 fork(默认)或 sync。决定是否通过分叉在单独的进程或同步地在同一进程中执行任务。参见“功能”部分了解两种方法的好处。

  • –exit-after

    在经过指定分钟数后退出工作进程。这主要用于与同步执行器一起使用,以防止内存泄漏累积。

在某些情况下,拥有自定义 TaskTiger 启动脚本是很方便的。例如,您的应用程序可能有一个 manage.py 命令来设置环境,您可能希望使用该脚本启动 TaskTiger 工作进程。为此,您可以使用 run_worker_with_args 方法,该方法启动 TaskTiger 工作进程并解析任何命令行参数。以下是一个示例

import sys
from tasktiger import TaskTiger

try:
    command = sys.argv[1]
except IndexError:
    command = None

if command == 'tasktiger':
    tiger = TaskTiger(setup_structlog=True)
    # Strip the "tasktiger" arg when running via manage, so we can run e.g.
    # ./manage.py tasktiger --help
    tiger.run_worker_with_args(sys.argv[2:])
    sys.exit(0)

检查、重新入队和删除任务

TaskTiger 提供了访问 Task 类的权限,该类允许您检查队列并执行对任务的各项操作。

每个队列可以包含以下状态的任务

  • queued:已入队且等待工作者取走的任务。

  • active:当前由工作者正在处理的任务。

  • scheduled:计划稍后执行的任务。

  • error:执行失败的任务。

要获取给定队列和状态的所有任务的列表,请使用 Task.tasks_from_queue。该方法返回一个包含队列中任务总数的元组(如果任务被截断则很有用)以及按最新顺序排列的队列中任务列表。使用 skiplimit 关键字参数,您可以获取队列的任意切片。如果您知道任务 ID,可以使用 Task.from_id 获取指定的任务。这两种方法都允许您使用 load_executions 关键字参数加载失败的任务执行记录,该参数接受一个整数,表示应加载多少次执行。

您也可以使用常规构造函数构建和排队任务,该构造函数接受 TaskTiger 实例、函数名和 Task 选项 部分中描述的选项。然后可以使用其 delay 方法将任务排队。请注意,如果适用,需要将 when 参数传递给 delay 方法。可以使用相同的参数重建唯一任务。

Task 对象具有以下属性

  • id:任务 ID。

  • data:来自 Redis 的原始数据作为字典。

  • executions:失败的任务执行列表(作为字典)。执行字典包含处理时间在 time_startedtime_failed,工作者主机在 host,异常名称在 exception_name,以及完整的回溯在 traceback

  • serialized_funcargskwargs:包含所有参数的序列化函数名。

  • func:导入(可执行)的函数

Task 对象具有以下方法

  • cancel:取消计划的任务。

  • delay:将任务排队以执行。

  • delete:从错误队列中删除任务。

  • execute:不排队直接运行任务。

  • n_executions:查询并返回过去任务执行的数量。

  • retry:将任务从错误队列中重新排队以执行。

  • update_scheduled_time:更新计划任务的时间为给定的时间。

在任务执行过程中,可以在任务函数中访问当前任务:对于非批量任务,TaskTiger 实例的 current_task 属性返回当前 Task 实例。对于批量任务,必须使用 current_tasks 属性,该属性返回当前正在处理的任务列表(与传递给任务的顺序相同)。

示例 1:排队一个唯一任务并在没有原始任务引用的情况下取消它。

from tasktiger import TaskTiger, Task

tiger = TaskTiger()

# Send an email in five minutes.
task = Task(tiger, send_mail, args=['email_id'], unique=True)
task.delay(when=datetime.timedelta(minutes=5))

# Unique tasks get back a task instance referring to the same task by simply
# creating the same task again.
task = Task(tiger, send_mail, args=['email_id'], unique=True)
task.cancel()

示例 2:通过 ID 检查队列并重试任务。

from tasktiger import TaskTiger, Task

QUEUE_NAME = 'default'
TASK_STATE = 'error'
TASK_ID = '6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79'

tiger = TaskTiger()

n_total, tasks = Task.tasks_from_queue(tiger, QUEUE_NAME, TASK_STATE)

for task in tasks:
    print(task.id, task.func)

task = Task.from_id(tiger, QUEUE_NAME, TASK_STATE, TASK_ID)
task.retry()

示例 3:访问批处理任务函数中的任务实例,以确定当前正在处理的任务之前执行了多少次。

from tasktiger import TaskTiger

tiger = TaskTiger()

@tiger.task(batch=True)
def my_task(args):
    for task in tiger.current_tasks:
        print(task.n_executions())

暂停队列处理

--max-workers-per-queue 选项使用队列锁来控制可以同时处理同一队列的工人数。使用此选项时,可以在队列上放置系统锁,这将在锁到期之前阻止工人处理该队列的任务。使用 TaskTiger 对象的 set_queue_system_lock() 方法来设置此锁。

Rollbar错误处理

TaskTiger 集成了 Rollbar 错误处理。当任务出错时,它可以将错误记录到 Rollbar 中,按队列、任务函数名称和异常类型分组。要启用日志记录,使用 tasktiger.rollbar 模块中提供的 StructlogRollbarHandler 初始化 rollbar。处理程序接受一个字符串作为参数,该参数用作所有报告给 Rollbar 的消息的前缀。以下是一个自定义的工人启动脚本

import logging
import rollbar
import sys
from tasktiger import TaskTiger
from tasktiger.rollbar import StructlogRollbarHandler

tiger = TaskTiger(setup_structlog=True)

rollbar.init(ROLLBAR_API_KEY, APPLICATION_ENVIRONMENT,
             allow_logging_basic_config=False)
rollbar_handler = StructlogRollbarHandler('TaskTiger')
rollbar_handler.setLevel(logging.ERROR)
tiger.log.addHandler(rollbar_handler)

tiger.run_worker_with_args(sys.argv[1:])

清理错误任务

错误的任务偶尔需要从 Redis 中清除,因此 TaskTiger 提供了一个 purge_errored_tasks 方法来帮助。将其设置为周期性任务可能会有所帮助,如下所示

from tasktiger import TaskTiger, periodic

tiger = TaskTiger()

@tiger.task(schedule=periodic(hours=1))
def purge_errored_tasks():
    tiger.purge_errored_tasks(
        limit=1000,
        last_execution_before=(
            datetime.datetime.utcnow() - datetime.timedelta(weeks=12)
        )
    )

运行测试套件

可以使用提供的 docker compose 文件在本地运行测试。安装 docker 后,可以使用以下命令运行测试

docker-compose run --rm tasktiger pytest

可以使用正常的 pytest 标志更详细地运行测试。例如

docker-compose run --rm tasktiger pytest tests/test_base.py::TestCase

发布新版本

  1. 确保代码已经在现实的生产环境中经过彻底的审查和测试。

  2. 更新 setup.pyCHANGELOG.md。确保您包括任何重大更改。

  3. 运行 python setup.py sdisttwine upload dist/<PACKAGE_TO_UPLOAD>

  4. 推送一个指向已发布提交的新标签,格式为:例如 v0.13

  5. 在 GitHub 的 UI 中将该标签标记为发布,并在描述中包含该版本的更改日志条目。例如:https://github.com/closeio/tasktiger/releases/tag/v0.13

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

tasktiger-0.21.0.tar.gz (92.0 kB 查看散列)

上传时间

由以下支持