与luigi一起工作的工具。
项目描述
Luigi-tools
这个包扩展并添加了luigi包的新功能。以下是一些这些功能的示例
- 添加一个新的
BoolParameter
,当默认值为True
时,会自动切换到显式解析(否则无法使用CLI将其设置为False
)。 - 添加几种类型的可选参数。
- 添加一个
OutputLocalTarget
类,以帮助构建输出树。 - 添加一个混合类,添加一个
--rerun
参数,强制重新运行给定的任务,即使其目标存在,也会重新运行所有依赖于该任务的任务。 - 添加一个混合类,用于删除失败任务的输出,这可能是损坏或不完整的。此功能应用snakemake规则的默认行为(任务)。
- 添加一个新的
@copy_params
机制,用于从任务复制参数到另一个任务(@inherits
将相同的对象提供给所有继承的任务,而@copy_params
仅复制参数的定义,因此每个继承的任务可以有不同的值)。 - 为给定任务添加获取和显示依赖图的功能。
- 添加机制以设置
luigi.cfg
文件的模板,这样用户只需更新特定值,而不是复制整个luigi.cfg
。
请阅读完整的API文档以获取更多详细信息。
安装
应该使用pip安装此包
pip install luigi-tools
用法
Luigi包自我描述如下
Luigi是一个Python包,帮助您构建复杂的批处理作业管道。它处理依赖关系解析、工作流管理、可视化、处理故障、命令行集成等。
luigi-tools包旨在使开发者更容易使用Luigi。以下展示了该包主要功能的几个示例。
布尔参数
luigi.BoolParameter可以以两种方式解析:隐式或显式。显式方式要求用户输入一个值:True
或False
。相反,隐式方式不需要值,如果提供了参数,则将值设置为True
。这不兼容默认值设置为True
的情况,因为在这种情况下无法使用CLI将该值设置为False
。
如果您希望默认值为True
时自动将解析设置为显式
from luigi.task import Task
from luigi_tools import BoolParameter
class MyTask(Task):
a_boolean_parameter = BoolParameter(default=True)
def run(self):
pass
具有前缀的目标
Luigi工作流基于表示工作流步骤状态的Target
对象。这些目标可以是任何东西,但通常是结果目录树中的文件。为了避免为每个目标指定结果目录,可以使用OutputLocalTarget
类并给它一个prefix
。因此,基于此类的所有目标都将位于同一目录中。
from luigi.task import Task
from luigi_tools.target import OutputLocalTarget
class MyTask(Task):
def run(self):
pass
def output(self):
# The target will point to the file result_directory/filename.ext
return OutputLocalTarget("filename.ext")
# Set the default prefix (it could also be called inside another Task)
OutputLocalTarget.set_default_prefix("result_directory")
# Run the task (the task can also be called with the CLI as usual)
luigi.build([MyTask()], local_scheduler=True)
可重试的任务
在Luigi中,任务的状态是从它们的依赖关系中推导出来的。如果目标存在,则假定任务已经完成,因此在再次运行工作流时会被跳过。这种行为通常很好,可以避免执行已经完成的计算。然而,有时在开发过程中可能希望覆盖以前的结果。因此,引入了一个mixin,它为任务添加一个--rerun
参数。当此参数设置为True
时,此任务的所有目标以及依赖于此任务的任务的目标都会被删除。因此,与该任务相关的所有任务都将再次运行。与任何mixin一样,它必须在继承列表中位于Task
类的左侧。
from luigi.task import Task
from luigi_tools.task import RerunMixin
class MyTask(RerunMixin, Task):
def run(self):
pass
现在任务MyTask
有一个布尔参数--rerun
,可以在CLI中调用
luigi -m my_module mytask --rerun
luigi -m my_module another_task_that_depends_on_mytask --MyTask-rerun
清除失败任务的输出
当任务意外失败时,可能会留下不完整或损坏的输出,导致下游错误的结果。使用RemoveCorruptedOutputMixin,Luigi自动删除失败任务的目标输出。这是其他工作流管理系统(如Snakemake)中的默认行为。
from luigi_tools.task import RemoveCorruptedOutputMixin
class TaskA(RemoveCorruptedOutputMixin, luigi.Task):
"""TaskA can remove its output upon failure."""
pass
默认情况下,clean_failed
为false
,必须显式设置为true
。这允许用户将其设置为false
以进行调试,而不必更改代码。
luigi -m my_module TaskA --clean_failed true
复制参数
在某些情况下,几个任务有一些共同的参数。这可能会导致痛苦的情况,luigi提供了一些专门的工具来处理这种情况,如本文档所述。然而,Luigi提供的工具有一个主要缺点:所有具有继承参数的任务都将具有此参数的相同值。在某些情况下,可能希望为具有继承参数的任务提供不同的值,尤其是在开发过程中。这可以通过使用@copy_params
装饰器来实现
from luigi.task import Task
from luigi_tools.task import copy_params
class TaskA(Task):
a = luigi.Parameter(default="default_value_a")
@luigi_tools.task.copy_params(
a=luigi_tools.task.ParamRef(TaskA)
)
class TaskB(Task):
b = luigi.Parameter(default="b")
这里类TaskB
有两个参数
a
,默认值为default_value_a
。b
,默认值为b
。
还可以更改参数名称或更改默认值
from luigi.task import Task
from luigi_tools.task import copy_params
class TaskA(Task):
a = luigi.Parameter(default="default_value_a")
@luigi_tools.task.copy_params(
a=luigi_tools.task.ParamRef(TaskA),
aa=luigi_tools.task.ParamRef(TaskA, "a"),
a_default=luigi_tools.task.ParamRef(TaskA, "a", "given_default_value"),
a_none=luigi_tools.task.ParamRef(TaskA, "a", None),
)
class TaskB(Task):
b = luigi.Parameter(default="b")
在这种情况下,类TaskB
有5个参数
a
,默认值为default_value_a
。aa
,默认值为a
。a_default
,默认值为given_default_value
。a_none
,默认值为None
。b
,默认值为b
。
请注意,ParamRef
的第二个参数是父类中继承参数的名称。如果没有给出,则假设继承和父类中的参数名称相同。
全局参数
除了@copy_params
装饰器外,还可以使用GlobalParamMixin
混合。具有此混合的任务在继承使用@copy_params
的参数方面有一个新功能:如果未在ParamRef
中更改默认值,并且未为任务提供特定值,则任务将采用继承参数的相同值。这种组合的@copy_params
装饰器和GlobalParamMixin
允许以多种方式处理参数。
from luigi.task import Task
from luigi_tools.task import copy_params
from luigi_tools.task import GlobalParamMixin
class TaskA(Task):
a = luigi.Parameter(default="default_value_a")
@luigi_tools.task.copy_params(
a=luigi_tools.task.ParamRef(TaskA)
)
class TaskB(GlobalParamMixin, Task):
b = luigi.Parameter(default="b")
现在如果使用以下配置调用TaskB
[TaskA]
a = "value for a"
[TaskB]
b = "value for b"
则TaskB
的参数a
的值为value for a
。如果TaskB
没有继承自GlobalParamMixin
,则其值将为default_value_a
。
依赖图
luigi-tools
包提供了一些函数来获取任务依赖图并使用GraphViz进行渲染。这可以非常有用,可以显示工作流中任务的组织方式。
资助与致谢
本软件的开发得到了瑞士联邦理工学院洛桑联邦理工学院(EPFL)的研究中心——蓝脑计划的资助。
有关许可和作者,请参阅LICENSE.txt
和AUTHORS.md
。
版权 © 2021-2022 蓝脑项目/EPFL
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分布
luigi-tools-0.3.4.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a249be5cda54fdde5bc18dc32b21a1c6096e742ffc1d36bd2b5b58dff5f9c131 |
|
MD5 | 0e1da039050707d26b8b18f8d9803061 |
|
BLAKE2b-256 | f1ef7f95172f2b8684339b2ff32d2a743bd380f382a107d175477fd12d06d41d |
luigi_tools-0.3.4-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 0d80281dcefafd50157cfc2df9c3653651194433787361f6d56399c49f0e0cc7 |
|
MD5 | 424caf14bf3fef0c7dd6366ba5e04ba4 |
|
BLAKE2b-256 | c75d271fea6d0c3fa645de0825557d7908e83b0704152e15516131e7c561b268 |