
Threaded task retry module

Project description

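functastic is for managing tasks that you want to retry until a success condition is met, or possibly forever. It can run in the current thread or in a separate thread. You can configure a task's start time, success condition, retry attempts, retry interval, and interval backoff. Note that all tasks run one after another in a single thread, so a long-running task can delay the actual start of the tasks after it. For that reason, functastic is currently better suited to quick tasks. I'd like to launch tasks in side threads in the future, but for now I don't want to require any particular threading library.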

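functastic provides two core classes: Task and TaskHeap. A Task wraps a function and is appended to a TaskHeap, which provides a run() method that runs, schedules, and retries Tasks until their success conditions are met. A Task's default success condition is that the function raises no exception and returns any value other than None.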
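To make the default rule concrete, here is a small predicate expressing it. The name `default_success` and its `(exception, result)` signature are hypothetical and not part of functastic. Note that falsy values such as `0` or `''` still count as success, since only `None` fails.

```python
def default_success(exception, result):
    """Hypothetical stand-in for Task's default success condition.

    Success means the call raised nothing and returned something other than None.
    """
    return exception is None and result is not None
```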

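Calling a Task object never raises an exception. Whether you call a task manually or through a TaskHeap, exceptions are only logged (and may be used to determine success); they are not re-raised. The one exception to this rule is a custom success_condition function that raises, so write them carefully. Task also raises an exception at initialization if you provide a success_condition that is not callable.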
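The call semantics above can be modelled with a small wrapper class. `MiniTask` is a hypothetical sketch for illustration only, not functastic's implementation; it assumes the task exposes `result`, `exception`, and `retry` attributes, as the examples in this README use.

```python
import logging

log = logging.getLogger("minitask")


class MiniTask:
    """Hypothetical sketch of the documented Task call semantics."""

    def __init__(self, func, args=None, kwargs=None, success_condition=None):
        if success_condition is None:
            # documented default: no exception and a non-None result
            success_condition = lambda t: t.exception is None and t.result is not None
        if not callable(success_condition):
            # mirrors the documented init-time error for a non-callable condition
            raise TypeError("success_condition must be callable")
        self.func = func
        self.args = list(args or [])
        self.kwargs = dict(kwargs or {})
        self.success_condition = success_condition
        self.result = None
        self.exception = None
        self.retry = True

    def __call__(self):
        self.result, self.exception = None, None
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as exc:
            # recorded and logged, never re-raised
            self.exception = exc
            log.warning("task %r raised %r", self.func, exc)
        # not wrapped in try: an exception from success_condition propagates
        if self.success_condition(self):
            self.retry = False
```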

Installation

Install with pip install functastic, or clone the repository and run python setup.py install, or, if you want to play with the code, run pip install -e . from the cloned repository.

Task usage

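A basic task is a wrapper around a function, with some attributes that determine success and when to run the function. Note that anything involving scheduled times, delays, or timeouts is "best effort": long-running tasks can interfere with the task schedule. A task's configuration attributes are: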

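  • func, the function to run

  • args, a list of positional arguments passed to the function

  • kwargs, a dictionary of keyword arguments passed to the function

  • attempts, the number of attempts (0 means retry until success)

  • task_timeout, the number of seconds during which the function may be retried

  • delay, the time between runs of the function (modified by backoff)

  • backoff, a delay multiplier that grows the delay exponentially with each iteration. backoff = 1 gives a fixed interval; backoff = 2 doubles the time between retries

  • start_time, the timestamp of the function's first run, e.g. time.time() + 30 to run 30 seconds from now

  • success_condition, a function that decides whether this iteration of the task succeeded. The default is that no exception was raised and the return value is not None. If success_condition is not callable, Task raises an exception during initialization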
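As a rough illustration of how delay and backoff interact, the helper below lists the wait before each attempt. It assumes the first attempt runs immediately at start_time and that the delay is multiplied by backoff after every failed attempt; `wait_schedule` is a hypothetical name, and functastic's exact timing may differ. Since attempts=0 (retry until success) has no finite schedule, the sketch requires attempts >= 1.

```python
def wait_schedule(attempts, delay, backoff=1):
    """Hypothetical helper: seconds to wait before each attempt.

    Assumes the first attempt runs immediately and the delay is multiplied
    by `backoff` after every failed attempt (not functastic's actual code).
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1 for a finite schedule")
    waits = [0]  # the first attempt runs at start_time
    current = delay
    for _ in range(attempts - 1):
        waits.append(current)
        current *= backoff  # backoff=1 keeps a fixed interval, 2 doubles it
    return waits
```

For example, `wait_schedule(10, 1, 2)` returns `[0, 1, 2, 4, 8, 16, 32, 64, 128, 256]`, and `wait_schedule(3, 5)` returns `[0, 5, 5]`.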

Here are some examples of creating tasks.

from functastic import Task
import time


def some_function(arg):
    # placeholder for the function you want to retry
    return arg


f = some_function
# this is the basic task, it will have retry set to True
# until it returns a non None value and doesn't raise
task = Task(f, args=['a'])

# let's give it only 10 tries. after 10 tries, retry will be set to False
task = Task(f, args=['a'], attempts=10)

# and slow it down a bit (wait 1 second between each attempt).
# the delay schedules the task again 1 second after the previous
# attempt. without delay, the task's next run is scheduled immediately.
# TaskHeap manages the schedules if you want to use it
task = Task(f, args=['a'], attempts=10, delay=1)

# and now let's make it back off if at first it doesn't succeed
# this will be run at t=[0, 1, 2, 4, 8, 16, 32, 64, 128, 256] seconds
task = Task(f, args=['a'], attempts=10, delay=1, backoff=2)

# another way to think of a task only having a certain number of attempts
# is to give it a timeout
# this function will be run approximately every 1 second for 60 seconds
# CAUTION! long running tasks may delay things, tasks with task_timeout
#          can only be guaranteed to run once
task = Task(f, args=['a'], task_timeout=60, delay=1)

# want to schedule a task to start running 60 seconds from now?
# note that the task_timeout doesn't start counting until the first run
# so this function will start running in 60 seconds and retry every 1
# second for 30 seconds
task = Task(f, args=['a'], start_time=time.time() + 60, delay=1,
            task_timeout=30)

# define your own success condition for a task
task = Task(f, args=['a'], delay=1,
            success_condition=lambda t: t.result == 'a')
# or change it later
task.success_condition = lambda t: t.result == 'b'


# you could also define a more involved function instead of lambdas.
# guard against a None result: an exception raised inside a
# success_condition is not caught by Task
def success(task):
    return task.result is not None and 'some key' in task.result


task = Task(f, args=['a'], delay=1, success_condition=success)

Tasks that "succeed" by failing in certain ways

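Sometimes it makes sense to treat success_condition as a break condition. Suppose you have a function that makes a request to a web service. If you get a 404 because you are polling for a resource to become available, retrying makes sense; but if you get a different error, such as 401 Unauthorized, retrying is pointless. So we want to break (or "succeed") whenever we get a non-404 exception.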

import requests
from functastic import Task

def handle_resource(res):
    pass

def get_thing(url):
    r = requests.get(url)
    r.raise_for_status()
    handle_resource(r.json())

# every 5 seconds attempt to fetch the resource and handle it,
# quit when there is no exception (actual success) or there is
# a non 404 exception (call failed in a bad way, so don't try it again)
# str(exception) is used because exception.message only exists on Python 2
task = Task(get_thing, args=['http://whatever.com/resource/id'], delay=5,
            success_condition=lambda t: (not t.exception or
                                         '404' not in str(t.exception)))
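The break condition can be checked without any network access by feeding it stand-in objects. `SimpleNamespace` plays the role of a Task (only its `exception` attribute is read), and `FakeHTTPError` is a hypothetical substitute for `requests.HTTPError`, whose message from `raise_for_status()` starts with the status code (for example `404 Client Error: Not Found for url: ...`).

```python
from types import SimpleNamespace


class FakeHTTPError(Exception):
    """Hypothetical stand-in for requests.HTTPError."""


def stop_polling(t):
    # done on real success (no exception) or on any error that is not a 404
    return not t.exception or '404' not in str(t.exception)


not_found = SimpleNamespace(exception=FakeHTTPError('404 Client Error: Not Found'))
unauthorized = SimpleNamespace(exception=FakeHTTPError('401 Client Error: Unauthorized'))
ok = SimpleNamespace(exception=None)
```

Matching on the text is simple but would also match a "404" that appears elsewhere in the message, such as in the URL. For an `HTTPError` raised by `raise_for_status()`, checking `t.exception.response.status_code == 404` is a more precise alternative.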

Tasks without TaskHeap

import time
from functastic import Task


def f(arg):
    # placeholder for the function you want to retry
    return arg


task = Task(f, args=['a'], attempts=10)

# task.retry always starts as True
while task.retry:
    # calling the task calls the function
    task()
    # at this point, task.retry may have become False depending on the task
    # optionally sleep in between calls
    time.sleep(2)
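Outside functastic, the same manual pattern can be written as a plain loop. `drive` is a hypothetical function that applies the documented default success rule (no exception and a non-None result); unlike the loop above, it skips the sleep after the final attempt. The `sleep` parameter is injectable so the loop can be tested without waiting.

```python
import time


def drive(func, attempts, delay, sleep=time.sleep):
    """Hypothetical manual retry loop, not part of functastic.

    Calls `func` up to `attempts` times (attempts >= 1), sleeping `delay`
    seconds between failures. Returns (attempts_used, result).
    """
    for attempt in range(1, attempts + 1):
        try:
            result = func()
        except Exception:
            result = None  # swallow the error, as a Task call does
        if result is not None:
            return attempt, result
        if attempt < attempts:
            sleep(delay)  # no point sleeping after the last attempt
    return attempts, None
```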

RecurringTask usage

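Recurring tasks are for things that need to run repeatedly regardless of the outcome. Think of a RecurringTask as a Task without a success condition; in fact, apart from slightly different log messages, you can get RecurringTask behavior from a Task by setting a success condition that always fails. Only use RecurringTask when you want to repeat something forever; if you have attempts, a task timeout, or a success condition, use Task instead.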

from functastic import Task
from functastic import RecurringTask


def f(arg):
    # placeholder for the function you want to run repeatedly
    return arg


# using Task, set the success condition to `lambda t: None`
# and do not specify attempts
task = Task(f, args=['a'], delay=10,
            success_condition=lambda t: None)
# and the equivalent RecurringTask:
rtask = RecurringTask(f, args=['a'], delay=10)

# this task will be run 5 times, 10 seconds after the previous run,
# regardless of what f returns or raises. this behavior cannot be
# achieved with RecurringTask
task = Task(f, args=['a'], delay=10, attempts=5,
            success_condition=lambda t: None)
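The difference between an unlimited, always-failing Task and one with attempts=5 can be simulated in a few lines. `simulate` is hypothetical and only counts runs. Because `lambda t: None` returns a falsy value, the condition never "succeeds", so only attempts (or, in this demo, the `limit` safety cap standing in for "forever") ends the loop.

```python
def simulate(success_condition, attempts=None, limit=100):
    """Hypothetical run counter, not part of functastic.

    attempts=None models "no attempts limit"; `limit` caps the demo so it ends.
    """
    runs = 0
    while runs < limit:
        runs += 1
        if success_condition(None):  # None stands in for the task object
            break
        if attempts is not None and runs >= attempts:
            break
    return runs
```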

TaskHeap usage

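To demonstrate using Task with TaskHeap, I'll use a simple function that usually fails, either by raising an exception or by returning None: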

import random
from datetime import datetime


def usually_fails(arg):
    if random.randint(1, 4) != 1:
        raise Exception('everything is ruined')
    if random.randint(1, 4) != 2:
        return None
    print('%s ran at %s' % (arg, datetime.today()))
    return arg

TaskHeap is iterable, works as a bool, and str(tasks) gives fairly nice output:

from functastic import Task
from functastic import TaskHeap
tasks = TaskHeap()
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))
if tasks:
    print(len(tasks))
    print(str(tasks))
    for task in tasks:
        print(task)
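One plausible way to build a container with this behavior is a heap keyed on each task's next run time. `MiniHeap` below is a hypothetical sketch using the standard `heapq` module, not TaskHeap's actual implementation; an insertion counter breaks ties so the tasks themselves never need to be comparable.

```python
import heapq
import itertools


class MiniHeap:
    """Hypothetical TaskHeap-like container ordered by next run time."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal run times

    def append(self, task, run_at=0.0):
        heapq.heappush(self._heap, (run_at, next(self._counter), task))

    def __len__(self):
        return len(self._heap)

    def __bool__(self):
        return bool(self._heap)

    def __iter__(self):
        # yield tasks in scheduled order without mutating the heap
        return (task for _, _, task in sorted(self._heap))

    def __str__(self):
        return "\n".join("%.1f %s" % (run_at, task)
                         for run_at, _, task in sorted(self._heap))
```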

Non-threaded usage

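Run a task or a set of tasks and return once they are done. Without stop=True, the tasks.run() call blocks forever, because the loop keeps iterating at every TaskHeap interval instead of exiting.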

from functastic import Task
from functastic import TaskHeap
# add tasks and then call run(stop=True)
tasks = TaskHeap()
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))
tasks.run(stop=True)
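The non-threaded run(stop=True) loop can be sketched as a single-threaded scheduler over `heapq`. Everything here is hypothetical: tasks are `(func, attempts, delay)` tuples whose function returns True on success, and the `clock` and `sleep` parameters are injectable so the loop can be driven by a fake clock in tests. Because due tasks run one after another, a slow function delays every task behind it, which is the limitation noted in the project description.

```python
import heapq
import itertools
import time


def run(tasks, stop=True, interval=0.1, clock=time.monotonic, sleep=time.sleep):
    """Hypothetical single-threaded scheduler loop (not functastic's code).

    `tasks` is a list of (func, attempts, delay) tuples with attempts >= 1;
    each func returns True on success. With stop=True the loop returns once
    nothing is left to run; with stop=False it would poll forever.
    """
    counter = itertools.count()  # tie-breaker so functions are never compared
    start = clock()
    heap = [(start, next(counter), func, attempts, delay)
            for func, attempts, delay in tasks]
    heapq.heapify(heap)
    while heap or not stop:
        now = clock()
        # run every task whose scheduled time has arrived, one after another
        while heap and heap[0][0] <= now:
            _, _, func, left, delay = heapq.heappop(heap)
            try:
                ok = func()
            except Exception:
                ok = False  # errors count as failures and are not re-raised
            if not ok and left > 1:
                heapq.heappush(heap, (clock() + delay, next(counter),
                                      func, left - 1, delay))
        sleep(interval)
```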

Using with a threading library

TaskHeap works well with threading libraries. This example runs the task loop in another thread and adds tasks freely while it runs.

import eventlet
from functastic import Task
from functastic import TaskHeap
# note the use of eventlet.sleep here to specify which sleep
# function TaskHeap should use (or use monkey patching).
# interval can also be passed if you don't like the default 0.1s;
# this sets the task loop interval to 3 seconds
tasks = TaskHeap(sleep=eventlet.sleep, interval=3)
eventlet.spawn(tasks.run)
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))

# have to sleep here to surrender execution to the green thread
while tasks:
    tasks.sleep()
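The same pattern works with the standard library's `threading` module in place of eventlet. `ThreadedLoop` is a hypothetical sketch, not TaskHeap: tasks are zero-argument callables that return True on success, a lock guards the shared list so `append()` is safe while the loop thread runs, and failed tasks are simply retried on the next pass.

```python
import threading


class ThreadedLoop:
    """Hypothetical task loop for the standard threading module (not TaskHeap).

    Tasks are zero-argument callables returning True on success; failures
    and exceptions are retried on the next pass.
    """

    def __init__(self, interval=0.01):
        self.interval = interval
        self._tasks = []
        self._lock = threading.Lock()
        self._stop = threading.Event()

    def append(self, func):
        with self._lock:  # safe to call while run() is active in another thread
            self._tasks.append(func)

    def __len__(self):
        with self._lock:
            return len(self._tasks)

    def run(self):
        while not self._stop.is_set():
            with self._lock:
                pending = list(self._tasks)  # snapshot: tasks run without holding the lock
            for func in pending:
                try:
                    ok = func()
                except Exception:
                    ok = False
                if ok:
                    with self._lock:
                        self._tasks.remove(func)
            self._stop.wait(self.interval)  # sleeps, but wakes early on stop()

    def stop(self):
        self._stop.set()
```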

Stopping tasks

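Once a TaskHeap has been started with run(), it runs indefinitely unless stop=True was passed, as in run(stop=True). It can be stopped in two ways:

  • stop_after(), which makes the task loop exit once all tasks have finished. Obviously, stop_after() has no effect if there is even one RecurringTask in the heap.

  • stop_now(), which makes the task loop stop as soon as possible. Because the task loop is single-threaded, it exits after finishing the current iteration. This means the current task, if any, finishes as scheduled, but no future tasks run unless run() is called again.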

import eventlet
from functastic import Task
from functastic import TaskHeap

tasks = TaskHeap(sleep=eventlet.sleep)
gt = eventlet.spawn(tasks.run)
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))

# stop the tasks thread after 5 seconds; gt.wait() will return almost
# instantly
tasks.sleep(5)
tasks.stop_now()
gt.wait()             # <-- this line should return quickly

# start the tasks again
gt = eventlet.spawn(tasks.run)

# this time tell the tasks loop to exit once finished
tasks.sleep()
tasks.stop_after()
gt.wait()             # <-- this line should return when all tasks complete
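The two stop modes can be sketched with `threading.Event` flags. `StoppableLoop` is hypothetical and assumes tasks are zero-argument callables returning True on success: `stop_now()` ends the loop after the current pass and leaves unfinished tasks in place for a later `run()`, while `stop_after()` ends it only once no tasks remain, so a task that never succeeds (like a RecurringTask) keeps it running.

```python
import threading


class StoppableLoop:
    """Hypothetical sketch of stop_now()/stop_after() semantics (not TaskHeap).

    Tasks are zero-argument callables returning True on success.
    """

    def __init__(self, interval=0.01):
        self.interval = interval
        self.tasks = []
        self._lock = threading.Lock()
        self._now = threading.Event()
        self._after = threading.Event()

    def stop_now(self):
        self._now.set()

    def stop_after(self):
        self._after.set()

    def run(self):
        while True:
            with self._lock:
                pending = list(self.tasks)
            for func in pending:  # the current pass always finishes
                try:
                    ok = func()
                except Exception:
                    ok = False
                if ok:
                    with self._lock:
                        self.tasks.remove(func)
            if self._now.is_set():
                self._now.clear()  # reset so a later run() starts fresh
                return             # unfinished tasks stay in self.tasks
            with self._lock:
                empty = not self.tasks
            if self._after.is_set() and empty:
                self._after.clear()
                return
            self._now.wait(self.interval)  # wakes early if stop_now() is called
```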


Source distribution

functastic-2.2.tar.gz (12.5 kB)
