跳转到主要内容

与Dask和AICS相关的实用函数和文档

项目描述

AICS Dask Utils

Build Status Documentation Code Coverage

与Dask、Distributed和相关软件包相关的文档。AICS项目常用的实用函数。


特性

  • 分布式处理器,用于管理各种调试或集群配置
  • 示例集群部署的文档

基础

在我们跳入快速入门之前,有一些基本定义需要理解。

任务

任务是一个要处理的单个静态函数。很简单。然而,对于AICS而言,当使用 aicsimageio(以及/或 dask.array.Array)时,您的图像(或 dask.array.Array)将被拆分为 许多 任务。这取决于图像读取器和您正在读取的文件的大小。但一般来说,可以假设每个读取的图像被拆分为数万个任务。如果您想查看您的图像被拆分为多少任务,您可以计算:

  1. 伪代码:sum(2 * size(channel) for channel if channel not in ["Y", "X"])
  2. Dask图长度:len(AICSImage.dask_data.__dask_graph__())

映射

将给定的函数应用于作为函数参数提供的可迭代对象。给定 lambda x: x + 1[1, 2, 3],在这种情况下,map(func, *iterables) 的结果将是 [2, 3, 4]。通常,从 map 操作返回的是一个 future 对象的可迭代集合。由于操作是随着资源的可用性而开始的,并且项到项的方差可能导致不同的输出顺序,因此无法保证 map 操作的结果顺序与输入可迭代对象的顺序相同。

Future

一个将变得可用但目前尚未定义的对象。无法保证该对象是有效的结果或错误,您应该在 future 状态解决后(通常这意味着在 gather 操作之后)处理错误。

Gather

阻止进程继续前进,直到所有 future 都已解决。这里的控制流意味着您可能会生成数千个 future,并在这些 future 逐步解决的同时继续本地操作,但如果您想有一个明确的停止点并等待一些 future 完成的话,您就需要将它们聚集起来。

其他注释

Dask 尽可能地模仿标准库 concurrent.futures,这使得这个库能够围绕 Dask 提供简单的包装,从而允许轻松调试,因为我们只是将 distributed.Client.mapconcurrent.futures.ThreadPoolExecutor.map 等进行交换。如果您在代码中的任何时候都不想使用 dask 的原因,那么使用 concurrent.futures.ThreadPoolExecutorconcurrent.futures.ProcessPoolExecutor 也是同样有效的。

使用分布式处理器的基本映射

如果您有一个(或多个)可迭代对象,其结果会导致少于数十万个任务,则可以直接使用 DistributedHandler.client 提供的正常 map

重要提示:请注意,“...可迭代对象会导致少于数十万个任务...”。这很重要,因为当您尝试对数千个图像路径进行 map 操作时,每个路径都会生成一个 AICSImage 对象,每个对象都会向调度器添加数千个更多任务以完成。这将导致崩溃,您应该查看 大型可迭代对象批量处理

from aics_dask_utils import DistributedHandler

# `None` address provided means use local machine threads
with DistributedHandler(None) as handler:
    futures = handler.client.map(
        lambda x: x + 1,
        [1, 2, 3]
    )

    results = handler.gather(futures)

from distributed import LocalCluster
cluster = LocalCluster()

# Actual address provided means use the dask scheduler
with DistributedHandler(cluster.scheduler_address) as handler:
    futures = handler.client.map(
        lambda x: x + 1,
        [1, 2, 3]
    )

    results = handler.gather(futures)

大型可迭代对象批量处理

如果您有一个(或多个)可迭代对象,其结果会导致超过数十万个任务,您应该使用 handler.batched_map 来减少客户端的负载。这将批量处理您的请求,而不是一次性发送。

from aics_dask_utils import DistributedHandler

# `None` address provided means use local machine threads
with DistributedHandler(None) as handler:
    results = handler.batched_map(
        lambda x: x + 1,
        range(1e9) # 1 billion
    )

from distributed import LocalCluster
cluster = LocalCluster()

# Actual address provided means use the dask scheduler
with DistributedHandler(cluster.scheduler_address) as handler:
    results = handler.batched_map(
        lambda x: x + 1,
        range(1e9) # 1 billion
    )

注意:注意,在 batched_map 之后没有 handler.gather 调用。这是因为 batched_map 在每个批次结束时聚集结果,而不是简单地返回它们的 future。

安装

稳定版本: pip install aics_dask_utils
开发头部: pip install git+https://github.com/AllenCellModeling/aics_dask_utils.git

文档

有关完整包文档,请访问 AllenCellModeling.github.io/aics_dask_utils

开发

有关开发代码的信息,请参阅 CONTRIBUTING.md

附加注释

这个 README、提供的工具和文档并不涵盖您可以使用 dask 和其他类似计算系统进行的所有操作。有关进一步阅读,请访问 dask.org

自由软件:Allen Institute 软件许可证

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

aics_dask_utils-0.2.4.tar.gz (12.6 kB 查看哈希值)

上传时间 源代码

构建版本

aics_dask_utils-0.2.4-py2.py3-none-any.whl (8.0 kB 查看哈希值)

上传时间 Python 2 Python 3

支持者