Threaded task retry module
Project description
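functastic manages tasks that you want to retry until a success condition is met, or possibly forever. It can run single-threaded or in a separate thread. You can configure the task start time, success condition, retry attempts, retry interval, and interval backoff. Note that all tasks run one after another in a single thread, so a long-running task can delay the actual start time of the tasks after it. For that reason, functastic is currently better suited to quick tasks. I would like to launch tasks in side threads in the future, but for now I don't want to require any particular threading library.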
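The head-of-line effect described above can be modeled in a few lines. This is a hypothetical simulation, not functastic code: the function name and the (scheduled_start, duration) job format are made up for illustration. Because jobs run one after another, each job starts at its scheduled time or when the previous job finishes, whichever is later.

```python
def actual_start_times(jobs):
    """Simulate running jobs one after another in a single thread.

    jobs: list of (scheduled_start, duration) pairs, in seconds.
    Returns the time at which each job actually starts.
    """
    clock = 0
    starts = []
    for scheduled, duration in sorted(jobs):
        # A job cannot start before its scheduled time, nor before
        # the previous job has finished.
        clock = max(clock, scheduled)
        starts.append(clock)
        clock += duration
    return starts


# Jobs scheduled at t=0, 1 and 2, but the first one takes 3 seconds.
print(actual_start_times([(0, 3), (1, 1), (2, 1)]))  # [0, 3, 4]
```

The second and third jobs were due at t=1 and t=2 but start at t=3 and t=4, which is why quick tasks suit a single-threaded loop best.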
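functastic provides two core classes, TaskHeap and Task (the RecurringTask variant is covered further down). A Task wraps a function and is added to a TaskHeap, whose run() method runs, schedules, and retries Tasks until their success conditions are met. A Task's default success condition is that the function raises no exception and returns any value other than None.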
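Calling a Task object never raises an exception. Whether you call a task manually or let a TaskHeap do it, exceptions are only logged and possibly used to determine success; they are never raised. The one exception to this rule is a custom success_condition function that itself raises, so write those carefully. Task also raises an exception at initialization if you pass a success_condition that is not callable.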
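As a rough sketch of that contract, assuming a task exposes result, exception, and retry attributes as the examples in this README do, a wrapper can catch exceptions from the wrapped function while deliberately letting errors from success_condition propagate. MiniTask, default_success, and the runs counter are hypothetical names, logging is left out, and this is not functastic's Task class.

```python
def default_success(t):
    # Default rule: the call did not raise and returned something other than None.
    return t.exception is None and t.result is not None


class MiniTask:
    """Hypothetical stand-in illustrating the calling contract."""

    def __init__(self, func, args=(), kwargs=None, attempts=0,
                 success_condition=default_success):
        if not callable(success_condition):
            # The only error raised up front: a non-callable condition.
            raise TypeError('success_condition must be callable')
        self.func = func
        self.args = list(args)
        self.kwargs = dict(kwargs or {})
        self.attempts = attempts  # 0 means "until success"
        self.success_condition = success_condition
        self.runs = 0
        self.result = None
        self.exception = None
        self.retry = True

    def __call__(self):
        self.runs += 1
        self.result, self.exception = None, None
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as exc:
            # Recorded for success_condition to inspect, never re-raised.
            self.exception = exc
        # Deliberately not wrapped in try/except: a buggy condition still raises.
        if self.success_condition(self):
            self.retry = False
        elif self.attempts and self.runs >= self.attempts:
            self.retry = False
        return self.result


def flaky():
    raise ValueError('boom')


task = MiniTask(flaky, attempts=3)
while task.retry:
    task()  # never raises, even though flaky() always does
print(task.runs, type(task.exception).__name__)  # 3 ValueError
```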
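Installation
Install with pip install functastic, or clone the repository and run python setup.py install, or use pip install -e . if you want to play with the code.
Task usage
A basic task is a wrapper around a function, with a few attributes that determine success and when to run the function. Note that anything involving scheduled times, delays, or timeouts is "best effort": long-running tasks can throw off the task schedule. A task's configuration attributes are: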
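- func, the function to run
- args, a list of positional arguments passed to the function
- kwargs, a dict of keyword arguments passed to the function
- attempts, the number of attempts (set to 0 to retry until success)
- task_timeout, the number of seconds during which the function may be retried
- delay, the time in seconds between runs of the function (modified by backoff)
- backoff, a delay multiplier that grows the delay exponentially on each iteration. backoff = 1 gives a constant interval; backoff = 2 doubles the time between each retry
- start_time, the timestamp at which the function first runs, e.g. time.time() + 30 to run 30 seconds from now
- success_condition, a function that determines whether the task succeeded on this iteration. The default is that no exception was raised and the return value is not None. If success_condition is not callable, an exception is raised during Task initialization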
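Reading delay as the wait before the second attempt and backoff as a multiplier applied to every later wait, the idealized schedule can be computed as below. This is a sketch under that assumption, not functastic's scheduler: it ignores how long each call takes and the TaskHeap loop interval, and attempts must be a positive count here (attempts=0 never ends).

```python
def backoff_schedule(delay, backoff, attempts):
    """Return (waits, run_times) for a task that runs `attempts` times.

    waits[i] is the pause before attempt i; run_times[i] is when it starts,
    measured from the first attempt.
    """
    waits, run_times = [], []
    now = 0
    for i in range(attempts):
        # No wait before the first attempt; after that delay * backoff**(i - 1).
        wait = 0 if i == 0 else delay * backoff ** (i - 1)
        now += wait
        waits.append(wait)
        run_times.append(now)
    return waits, run_times


waits, run_times = backoff_schedule(delay=1, backoff=2, attempts=10)
print(waits)      # [0, 1, 2, 4, 8, 16, 32, 64, 128, 256]
print(run_times)  # [0, 1, 3, 7, 15, 31, 63, 127, 255, 511]
```

With backoff=1 every wait equals delay, which is the constant interval described in the list.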
Here are some examples of creating tasks.
from functastic import Task
import time
f = some_function  # placeholder: any callable you want to retry
# this is the basic task; retry stays True until f returns a
# non-None value without raising
task = Task(f, args=['a'])
# let's give it only 10 tries. after 10 tries, retry will be set to False
task = Task(f, args=['a'], attempts=10)
# and slow it down a bit (wait 1 second between attempts).
# the delay schedules the task's next run 1 second after the previous
# attempt. without a delay, the next run is scheduled immediately.
# TaskHeap manages the schedules if you want to use it
task = Task(f, args=['a'], attempts=10, delay=1)
# and now let's make it back off if at first it doesn't succeed.
# the waits before each of the 10 attempts are
# [0, 1, 2, 4, 8, 16, 32, 64, 128, 256] seconds
task = Task(f, args=['a'], attempts=10, delay=1, backoff=2)
# another way to limit how many times a task runs
# is to give it a timeout.
# this function will be run approximately every 1 second for 60 seconds
# CAUTION! long-running tasks may delay things; a task with task_timeout
# is only guaranteed to run once
task = Task(f, args=['a'], task_timeout=60, delay=1)
# want to schedule a task to start running 60 seconds from now?
# note that task_timeout doesn't start counting until the first run,
# so this function will start running in 60 seconds and retry every
# second for 30 seconds
task = Task(f, args=['a'], start_time=time.time() + 60, delay=1,
            task_timeout=30)
# define your own success condition for a task
task = Task(f, args=['a'], delay=1,
            success_condition=lambda t: t.result == 'a')
# or change it later
task.success_condition = lambda t: t.result == 'b'
# you could also define a more involved function instead of a lambda.
# guard against None (e.g. when f raised), because an exception raised
# inside success_condition is not swallowed
def success(task):
    return task.result is not None and 'some key' in task.result
task = Task(f, args=['a'], delay=1, success_condition=success)
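Tasks that "succeed" by failing in a particular way
Sometimes it makes sense to treat success_condition as a break condition. Suppose a function makes a request to a web service. If you get a 404 because you are polling for a resource to become available, retrying makes sense; but if you get some other error, such as 401 Unauthorized, retrying is pointless. So when we get a non-404 exception, we want to break (or "succeed").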
import requests
from functastic import Task
def handle_resource(res):
    pass
def get_thing(url):
    r = requests.get(url)
    r.raise_for_status()
    handle_resource(r.json())
# every 5 seconds attempt to fetch the resource and handle it,
# quit when there is no exception (actual success) or there is
# a non-404 exception (the call failed in a bad way, so don't try it again).
# Python 3 exceptions have no .message attribute, so use str()
task = Task(get_thing, args=['http://whatever.com/resource/id'], delay=5,
            success_condition=lambda t: (not t.exception or
                                         '404' not in str(t.exception)))
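Because a success condition is just a function of the task, it can be tested without touching the network. This sketch uses types.SimpleNamespace objects as hypothetical stand-ins for a Task, assuming only the exception and result attributes used above; stop_unless_404 is an illustrative name.

```python
from types import SimpleNamespace


def stop_unless_404(t):
    # Stop on real success, or on any error that is not a 404.
    return t.exception is None or '404' not in str(t.exception)


# Hypothetical stand-ins for a Task: only .exception and .result are read.
cases = {
    'ok': SimpleNamespace(exception=None, result=None),
    'not found': SimpleNamespace(exception=Exception('404 Client Error'),
                                 result=None),
    'unauthorized': SimpleNamespace(exception=Exception('401 Client Error'),
                                    result=None),
}
for name, fake_task in cases.items():
    print(name, '->', 'stop' if stop_unless_404(fake_task) else 'retry')
# ok -> stop
# not found -> retry
# unauthorized -> stop
```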
Tasks without a TaskHeap
import time
from functastic import Task
# f is the function to retry, as in the examples above
task = Task(f, args=['a'], attempts=10)
# task.retry always starts as True
while task.retry:
    # calling the task calls the function
    task()
    # at this point, task.retry may have become False depending on the task
    # optionally sleep in between calls
    time.sleep(2)
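RecurringTask usage
A RecurringTask is for work that should run repeatedly, regardless of the result. Think of it as a Task with no success condition; in fact, apart from slightly different log messages, you can reproduce RecurringTask behavior with a Task whose success condition always fails. Use RecurringTask only when you want something to repeat forever; if you have an attempt limit, a task timeout, or a success condition, use Task instead.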
from functastic import Task
from functastic import RecurringTask
# using Task, set the success condition to `lambda t: None`
# and do not specify attempts
task = Task(f, args=['a'], delay=10,
            success_condition=lambda t: None)
# and the equivalent RecurringTask:
rtask = RecurringTask(f, args=['a'], delay=10)
# this task will run 5 times, each run 10 seconds after the previous one,
# regardless of what f returns or raises. this behavior cannot be
# achieved with RecurringTask
task = Task(f, args=['a'], delay=10, attempts=5,
            success_condition=lambda t: None)
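The difference between an attempt limit and repeating forever can be shown with a toy retry loop. This is a hypothetical model, not functastic's scheduler: for brevity its condition receives the return value rather than a task object, and cap exists only so the "forever" case terminates.

```python
def count_runs(func, success_condition, attempts=0, cap=1000):
    """Count how many times a toy retry loop calls func.

    Stops when success_condition(result) is truthy or after `attempts`
    calls (0 means no limit). `cap` only keeps the demo from running
    forever, as a RecurringTask would.
    """
    runs = 0
    while runs < cap:
        runs += 1
        if success_condition(func()):
            break
        if attempts and runs >= attempts:
            break
    return runs


def never(result):
    return None  # a success condition that never passes


print(count_runs(lambda: 'x', never, attempts=5))  # 5: bounded by attempts
print(count_runs(lambda: 'x', never))              # 1000: only the demo cap stops it
```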
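TaskHeap usage
To show Task and TaskHeap working together, I'll use a simple function that usually fails, either by raising an exception or by returning None:
import random
from datetime import datetime
def usually_fails(arg):
    if random.randint(1, 4) != 1:
        raise Exception('everything is ruined')
    if random.randint(1, 4) != 2:
        return None
    print('%s ran at %s' % (arg, datetime.today()))
    return arg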
TaskHeap is iterable, supports len() and truth testing, and str(tasks) gives fairly readable output:
from functastic import Task
from functastic import TaskHeap
tasks = TaskHeap()
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))
if tasks:
    print(len(tasks))
    print(str(tasks))
for task in tasks:
    print(task)
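The len(), truth-testing, iteration, and str() behavior shown above maps onto Python's container protocol. A minimal heap-backed sketch (MiniHeap is a hypothetical class, not TaskHeap's implementation) might look like this:

```python
import heapq
import itertools


class MiniHeap:
    """Hypothetical heap-backed container; not functastic's TaskHeap."""

    def __init__(self):
        self._heap = []                # entries: (run_at, seq, name)
        self._seq = itertools.count()  # tie-breaker for equal run_at

    def append(self, name, run_at):
        heapq.heappush(self._heap, (run_at, next(self._seq), name))

    def __len__(self):
        # Truth testing (if heap: ...) falls back to __len__.
        return len(self._heap)

    def __iter__(self):
        # Yield entries in scheduled order without mutating the heap.
        for run_at, _, name in sorted(self._heap):
            yield name, run_at

    def __str__(self):
        return ', '.join(f'{name}@{run_at}' for name, run_at in self)


heap = MiniHeap()
print(bool(heap))               # False
heap.append('b', run_at=5)
heap.append('a', run_at=1)
print(len(heap), list(heap))    # 2 [('a', 1), ('b', 5)]
print(heap)                     # a@1, b@5
```

Defining __bool__ is optional here: when a class defines __len__ but not __bool__, Python treats a zero length as false, which is one way `if tasks:` can work.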
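Non-threaded usage
Run a task or a group of tasks and return once they have finished. Without stop=True, the tasks.run() call blocks forever: the loop keeps iterating once per TaskHeap interval instead of exiting when the heap is empty.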
from functastic import Task
from functastic import TaskHeap
# add tasks and then call run(stop=True)
tasks = TaskHeap()
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))
tasks.run(stop=True)
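The effect of stop=True can be modeled with a toy polling loop over a heapq priority queue on a virtual clock. The tuple layout, the interval handling, and the max_ticks guard are assumptions of this sketch rather than TaskHeap internals, and every run counts as a failure so each task uses all of its runs.

```python
import heapq


def run_loop(heap, interval=1, stop=False, max_ticks=50):
    """Toy polling loop on a virtual clock.

    heap holds (run_at, seq, name, runs_left, delay) tuples ordered by
    heapq. Returns the (time, name) run log and how many intervals passed.
    """
    now, ticks, log = 0, 0, []
    while ticks < max_ticks:
        # Run every task that is due at the current time.
        while heap and heap[0][0] <= now:
            _, seq, name, runs_left, delay = heapq.heappop(heap)
            log.append((now, name))
            if runs_left > 1:  # treat the run as a failure and reschedule
                heapq.heappush(
                    heap, (now + delay, seq, name, runs_left - 1, delay))
        if stop and not heap:
            break  # stop=True: return as soon as nothing is left
        now += interval  # stands in for sleep(interval)
        ticks += 1
    return log, ticks


jobs = [(0, 0, 'a', 2, 1), (0, 1, 'b', 3, 2)]
heapq.heapify(jobs)
print(run_loop(list(jobs), stop=True))
# ([(0, 'a'), (0, 'b'), (1, 'a'), (2, 'b'), (4, 'b')], 4)
print(run_loop(list(jobs), stop=False)[1])  # 50: only max_ticks ends it
```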
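Usage with a threading library
TaskHeap works well with threading libraries. This runs the task loop in another thread, and tasks can be added freely while it runs.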
import eventlet
from functastic import Task
from functastic import TaskHeap
# note the use of eventlet.sleep here to specify which sleep
# function TaskHeap should use, or use monkey patching
# interval can also be passed if you don't like the default 0.1s
# this sets the task loop interval to 3 seconds
tasks = TaskHeap(sleep=eventlet.sleep, interval=3)
eventlet.spawn(tasks.run)
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))
# sleep here to yield execution to the green thread
while tasks:
    tasks.sleep()
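Stopping tasks
Once a TaskHeap has been started with run(), it runs indefinitely unless stop=True was passed, as in run(stop=True). It can be stopped in two ways:
- stop_after() makes the task loop exit once all tasks have finished. Note that if the heap contains even a single RecurringTask, stop_after() will have no effect.
- stop_now() makes the task loop stop as soon as possible. Because the task loop is single-threaded, it exits after finishing its current iteration. This means the current task, if any, will finish as planned, but no future tasks will run unless run() is called again.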
import eventlet
from functastic import Task
from functastic import TaskHeap
tasks = TaskHeap(sleep=eventlet.sleep)
gt = eventlet.spawn(tasks.run)
tasks.append(Task(usually_fails, args=['a'], delay=1))
tasks.append(Task(usually_fails, args=['b'], attempts=10, delay=1))
# stop the tasks thread after 5 seconds; gt.wait() will return almost
# instantly
tasks.sleep(5)
tasks.stop_now()
gt.wait() # <-- this line should return quickly
# start the tasks again
gt = eventlet.spawn(tasks.run)
# this time tell the tasks loop to exit once finished
tasks.sleep()
tasks.stop_after()
gt.wait() # <-- this line should return when all tasks complete
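functastic leaves the choice of threading library to you (the examples use eventlet). To make the two stop modes concrete without that dependency, here is a sketch using only the standard threading module. ThreadedLoop and its attributes are hypothetical; it models the stop_now()/stop_after() semantics described above rather than reproducing TaskHeap.

```python
import heapq
import itertools
import threading
import time


class ThreadedLoop:
    """Hypothetical loop modeling stop_now()/stop_after(); not TaskHeap."""

    def __init__(self, interval=0.01):
        self.interval = interval
        self.ran = []                     # results of every call, in order
        self._heap = []                   # entries: (due, seq, func)
        self._seq = itertools.count()
        self._lock = threading.Lock()
        self._stop_now = threading.Event()
        self._stop_after = threading.Event()

    def append(self, func, delay=0.0):
        with self._lock:
            heapq.heappush(self._heap,
                           (time.monotonic() + delay, next(self._seq), func))

    def stop_now(self):
        self._stop_now.set()

    def stop_after(self):
        self._stop_after.set()

    def run(self):
        try:
            while not self._stop_now.is_set():
                due = []
                with self._lock:
                    while self._heap and self._heap[0][0] <= time.monotonic():
                        due.append(heapq.heappop(self._heap)[2])
                for func in due:  # the current iteration always finishes
                    self.ran.append(func())
                with self._lock:
                    idle = not self._heap
                if idle and self._stop_after.is_set():
                    return
                time.sleep(self.interval)
        finally:
            # Reset both flags so run() can be started again later.
            self._stop_now.clear()
            self._stop_after.clear()


first_done = threading.Event()


def first():
    first_done.set()
    return 'first'


loop = ThreadedLoop()
loop.append(first)
loop.append(lambda: 'second', delay=0.3)

worker = threading.Thread(target=loop.run)
worker.start()
first_done.wait(timeout=1)
loop.stop_now()                  # 'second' is not due yet, so it is left queued
worker.join(timeout=1)
after_stop_now = list(loop.ran)  # expected to hold only 'first'

worker = threading.Thread(target=loop.run)
worker.start()
loop.stop_after()                # exits only once the heap is empty
worker.join(timeout=2)
print(after_stop_now, loop.ran)
```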
Project details
Hashes for functastic-2.2.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 0969cf0db93b1ee219aefaf0c98bd094daa62ab0a7e057030dd66308d0662c64
MD5 | c81f36aa9f899b08e695cd715dfd7861
BLAKE2b-256 | c0b591cfd94870eb7d4381b087c2cb524568c360654412244a3716bfd2175993