跳转到主要内容

Saturn Cloud中的Dask集群对象

项目描述

dask-saturn

用于与Dask集群在Saturn Cloud中交互的Python库。

Dask-Saturn模仿了Dask-Kubernetes的API,但允许用户与在Saturn Cloud中创建的集群交互。

启动集群

在Jupyter笔记本中,您可以启动一个集群

from dask_saturn import SaturnCluster

cluster = SaturnCluster()
cluster

默认情况下,这将启动一个与您已在Saturn UI或之前笔记本中设置的相同设置的dask集群。

使用 n_workers 选项以特定数量的工作进程启动集群。类似地,您可以设置 scheduler_sizeworker_sizeworker_is_spot

注意:如果集群已经在运行,则无法更改设置。尝试这样做将引发警告。

使用 shudown_on_close 选项设置与客户端内核关联的集群。当您的 jupyter 内核死亡或重启时,此功能类似于常规的 dask LocalCluster,dask 集群将关闭。

调整工作进程数量

一旦您有一个集群,您就可以通过 jupyter 小部件或使用 scaleadapt 方法与之交互。

例如,手动扩展到 20 个工作进程

cluster.scale(20)

创建一个自我控制缩放的适应性集群

cluster.adapt(minimum=1, maximum=20)

与客户端交互

要向集群提交任务,您有时需要访问 Client 对象。使用集群作为唯一参数实例化此对象。

from distributed import Client

client = Client(cluster)
client

关闭集群

要终止与集群相关联的所有资源,请使用 close 方法。

cluster.close()

更改设置

要更新现有集群的设置(如 n_workersworker_sizeworker_is_spotnthreads),请使用 reset 方法。

cluster.reset(n_workers=3)

您也可以在不首先实例化集群的情况下调用此方法。

cluster = SaturnCluster.reset(n_workers=3)

默认情况下,您将获得一个工作进程,但您可以使用 n_workers 选项更改该值。同样,您可以使用 scheduler_sizeworker_size 覆盖调度器和工作硬件设置。您可以使用 describe_sizes() 显示可用的尺寸选项。

>>> describe_sizes()
{'medium': 'Medium - 2 cores - 4 GB RAM',
 'large': 'Large - 2 cores - 16 GB RAM',
 'xlarge': 'XLarge - 4 cores - 32 GB RAM',
 '2xlarge': '2XLarge - 8 cores - 64 GB RAM',
 '4xlarge': '4XLarge - 16 cores - 128 GB RAM',
 '8xlarge': '8XLarge - 32 cores - 256 GB RAM',
 '12xlarge': '12XLarge - 48 cores - 384 GB RAM',
 '16xlarge': '16XLarge - 64 cores - 512 GB RAM',
 'g4dnxlarge': 'T4-XLarge - 4 cores - 16 GB RAM - 1 GPU',
 'g4dn4xlarge': 'T4-4XLarge - 16 cores - 64 GB RAM - 1 GPU',
 'g4dn8xlarge': 'T4-8XLarge - 32 cores - 128 GB RAM - 1 GPU',
 'p32xlarge': 'V100-2XLarge - 8 cores - 61 GB RAM - 1 GPU',
 'p38xlarge': 'V100-8XLarge - 32 cores - 244 GB RAM - 4 GPU',
 'p316xlarge': 'V100-16XLarge - 64 cores - 488 GB RAM - 8 GPU'}

以下是一个示例

cluster = SaturnCluster(
    scheduler_size="large",
    worker_size="2xlarge",
    n_workers=3,
)
client = Client(cluster)
client

从 Saturn 外部连接

要从 Saturn 外部连接到您的 Dask 集群,您需要设置两个环境变量:SATURN_TOKENSATURN_BASE_URL

要获取这些值,您需要使用浏览器进入 Saturn。转到您想要连接 Dask 集群的位置。将有一个按钮说:“从外部连接”。点击该按钮将打开一个模态,显示 SATURN_TOKENSATURN_BASE_URL 的值。

请记住 - 该令牌是私有的,所以不要与他人分享!它可能类似于 351e6f2d40bf4d15a0009fc086c602df

export SATURN_BASE_URL="https://app.demo.saturnenterprise.io"
export SATURN_TOKEN="351e6f2d40bf4d15a0009fc086c602df"

在设置环境变量后,您可以在 Python 会话中连接到您的 Dask 集群,就像在 Saturn 内部一样。

from dask_saturn import SaturnCluster
from distributed import Client

cluster = SaturnCluster()
client = Client(cluster)
client

当您完成与 dask 集群的工作时,请确保关闭它。

cluster.close()

将文件同步到工作进程

当使用分布式 dask 集群时,工作进程无法访问与客户端相同的文件系统。因此,您会在 jupyter 服务器上看到文件,但这些文件在工作进程中不可用。要移动文件到工作进程,您可以使用 RegisterFiles 插件,并在您想要更新的任何路径上调用 sync_files

例如,如果您有一个文件结构如下

/home/jovyan/project/
|---- utils/
|   |---- __init__.py
|   |---- hello.py
|
|---- Untitled.ipynb

其中 hello.py 包含

# utils/hello.py
def greet():
    return "Hello"

如果 hello.py 中的代码发生变化,或者您向 utils 中添加了新文件,您将希望将这些更改推送到工作进程。在设置 SaturnClusterClient 后,将 RegisterFiles 插件与工作进程注册。然后,每次您对 utils 中的文件进行更改时,请运行 sync_files。工作进程插件确保任何新的工作进程都会具有您已同步的任何文件。

from dask_saturn import RegisterFiles, sync_files

client.register_worker_plugin(RegisterFiles())
sync_files(client, "utils")

# If a python script has changed, restart the workers so they will see the changes
client.restart()

# import the function and tell the workers to run it
from util.hello import greet
client.run(greet)

提示:您可以通过运行 client.run(os.listdir) 始终检查工作进程上文件系统的状态。

开发

创建/更新 dask-saturn conda 环境

make conda-update

设置环境变量以使用本地 atlas 服务器运行 dask-saturn

export SATURN_BASE_URL=http://dev.localtest.me:8888/
export SATURN_TOKEN=<JUPYTER_SERVER_SATURN_TOKEN>

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

dask-saturn-0.4.3.tar.gz (30.8 kB 查看散列)

上传时间

构建分发

dask_saturn-0.4.3-py2-none-any.whl (16.3 kB 查看散列)

上传时间 Python 2

支持