pipecutter为luigi提供了一些工具,以便它与pandas、scikit-learn和Jupyter notebooks等数据科学库和环境更好地协同工作。
项目描述
pipecutter
pipecutter为luigi提供了一些工具,以便它与pandas、scikit-learn和Jupyter notebooks等数据科学库和环境更好地协同工作。
目录
安装
pip install pipecutter
需要Python 3.6+。pipecutter遵循语义版本控制。
使用
pipecutter目前提供
- 在Jupyter笔记本等交互式环境中运行和调试luigi任务的一种更便捷的方式
- 一些luigi目标,例如将pandas数据框架保存到parquet、使用joblib的scikit-learn模型等
在交互式环境中调试
使用luigi,您可以通过使用luigi.build
函数(可能需要将local_scheduler=True
作为参数)在Python脚本/Jupyter笔记本/Python控制台中运行任务。然而,如果任务抛出异常,这将会被luigi捕获,您无法进入调试会话。pipecutter.run
是luigi.build
的一个轻量级包装器,它禁用了这种异常处理。
In [1]: import luigi
In [2]: import pipecutter
In [3]: class TaskWhichFails(luigi.Task):
...: def run(self):
...: raise Exception("Something is wrong")
# Traceback below is shortened for readability
In [4]: pipecutter.run(TaskWhichFails())
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-5-a970d52d810a> in <module>
----> 1 pipecutter.run(TaskWhichFails())
...
<ipython-input-3-4e27674090fa> in run(self)
1 class TaskWhichFails(luigi.Task):
2 def run(self):
----> 3 raise Exception
Exception: Something is wrong
# Drop straight into the debugger
In [5]: %debug
> <ipython-input-6-e7528a27d82e>(3)run()
1 class TaskWhichFails(luigi.Task):
2 def run(self):
----> 3 raise Exception
4
ipdb>
这应该会降低在使用模型进行开发时使用luigi任务的技术障碍,从而使得后续的生产部署更加容易。
此外,您可以使用pipecutter.print_tree
(对luigi.tools.deps_tree.print_tree
的包装)打印任务的依赖关系,或者使用pipecutter.build_graph
构建一个Graphviz图,您可以将其保存为.png、.pdf等格式,或者直接在Jupyter笔记本中查看。有关该功能的外观,请参阅完整示例中的截图。函数build_graph
需要您已安装graphviz。
目标
在pipecutter.targets
中,您可以找到一些基于luigi的LocalTarget
的目标,但它们还额外提供了load
和dump
方法。通过使用task_id
来命名目标是一种方便的方法,它在任务名称及其传递的参数上是唯一的。
import luigi
import pipecutter
from pipecutter.targets import JoblibTarget
from sklearn.ensemble import RandomForestClassifier
class TrainModel(luigi.Task):
n_estimators = luigi.IntParameter()
def output(self):
return JoblibTarget(self.task_id + ".joblib")
def run(self):
model = RandomForestClassifier(n_estimators=self.n_estimators)
self.output().dump(model)
pipecutter.run(TrainModel(n_estimators=100))
# -> Produces a file called TrainModel_100_0b0ec0cdea.joblib
如果您在文件名中使用task_id
,则可以使用添加了output
方法的pipecutter.targets.outputs
装饰器将上述任务写得更简洁。默认情况下,它将文件放入名为data
的文件夹中。这可以通过可选的folder
参数进行调整。
from pipeline.targets import outputs
@outputs(JoblibTarget)
class TrainModel(luigi.Task):
n_estimators = luigi.IntParameter()
def run(self):
model = RandomForestClassifier(n_estimators=self.n_estimators)
self.output().dump(model)
完整示例
import luigi
import pandas as pd
import numpy as np
import pipecutter
from luigi.util import requires
from pipecutter.targets import outputs, JoblibTarget, ParquetTarget
from sklearn.ensemble import RandomForestClassifier
@outputs(ParquetTarget)
class PrepareData(luigi.Task):
drop_missings = luigi.BoolParameter()
def run(self):
train_df = pd.DataFrame.from_dict({"A": [0, 1, np.nan], "B": [5, 1, 2], "label": [0, 1, 1]})
if self.drop_missings:
train_df = train_df.dropna()
self.output().dump(train_df)
@requires(PrepareData)
@outputs(JoblibTarget)
class TrainModel(luigi.Task):
n_estimators = luigi.IntParameter()
def run(self):
train_df = self.input().load()
X, y = train_df.drop("label", axis=1), train_df["label"]
model = RandomForestClassifier(n_estimators=self.n_estimators)
model.fit(X, y)
self.output().dump(model)
train_model = TrainModel(n_estimators=100, drop_missings=True)
pipecutter.build_graph(train_model)
最后一个命令可用于可视化依赖关系树,这对于您的管道更复杂时特别有用。它返回一个graphviz.Digraph
对象,它将在Jupyter笔记本中渲染。
最后,运行任务。
pipecutter.run(train_model)
项目详情
下载文件
下载您平台上的文件。如果您不确定要选择哪个,请了解更多关于安装包的信息。
源分发
构建分发
pipecutter-2.0.0-py3-none-any.whl的散列值
算法 | 散列摘要 | |
---|---|---|
SHA256 | 90445c92548ff8ec7eb9b78cba826c3fd7b9ad71b14b2e72f851641168b3788e |
|
MD5 | dda8a663906cd8f831b20a35c016afad |
|
BLAKE2b-256 | bbbffd3282c5cc8e1eaaab30d1d1b10d74c44684ec1a99d03990391757fa2b5a |