与Dask和AICS相关的实用函数和文档
项目描述
AICS Dask Utils
与Dask、Distributed和相关软件包相关的文档。AICS项目常用的实用函数。
特性
- 分布式处理器,用于管理各种调试或集群配置
- 示例集群部署的文档
基础
在我们跳入快速入门之前,有一些基本定义需要理解。
任务
任务是一个要处理的单个静态函数。很简单。然而,对于AICS而言,当使用 aicsimageio
(以及/或 dask.array.Array
)时,您的图像(或 dask.array.Array
)将被拆分为 许多 任务。这取决于图像读取器和您正在读取的文件的大小。但一般来说,可以假设每个读取的图像被拆分为数万个任务。如果您想查看您的图像被拆分为多少任务,您可以计算:
- 伪代码:
sum(2 * size(channel) for channel if channel not in ["Y", "X"])
- Dask图长度:
len(AICSImage.dask_data.__dask_graph__())
映射
将给定的函数应用于作为函数参数提供的可迭代对象。给定 lambda x: x + 1
和 [1, 2, 3]
,在这种情况下,map(func, *iterables)
的结果将是 [2, 3, 4]
。通常,从 map
操作返回的是一个 future
对象的可迭代集合。由于操作是随着资源的可用性而开始的,并且项到项的方差可能导致不同的输出顺序,因此无法保证 map 操作的结果顺序与输入可迭代对象的顺序相同。
Future
一个将变得可用但目前尚未定义的对象。无法保证该对象是有效的结果或错误,您应该在 future 状态解决后(通常这意味着在 gather
操作之后)处理错误。
Gather
阻止进程继续前进,直到所有 future 都已解决。这里的控制流意味着您可能会生成数千个 future,并在这些 future 逐步解决的同时继续本地操作,但如果您想有一个明确的停止点并等待一些 future 完成的话,您就需要将它们聚集起来。
其他注释
Dask 尽可能地模仿标准库 concurrent.futures
,这使得这个库能够围绕 Dask 提供简单的包装,从而允许轻松调试,因为我们只是将 distributed.Client.map
与 concurrent.futures.ThreadPoolExecutor.map
等进行交换。如果您在代码中的任何时候都不想使用 dask
的原因,那么使用 concurrent.futures.ThreadPoolExecutor
或 concurrent.futures.ProcessPoolExecutor
也是同样有效的。
使用分布式处理器的基本映射
如果您有一个(或多个)可迭代对象,其结果会导致少于数十万个任务,则可以直接使用 DistributedHandler.client
提供的正常 map
。
重要提示:请注意,“...可迭代对象会导致少于数十万个任务...”。这很重要,因为当您尝试对数千个图像路径进行 map
操作时,每个路径都会生成一个 AICSImage
对象,每个对象都会向调度器添加数千个更多任务以完成。这将导致崩溃,您应该查看 大型可迭代对象批量处理。
from aics_dask_utils import DistributedHandler
# `None` address provided means use local machine threads
with DistributedHandler(None) as handler:
futures = handler.client.map(
lambda x: x + 1,
[1, 2, 3]
)
results = handler.gather(futures)
from distributed import LocalCluster
cluster = LocalCluster()
# Actual address provided means use the dask scheduler
with DistributedHandler(cluster.scheduler_address) as handler:
futures = handler.client.map(
lambda x: x + 1,
[1, 2, 3]
)
results = handler.gather(futures)
大型可迭代对象批量处理
如果您有一个(或多个)可迭代对象,其结果会导致超过数十万个任务,您应该使用 handler.batched_map
来减少客户端的负载。这将批量处理您的请求,而不是一次性发送。
from aics_dask_utils import DistributedHandler
# `None` address provided means use local machine threads
with DistributedHandler(None) as handler:
results = handler.batched_map(
lambda x: x + 1,
range(1e9) # 1 billion
)
from distributed import LocalCluster
cluster = LocalCluster()
# Actual address provided means use the dask scheduler
with DistributedHandler(cluster.scheduler_address) as handler:
results = handler.batched_map(
lambda x: x + 1,
range(1e9) # 1 billion
)
注意:注意,在 batched_map
之后没有 handler.gather
调用。这是因为 batched_map
在每个批次结束时聚集结果,而不是简单地返回它们的 future。
安装
稳定版本: pip install aics_dask_utils
开发头部: pip install git+https://github.com/AllenCellModeling/aics_dask_utils.git
文档
有关完整包文档,请访问 AllenCellModeling.github.io/aics_dask_utils。
开发
有关开发代码的信息,请参阅 CONTRIBUTING.md。
附加注释
这个 README、提供的工具和文档并不涵盖您可以使用 dask
和其他类似计算系统进行的所有操作。有关进一步阅读,请访问 dask.org。
自由软件:Allen Institute 软件许可证
项目详情
aics_dask_utils-0.2.4.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e1fd4e4d911981c9d513c9d952342df63adae3e830dcb829f5d562fde29c0c91 |
|
MD5 | fc28a40d6bb3998e11855bbd875159a6 |
|
BLAKE2b-256 | 95356a6f43522953bc5098e8e4c51ccea8d1dadf862f149e7877ed018b755a37 |
aics_dask_utils-0.2.4-py2.py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 7b7f3b0297660df3a280d2cdb5bdf972b79677523c265a0f14693b55dbdf9937 |
|
MD5 | 1b6d216260f01b2169e16243a661a19b |
|
BLAKE2b-256 | 902c222ad6984860374dee1a1b81a51c0644e6bca0aff29ee0bf9351dde537df |