BluePyParallel: Bluebrain Python Embarrassingly Parallel library

Provides an embarrassingly parallel tool with an SQL backend.

Introduction

Provides an embarrassingly parallel tool with an SQL backend, inspired by the BluePyMM package of @wvangeit.

Installation

This package should be installed using pip:

pip install bluepyparallel

Usage

General computation

from bluepyparallel import init_parallel_factory

factory_name = "multiprocessing"  # Can also be None, "dask" or "ipyparallel"
batch_size = 10  # This value is used to split the data into batches before processing them
chunk_size = 1000  # This value is used to gather the elements to process before sending them to the workers

# Setup the parallel factory
parallel_factory = init_parallel_factory(
    factory_name,
    batch_size=batch_size,
    chunk_size=chunk_size,
    processes=4,  # This parameter is specific to the multiprocessing factory
)

# Get the mapper from the factory
mapper = parallel_factory.get_mapper()

# Use the mapper to map the given function to each element of mapped_data and gather the results
result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
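
For illustration, here is a minimal sketch of what function and mapped_data could look like; the names and the toy computation below are placeholders, not part of the library:

def function(value):
    # Toy per-element computation; any extra positional or keyword arguments
    # would be forwarded to each call, as in the snippet above.
    return value ** 2

mapped_data = range(10)
result = sorted(mapper(function, mapped_data))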

Working with Pandas

This library provides a specific function to work with large :class:`pandas.DataFrame` objects: :func:`bluepyparallel.evaluator.evaluate`. This function converts the DataFrame into a list of dictionaries (one per row), maps the given function onto each element, and finally gathers the results.

Example

import pandas as pd

from bluepyparallel import evaluate

input_df = pd.DataFrame(index=[1, 2], columns=['data'], data=[100, 200])

def evaluation_function(row):
    # compute_something is a placeholder for the actual computation
    result_1, result_2 = compute_something(row['data'])
    return {'new_column_1': result_1, 'new_column_2': result_2}

# Use the mapper to map the given function to each element of the DataFrame
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    new_columns=[['new_column_1', 0], ['new_column_2', None]],  # This defines the new columns and their default values
)
assert result_df.columns.tolist() == ['data', 'new_column_1', 'new_column_2']

It is, in a way, a generalization of the pandas apply method.
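
For comparison, the same transformation could be written sequentially with plain pandas, as sketched below (illustrative only: evaluate additionally handles parallel execution, batching, default values for the new columns and the SQL checkpointing described in the next section):

# Sequential, pandas-only sketch of the example above (no parallelism, no checkpointing);
# it reuses input_df and evaluation_function from the previous snippet.
new_cols = input_df.apply(lambda row: pd.Series(evaluation_function(row)), axis=1)
result_df_sequential = pd.concat([input_df, new_cols], axis=1)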

Working with a SQL backend

Since this tool is designed to work with time-consuming functions, a checkpoint and resume mechanism with a SQL backend is also provided. This SQL backend uses the SQLAlchemy library, so it can work with many database types (such as SQLite, PostgreSQL, MySQL, etc.). To activate this feature, simply pass a URL that SQLAlchemy can handle to the db_url parameter of :func:`bluepyparallel.evaluator.evaluate`.

.. note:: A specific driver might have to be installed to access the database (e.g. `psycopg2 <https://www.psycopg.org/docs>`_ for PostgreSQL).
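
As an illustration, here are db_url values of the kind SQLAlchemy accepts; the credentials, host and database names are placeholders:

# SQLite file in the current working directory (same form as in the example below)
db_url = "sqlite:///db.sql"

# PostgreSQL through the psycopg2 driver mentioned in the note above (placeholder credentials)
# db_url = "postgresql://user:password@host:5432/dbname"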

Example

# Use the mapper to map the given function to each element of the DataFrame
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    db_url="sqlite:///db.sql",  # This could also just be "db.sql" and would be automatically turned to SQLite URL
)

Now, if the computation crashes for any reason, the partial results are stored in the db.sql file. If the crash was due to an external cause (so that running the code again should work), the computation can be resumed from the last computed element. Only the missing elements are then computed, which can save a lot of time.
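
A sketch of what resuming could look like: re-run the same evaluation against the same database. Note that the resume keyword shown below is an assumption and not taken from the text above; check the signature of :func:`bluepyparallel.evaluator.evaluate` in the installed version before relying on it:

result_df = evaluate(
    input_df,
    evaluation_function,
    parallel_factory="multiprocessing",
    db_url="sqlite:///db.sql",  # Same URL as the interrupted run
    resume=True,  # Assumed flag: only the rows missing from db.sql would be recomputed
)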

Running with distributed Dask MPI on HPC systems

Here is an example sbatch script that can be adapted to execute a script on several nodes and workers using distributed dask and MPI. In this example, the code called by run.py should be parallelized using BluePyParallel.

The Dask variables are not required, but they are strongly recommended and can be fine-tuned.

#!/bin/bash -l

# Dask configuration
export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
export DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=0.80  # pause execution at 80% memory use
export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=0.95  # restart the worker at 95% use
export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=spawn
export DASK_DISTRIBUTED__WORKER__DAEMON=True
# Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms  # Time between statistical profiling queries
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms  # Time between starting new profile

# Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
export PARALLEL_BATCH_SIZE=1000

srun -v run.py
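
Assuming the script above is saved, for example, as run_parallel.sbatch (the file name is a placeholder), it would typically be submitted with:

sbatch run_parallel.sbatch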

To make sure that only the evaluate function is run with parallel Dask, the parallel factory must be initialized before anything else is executed in the code. For example, run.py could look like this:

import pandas as pd

from bluepyparallel import evaluate, init_parallel_factory

# some_preprocessing and function_to_evaluate are placeholders defined elsewhere in the project
if __name__ == "__main__":
    parallel_factory = init_parallel_factory('dask_dataframe')
    df = pd.read_csv("input_data.csv")
    df = some_preprocessing(df)
    df = evaluate(df, function_to_evaluate, parallel_factory=parallel_factory)
    df.to_csv("output_data.csv")

This is because everything before init_parallel_factory will be run in parallel, as MPI is not initialized yet.

.. note:: We recommend using dask_dataframe instead of dask, as it proved more stable for actual large-scale computations.

Funding & Acknowledgment

The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government's ETH Board of the Swiss Federal Institutes of Technology.

For license and authors, see LICENSE.txt and AUTHORS.md respectively.

Copyright © 2023-2024 Blue Brain Project/EPFL
