BluePyParallel: Bluebrain Python Embarrassingly Parallel library
Provides an embarrassingly parallel tool with an SQL backend.
Introduction
Provides an embarrassingly parallel tool with an SQL backend, inspired by BluePyMM from @wvangeit.
Installation
This package should be installed using pip:
pip install bluepyparallel
Usage
General computation
from bluepyparallel import init_parallel_factory

factory_name = "multiprocessing"  # Can also be None, dask or ipyparallel
batch_size = 10 # This value is used to split the data into batches before processing them
chunk_size = 1000 # This value is used to gather the elements to process before sending them to the workers
# Setup the parallel factory
parallel_factory = init_parallel_factory(
    factory_name,
    batch_size=batch_size,
    chunk_size=chunk_size,
    processes=4,  # This parameter is specific to the multiprocessing factory
)
# Get the mapper from the factory
mapper = parallel_factory.get_mapper()
# Use the mapper to map the given function to each element of mapped_data and gather the results
result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
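For instance, a minimal self-contained run could look like the sketch below; the square function and its inputs are made up for illustration, and the top-level import path is assumed:

from bluepyparallel import init_parallel_factory

def square(value):
    # Toy function standing in for a real, time-consuming computation
    return value ** 2

if __name__ == "__main__":
    parallel_factory = init_parallel_factory("multiprocessing", processes=4)
    mapper = parallel_factory.get_mapper()

    # The mapper behaves like the built-in map(): it applies the function to every element
    results = sorted(mapper(square, range(10)))
    assert results == [i ** 2 for i in range(10)]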
Working with Pandas
This library provides a dedicated function to work with large :class:`pandas.DataFrame` objects: :func:`bluepyparallel.evaluator.evaluate`. This function converts the DataFrame into a list of dicts (one per row), maps the given function over these elements, and finally gathers the results.
Example
import pandas as pd

from bluepyparallel.evaluator import evaluate

input_df = pd.DataFrame(index=[1, 2], columns=['data'], data=[100, 200])

def evaluation_function(row):
    # compute_something() is a placeholder for the user's own computation
    result_1, result_2 = compute_something(row['data'])
    return {'new_column_1': result_1, 'new_column_2': result_2}

# Use the mapper to map the given function to each element of the DataFrame
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    new_columns=[['new_column_1', 0], ['new_column_2', None]],  # This defines the new columns and their default values
)

assert result_df.columns.tolist() == ['data', 'new_column_1', 'new_column_2']
It is, to some extent, a generalization of the pandas .apply method.
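For comparison, a purely serial version of the example above could be written with plain pandas as in the sketch below (no parallelism, no default values for the new columns, and no SQL backend):

# Serial equivalent of evaluate() using plain pandas
new_cols = input_df.apply(lambda row: pd.Series(evaluation_function(row)), axis=1)
result_df = pd.concat([input_df, new_cols], axis=1)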
Working with the SQL backend
Since it is meant to be used with time-consuming functions, a checkpoint-and-resume mechanism based on an SQL backend is also provided. This SQL backend uses the SQLAlchemy library, so it works with many database types (SQLite, PostgreSQL, MySQL, etc.). To enable this feature, simply pass a URL that SQLAlchemy can handle to the db_url parameter of :func:`bluepyparallel.evaluator.evaluate`.
.. note:: A specific driver might have to be installed to access the database (e.g. `psycopg2 <https://www.psycopg.org/docs>`_ for PostgreSQL).
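The URL is a standard SQLAlchemy database URL; for instance (the credentials below are placeholders, not values required by BluePyParallel):

db_url = "sqlite:///db.sql"  # local SQLite file
db_url = "postgresql+psycopg2://user:password@host:5432/dbname"  # PostgreSQL through psycopg2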
Example
# Use the mapper to map the given function to each element of the DataFrame
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    db_url="sqlite:///db.sql",  # This could also just be "db.sql" and would automatically be turned into an SQLite URL
)
Now, if the computation crashes for any reason, the partial results are stored in the db.sql file. If the crash was due to an external cause (so that running the code again should work), the computation can be resumed from the last computed element. Only the missing elements are then computed, which can save a lot of time.
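In practice, resuming simply means running the exact same call again once the external issue is fixed (a sketch reusing the names from the example above):

# Rows already stored in db.sql are read back; only the missing ones are evaluated again
result_df = evaluate(
    input_df,
    evaluation_function,
    parallel_factory="multiprocessing",
    db_url="sqlite:///db.sql",
)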
Running on HPC systems with distributed Dask MPI
Here is an example sbatch script that can be adapted to run a script on several nodes and workers using distributed Dask and MPI. In this example, the code called by run.py should be parallelized with BluePyParallel.
The Dask variables are not mandatory, but they are strongly recommended and can be fine-tuned.
#!/bin/bash -l
# Dask configuration
export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
export DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=0.80 # pause execution at 80% memory use
export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=0.95 # restart the worker at 95% use
export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=spawn
export DASK_DISTRIBUTED__WORKER__DAEMON=True
# Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms # Time between statistical profiling queries
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms # Time between starting new profile
# Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
export PARALLEL_BATCH_SIZE=1000
srun -v run.py
To make sure that only the evaluate function runs with the parallel Dask backend, the parallel factory must be initialized before anything else is done in the code. For example, run.py could look like this:
import pandas as pd

from bluepyparallel import init_parallel_factory
from bluepyparallel.evaluator import evaluate

# some_preprocessing() and function_to_evaluate() are placeholders for user code
if __name__ == "__main__":
    parallel_factory = init_parallel_factory('dask_dataframe')  # must run before anything else
    df = pd.read_csv("input_data.csv")
    df = some_preprocessing(df)
    df = evaluate(df, function_to_evaluate, parallel_factory=parallel_factory)
    df.to_csv("output_data.csv")
This is because everything placed before init_parallel_factory is executed by every MPI process, since MPI has not been initialized at that point.
.. note:: We recommend using dask_dataframe instead of dask, as it has proven more stable for real, large-scale computations.
Funding & Acknowledgment
The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government's ETH Board of the Swiss Federal Institutes of Technology.
For the license and the authors, see LICENSE.txt and AUTHORS.md respectively.
Copyright © 2023-2024 Blue Brain Project/EPFL