跳转到主要内容

与luigi一起工作的工具。

项目描述

Build status License Documentation status

Luigi-tools

这个包扩展并添加了luigi包的新功能。以下是一些这些功能的示例

  • 添加一个新的BoolParameter,当默认值为True时,会自动切换到显式解析(否则无法使用CLI将其设置为False)。
  • 添加几种类型的可选参数。
  • 添加一个OutputLocalTarget类,以帮助构建输出树。
  • 添加一个混合类,添加一个--rerun参数,强制重新运行给定的任务,即使其目标存在,也会重新运行所有依赖于该任务的任务。
  • 添加一个混合类,用于删除失败任务的输出,这可能是损坏或不完整的。此功能应用snakemake规则的默认行为(任务)。
  • 添加一个新的@copy_params机制,用于从任务复制参数到另一个任务(@inherits将相同的对象提供给所有继承的任务,而@copy_params仅复制参数的定义,因此每个继承的任务可以有不同的值)。
  • 为给定任务添加获取和显示依赖图的功能。
  • 添加机制以设置luigi.cfg文件的模板,这样用户只需更新特定值,而不是复制整个luigi.cfg

请阅读完整的API文档以获取更多详细信息。

安装

应该使用pip安装此包

pip install luigi-tools

用法

Luigi包自我描述如下

Luigi是一个Python包,帮助您构建复杂的批处理作业管道。它处理依赖关系解析、工作流管理、可视化、处理故障、命令行集成等。

luigi-tools包旨在使开发者更容易使用Luigi。以下展示了该包主要功能的几个示例。

布尔参数

luigi.BoolParameter可以以两种方式解析:隐式或显式。显式方式要求用户输入一个值:TrueFalse。相反,隐式方式不需要值,如果提供了参数,则将值设置为True。这不兼容默认值设置为True的情况,因为在这种情况下无法使用CLI将该值设置为False

如果您希望默认值为True时自动将解析设置为显式

from luigi.task import Task
from luigi_tools import BoolParameter

class MyTask(Task):

    a_boolean_parameter = BoolParameter(default=True)

    def run(self):
        pass

具有前缀的目标

Luigi工作流基于表示工作流步骤状态的Target对象。这些目标可以是任何东西,但通常是结果目录树中的文件。为了避免为每个目标指定结果目录,可以使用OutputLocalTarget类并给它一个prefix。因此,基于此类的所有目标都将位于同一目录中。

from luigi.task import Task
from luigi_tools.target import OutputLocalTarget

class MyTask(Task):

    def run(self):
        pass

    def output(self):
        # The target will point to the file result_directory/filename.ext
        return OutputLocalTarget("filename.ext")

# Set the default prefix (it could also be called inside another Task)
OutputLocalTarget.set_default_prefix("result_directory")

# Run the task (the task can also be called with the CLI as usual)
luigi.build([MyTask()], local_scheduler=True)

可重试的任务

Luigi中,任务的状态是从它们的依赖关系中推导出来的。如果目标存在,则假定任务已经完成,因此在再次运行工作流时会被跳过。这种行为通常很好,可以避免执行已经完成的计算。然而,有时在开发过程中可能希望覆盖以前的结果。因此,引入了一个mixin,它为任务添加一个--rerun参数。当此参数设置为True时,此任务的所有目标以及依赖于此任务的任务的目标都会被删除。因此,与该任务相关的所有任务都将再次运行。与任何mixin一样,它必须在继承列表中位于Task类的左侧。

from luigi.task import Task
from luigi_tools.task import RerunMixin

class MyTask(RerunMixin, Task):

    def run(self):
        pass

现在任务MyTask有一个布尔参数--rerun,可以在CLI中调用

luigi -m my_module mytask --rerun
luigi -m my_module another_task_that_depends_on_mytask --MyTask-rerun

清除失败任务的输出

当任务意外失败时,可能会留下不完整或损坏的输出,导致下游错误的结果。使用RemoveCorruptedOutputMixin,Luigi自动删除失败任务的目标输出。这是其他工作流管理系统(如Snakemake)中的默认行为。

from luigi_tools.task import RemoveCorruptedOutputMixin

    class TaskA(RemoveCorruptedOutputMixin, luigi.Task):
        """TaskA can remove its output upon failure."""
        pass

默认情况下,clean_failedfalse,必须显式设置为true。这允许用户将其设置为false以进行调试,而不必更改代码。

luigi -m my_module TaskA --clean_failed true

复制参数

在某些情况下,几个任务有一些共同的参数。这可能会导致痛苦的情况,luigi提供了一些专门的工具来处理这种情况,如本文档所述。然而,Luigi提供的工具有一个主要缺点:所有具有继承参数的任务都将具有此参数的相同值。在某些情况下,可能希望为具有继承参数的任务提供不同的值,尤其是在开发过程中。这可以通过使用@copy_params装饰器来实现

from luigi.task import Task
from luigi_tools.task import copy_params

class TaskA(Task):
        a = luigi.Parameter(default="default_value_a")

@luigi_tools.task.copy_params(
    a=luigi_tools.task.ParamRef(TaskA)
)
class TaskB(Task):
    b = luigi.Parameter(default="b")

这里类TaskB有两个参数

  • a,默认值为default_value_a
  • b,默认值为b

还可以更改参数名称或更改默认值

from luigi.task import Task
from luigi_tools.task import copy_params

class TaskA(Task):
        a = luigi.Parameter(default="default_value_a")

@luigi_tools.task.copy_params(
    a=luigi_tools.task.ParamRef(TaskA),
    aa=luigi_tools.task.ParamRef(TaskA, "a"),
    a_default=luigi_tools.task.ParamRef(TaskA, "a", "given_default_value"),
    a_none=luigi_tools.task.ParamRef(TaskA, "a", None),
)
class TaskB(Task):
    b = luigi.Parameter(default="b")

在这种情况下,类TaskB有5个参数

  • a,默认值为default_value_a
  • aa,默认值为a
  • a_default,默认值为given_default_value
  • a_none,默认值为None
  • b,默认值为b

请注意,ParamRef的第二个参数是父类中继承参数的名称。如果没有给出,则假设继承和父类中的参数名称相同。

全局参数

除了@copy_params装饰器外,还可以使用GlobalParamMixin混合。具有此混合的任务在继承使用@copy_params的参数方面有一个新功能:如果未在ParamRef中更改默认值,并且未为任务提供特定值,则任务将采用继承参数的相同值。这种组合的@copy_params装饰器和GlobalParamMixin允许以多种方式处理参数。

from luigi.task import Task
from luigi_tools.task import copy_params
from luigi_tools.task import GlobalParamMixin

class TaskA(Task):
        a = luigi.Parameter(default="default_value_a")

@luigi_tools.task.copy_params(
    a=luigi_tools.task.ParamRef(TaskA)
)
class TaskB(GlobalParamMixin, Task):
    b = luigi.Parameter(default="b")

现在如果使用以下配置调用TaskB

[TaskA]
a = "value for a"

[TaskB]
b = "value for b"

TaskB的参数a的值为value for a。如果TaskB没有继承自GlobalParamMixin,则其值将为default_value_a

依赖图

luigi-tools包提供了一些函数来获取任务依赖图并使用GraphViz进行渲染。这可以非常有用,可以显示工作流中任务的组织方式。

资助与致谢

本软件的开发得到了瑞士联邦理工学院洛桑联邦理工学院(EPFL)的研究中心——蓝脑计划的资助。

有关许可和作者,请参阅LICENSE.txtAUTHORS.md

版权 © 2021-2022 蓝脑项目/EPFL

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

luigi-tools-0.3.4.tar.gz (60.9 kB 查看散列)

上传时间

构建分布

luigi_tools-0.3.4-py3-none-any.whl (23.3 kB 查看哈希值)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面