跳转到主要内容

一个用于定义、管理和执行函数管道的Python库。

项目描述

PipeFunc:结构化、自动化和简化您的计算工作流程 🕸

🚫 停止微观管理执行。专注于科学。使用函数管道捕获您工作流程的本质,将计算表示为DAG 🕸️,并自动化并行扫描 🔄。

Python PyPi Ruff pytest Conda Coverage CodSpeed Badge Documentation Downloads GitHub

:books: 目录

:thinking: 这是关于什么?

asciicast

pipefunc 是一个用于创建和执行 函数管道 的 Python 库。通过简单注释函数并指定它们的输出,它可以构建一个管道,根据依赖关系自动管理执行顺序。将管道视为有向图,执行所有(或特定)输出,添加多维扫描,自动并行化管道,并获得结构化的数据。

[!注意] 管道 是一系列相互连接的函数,以 有向无环图 (DAG) 的形式组织,其中一个或多个函数的输出作为后续函数的输入。pipefunc 简化了这些管道的创建和管理,提供了强大的工具以高效地执行它们。

无论您是在处理数据处理、科学计算、机器学习(AI)工作流程,还是任何涉及相互依赖函数的其他场景,pipefunc 都可以帮助您专注于代码的逻辑,同时它处理函数的依赖关系和执行顺序的复杂性。

:rocket: 关键功能

  1. 🚀 函数组合和管道化:使用 @pipefunc 装饰器创建管道;执行顺序自动处理。
  2. 📊 管道可视化:生成管道的可视化图形,以更好地理解数据流。
  3. 👥 多个输出:处理返回多个结果的函数,允许每个结果作为其他函数的输入。
  4. 🔁 Map-Reduce 支持:执行“map”操作将函数应用于数据,以及“reduce”操作聚合结果,允许多维映射。
  5. ➡️ 管道简化:合并复杂管道中的节点,以在单个步骤中运行多个函数。
  6. 👮 类型注解验证:验证函数之间的类型注解以确保类型一致性。
  7. 🎛️ 资源使用分析:获取有关 CPU 使用、内存消耗和执行时间的报告,以识别瓶颈并优化您的代码。
  8. 🔄 自动并行化:自动以并行(本地或远程)方式运行管道,具有共享内存和磁盘缓存选项。
  9. 🔍 参数扫描工具:生成参数组合用于参数扫描,并通过结果缓存优化扫描。
  10. 💡 灵活的函数参数:使用不同的参数组合调用函数,让 pipefunc 根据提供的参数确定调用哪些其他函数。
  11. 🏗️ 利用巨人:基于 NetworkX 进行图算法,NumPy 进行多维数组,可选 Xarray 进行标记的多维数组,Zarr 存储结果在内存/磁盘/云或任何键值存储中,以及 Adaptive 进行并行扫描。
  12. 🤓 内行数据:>600 个测试,100% 测试覆盖率,完全类型化,仅 4 个必需依赖项,所有 Ruff 规则,所有 公共 API 已文档化。

:test_tube: 它是如何工作的?

pipefunc 提供了一个 Pipeline 类,您可以使用它来定义您的函数管道。您可以使用 pipefunc 装饰器将函数添加到管道中,这还允许您指定函数的输出名称。一旦定义了管道,您就可以针对特定输出值执行它,通过合并函数节点简化它,将其可视化为有向图,并分析管道函数的资源使用情况。有关更详细的用法说明和示例,请参阅包中提供的用法示例。

以下是一个 pipefunc 的简单示例用法,用于说明其主要功能

from pipefunc import pipefunc, Pipeline

# Define three functions that will be a part of the pipeline
@pipefunc(output_name="c")
def f_c(a, b):
    return a + b

@pipefunc(output_name="d")
def f_d(b, c):
    return b * c

@pipefunc(output_name="e")
def f_e(c, d, x=1):
    return c * d * x

# Create a pipeline with these functions
pipeline = Pipeline([f_c, f_d, f_e], profile=True)  # `profile=True` enables resource profiling

# Call the pipeline directly for different outputs:
assert pipeline("d", a=2, b=3) == 15
assert pipeline("e", a=2, b=3, x=1) == 75

# Or create a new function for a specific output
h_d = pipeline.func("d")
assert h_d(a=2, b=3) == 15

h_e = pipeline.func("e")
assert h_e(a=2, b=3, x=1) == 75
# Instead of providing the root arguments, you can also provide the intermediate results directly
assert h_e(c=5, d=15, x=1) == 75

# Visualize the pipeline
pipeline.visualize()

# Get all possible argument mappings for each function
all_args = pipeline.all_arg_combinations
print(all_args)

# Show resource reporting (only works if profile=True)
pipeline.print_profiling_stats()

此示例演示了使用 f_cf_df_e 函数定义管道,使用管道访问和执行这些函数,可视化管道图,获取所有可能的参数映射,并报告资源使用情况。这个基本示例应该能给您一个关于如何使用 pipefunc 构建和管理函数管道的思路。

以下示例演示了如何使用 pipefunc 执行 map-reduce 操作。

from pipefunc import pipefunc, Pipeline
from pipefunc.map import load_outputs
import numpy as np

@pipefunc(output_name="c", mapspec="a[i], b[j] -> c[i, j]")  # the mapspec is used to specify the mapping
def f(a: int, b: int):
    return a + b

@pipefunc(output_name="mean")  # there is no mapspec, so this function takes the full 2D array
def g(c: np.ndarray):
    return np.mean(c)

pipeline = Pipeline([f, g])
inputs = {"a": [1, 2, 3], "b": [4, 5, 6]}
pipeline.map(inputs, run_folder="my_run_folder", parallel=True)
result = load_outputs("mean", run_folder="my_run_folder")
print(result)  # prints 7.0

在这里,mapspec 参数用于指定函数 f 的输入和输出之间的映射,它创建了输入列表 ab 的乘积,并计算每对数字的和。然后,g 函数计算结果二维数组的平均值。map 方法对 inputs 执行管道,而 load_outputs 函数用于从指定的运行文件夹中加载 g 函数的结果。

:notebook: Jupyter Notebook示例

请参阅我们的 example.ipynb 中的详细用法示例和其他内容。

:computer: 安装

建议从 conda 安装 最新稳定版(推荐)

conda install pipefunc

或从 PyPI 安装

pip install "pipefunc[all]"

或使用以下命令安装 main

pip install -U https://github.com/pipefunc/pipefunc/archive/main.zip

或克隆存储库并执行开发安装(推荐用于开发)

git clone git@github.com:pipefunc/pipefunc.git
cd pipefunc
pip install -e ".[dev]"

:hammer_and_wrench: 开发

我们使用 pre-commit 来管理预提交钩子,这有助于我们确保我们的代码始终干净并符合我们的编码标准。要设置它,请使用 pip 安装 pre-commit,然后运行安装命令

pip install pre-commit
pre-commit install

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

pipefunc-0.35.1.tar.gz (144.7 KB 查看哈希值

上传时间:

构建分布

pipefunc-0.35.1-py3-none-any.whl (120.4 KB 查看哈希值

上传时间: Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面