跳转到主要内容

TurboGears2的异步作业工作器

项目描述

关于AsyncJob

https://drone.io/bitbucket.org/axant/tgext.asyncjob/status.png

AsyncJob是一个用于处理后台/同步作业的TurboGears2扩展。允许在系统在后台执行更多工作的同时快速返回响应给用户,对于视频转码、缩略图生成或其他用户在得到答案之前不能期望所需时间的任务非常有用。

要在后台执行任务,只需执行以下操作

from tgext.asyncjob import asyncjob_perform
asyncjob_perform(callable, arg1, arg2, kwarg=value)

安装

tgext.asyncjob可以从PyPI或bitbucket安装

easy_install tgext.asyncjob

应该对大多数用户适用

启用AsyncJob

在您的应用程序 config/app_cfg.py 中添加以下行

import tgext.asyncjob
tgext.asyncjob.plugme(base_config)

或者如果tgext.pluggable可用,使用可插拔应用程序接口

from tgext.pluggable import plug
plug(base_config, 'tgext.asyncjob')

您可以将 Globals 对象本身传递给 plug 函数

plug(base_config, 'tgext.asyncjob', app_globals=app_globals)

它将用于存储任务队列,否则asyncjob将自动检测调用堆栈帧中的 Globals 对象,从调用它的对象内部获取对象。

执行后台任务

要执行后台任务,您可以简单地使用 tgext.asyncjob.asyncjob_perform。它可以从任何存在有效请求的上下文中调用,将在后台执行作为第一个参数传递的可调用对象,并使用提供的参数。

from tgext.asyncjob import asyncjob_perform

def background_task(number):
    print number*2

asyncjob_perform(background_task, 5)

跟踪任务进度

asyncjob 跟踪任务状态,并允许更新以实现进度条或其他用户报告长时间运行操作的功能。每次调用 asyncjob_perform 时,它将为刚刚安排的操作返回一个唯一的 ID,可以使用此 ID 在任何时候检索任务状态。

您可以从后台任务内部随时更新进度状态,通过调用 asyncjob_set_progress(value, data)。期望 value 参数是一个整数值,而第二个可选的 data 参数可以是您稍后希望获取的任何内容。

要检索进度状态,您可以通过调用 asyncjob_get_progress 并传入您想要获取状态的任务的 ID。返回值是一个包含两个元素的元组,第一个元素是数字值,第二个元素是您传递给 asyncjob_set_progress 的数据。如果任务已完成,则返回 None,如果尚未开始,则返回 (-1, None)

进度跟踪示例

from tgext.asyncjob import asyncjob_perform, asyncjob_get_progress, asyncjob_set_progress

@expose()
def controller_method(self):
    def async_action():
        for i in range(5):
            asyncjob_set_progress(i)
            time.sleep(1)

    taskid = asyncjob_perform(async_action)
    return redirect(url('/state', uid=taskid))

@expose()
def state(self, uid):
    state = asyncjob_get_progress(uid)
    if not state:
        return 'Job Completed'
    elif state[0] < 0:
        return 'Not yet started'
    else:
        return str(state[0])

多进程中的进度跟踪

当使用多个进程来提供服务端应用程序时,默认的任务跟踪系统无法在不同的进程之间跟踪进程状态。

为了解决这个问题,您必须依赖于一个替代的进度跟踪器,该跟踪器将状态存储在共享存储中。默认情况下,提供了 tgext.asyncjob.tracker.redisdb.RedisProgressTracker,它将状态存储在 Redis 数据库中。

使用它就像这样

from tgext.asyncjob import start_async_worker
tgext.asyncjob.tracker.redisdb import RedisProgressTracker

class Globals(object):
    def __init__(self):
        start_async_worker(progress_tacker=RedisProgressTracker(host='localhost'))

RedisProgressTracker 类接受 hostportdbtimeout 参数,这些参数设置了数据库连接选项和进度跟踪键的超时(以避免留下漂流的键)。默认情况下,使用 db 15 和超时为 1 小时

自定义进度跟踪器

编写自定义进度跟踪器就像提供一个具有以下方法的类一样简单。

class MyProgressTracker(object):
    def track(self, entryid): pass
    def remove(self, entryid): pass
    def set_progress(self, entryid, value, message): pass
    def get_progress(self, entryid): pass

您的自定义进度跟踪器可以传递给 start_async_worker 作为参数,以将状态存储在 SQL 数据库、MongoDB 或其他系统中。

请参考 tgext.asyncjob.tracker.redisdb.RedisProgressTrackertgext.asyncjob.tracker.memory.MemoryProgressTracker 的实现示例。

访问数据库

默认情况下,asyncjob 会自动管理 SQLAlchemy 会话和事务。每个后台任务都封装在一个事务中,如果发生任何异常,则该事务会被回滚。

AsyncJob 使用其自己的 SQLAlchemy 会话,因此永远不要传递已经绑定到另一个会话的对象。重新查询它们。

开发者可能需要记住的唯一问题是,当他们正在寻找在开始后台任务之前刚刚创建的对象时,它们可能还不可用。为了避免这个问题,asyncjob 提供了 asyncjob_timed_query,它将执行查询以查找结果,直到结果本身出现或达到超时(默认为 60 秒)。

这可以用来检索在开始后台任务之前创建的对象,等待它们出现在数据库中。

from tgext.asyncjob import asyncjob_perform, asyncjob_timed_query

@expose()
def controller_method(self):
    def async_query_action(group_id):
        group = asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id)).first()
        group.display_name = 'Prova'

    g = Group(group_name='test_group')
    DBSession.add(g)
    DBSession.flush()
    asyncjob_perform(async_query_action, g.group_id)
    return 'OK'

要更改超时,您只需向 asyncjob_timed_query 传递不同的 retriesinterval 参数即可。

asyncjob_timed_query(DBSession.query(Group).filter_by(group_id=group_id),
                     retries=10, interval=6).first()

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

tgext.asyncjob-0.3.1.tar.gz (9.5 kB 查看哈希值)

上传时间 源代码

支持