跳转到主要内容

Neu.ro Airflow插件

项目描述

Neu.ro Airflow插件

此包帮助您从Airflow环境中在neu.ro平台上执行您的机器学习工作负载。

此外,请参阅我们的命令行界面参考Python API参考

环境

  • Python 3.6+
  • apache-airflow >= 1.10.x
  • Neuromation >= 20.4.6

安装

此插件被编写为自动与Airflow注册,因此您只需将其安装到您的Python环境中即可

pip install neuro-airflow-plugin

用法

在开始之前,您需要通过命令行获取Neuro令牌

neuro config show-token

并在Airflow中设置Neuro连接(默认为neuro_default

airflow connections --add \
    --conn_id neuro_default2 \
    --conn_type "neuro" \
    --conn_extra '{"token": "Put your Token here..."}'

除了token之外,您还可以在额外的json中提供这些字段

  • cluster - 用于计算调度的集群名称。如果没有提供,则默认使用集群。
  • api_url - Neuro平台的入口URL。仅适用于自定义集群。

您也可以通过UI界面设置连接,只需将相同的JSON文档放入Extra表单字段中。连接类型无关紧要,因此您可以选择UI允许的任何连接。

有关在Airflow中设置连接的更多信息,请参阅管理连接

NeuroRunOperator

在Neuro平台上运行作业。示例用法

from airflow.operators.neuro import NeuroRunOperator


run = NeuroRunOperator(
    task_id="small-deeds",

    job_command="echo 'Big things start from small deeds'",
    job_image="ubuntu:{{ var.value.ubuntu_tag }}",
    job_resources_preset="{% if var.value.use_large_cpu %}cpu-large{% else %}cpu-small{% endif %}"
)

有关更多用法示例,请参阅存储库中的examples/dags文件夹。

操作员参数

  • job_command str 必需 - 在作业中要执行的命令。如果您需要覆盖镜像的入口点,请参阅job_entrypoint
  • job_image str 必需 - 作业使用的容器镜像。名称可以是托管在外部公共存储库上的Docker镜像名称,或由image://方案指定的Neuro镜像。
  • job_name str - 可选的作业名称。请注意,同一用户创建两个具有相同名称的运行作业是被禁止的。
  • job_volumes list - 描述卷挂载的字符串列表或neuromation.Volume对象。字符串描述由冒号分隔的三个部分组成:存储URI挂载路径挂载模式。例如:storage:my-project:/var/storage:ro
  • job_resources_preset str - 预定义的资源配置(要查看可用值,请运行neuro config show
  • job_resources Resources - 自定义资源配置。有关详细信息,请参阅Python API参考
  • job_is_preemptible bool - 作业是否可以在抢占性实例(也称为Spot实例)上运行。仅与自定义资源配置一起使用。
  • job_extshm bool - 请求扩展'/dev/shm'空间。默认为True,并且仅与预定义资源配置一起使用。
  • job_tags list - 标记作业的字符串标签列表。以后可以用于筛选等。
  • job_description str - 可选的作业描述,格式自由。
  • job_lifespan float - 可选的作业运行时间限制(以秒为单位)。默认为无限。
  • job_environ dict - 运行作业时使用的环境变量。仅对值提供Jinja模板支持,而不是对键提供支持,请参阅下面的更多详细信息。
  • job_entrypoint str - 覆盖容器镜像的ENTRYPOINT。
  • job_http_port str - 启用HTTP端口转发到指定的容器端口。如果使用它,您可以从Airflow UI中的任务面板上的自定义链接定义中访问它(有关其工作方式,请参阅Airflow文档中的详细信息)。默认禁用。
  • job_http_auth bool - 在job_http_port公开的端口上禁用Neuro身份验证。
  • job_tty bool - 为容器分配一个TTY。
  • job_detach bool - 启动作业后分离。如果分离,作业日志将无法在Airflow界面中查看,但作业不会消耗Airflow工作器槽位。默认为True
  • raise_on_errno bool - 如果作业返回非零退出代码,则引发错误。如果job_detachTrue,则忽略。默认为True
  • neuro_conn_id bool - 要用于Neuro身份验证的连接名称。默认为neuro_default

另请参阅CLI文档中的neuro run参考。

Jinja2模板字段

Airflow 支持使用 Jinja 模板字段传递自定义属性和动态定义。此操作符支持以下字段的模板化:

  • job_command
  • job_image
  • job_volumes
  • job_name
  • job_resources_preset
  • job_tags
  • job_environ
  • job_entrypoint
  • job_description
  • job_http_port
  • neuro_conn_id

XCom 导出

此操作符导出 2 个 XCom 值:`return_value`(Airflow 中查询的默认值)和 `assigned_job`。两者都是包含以下字段的 JSON 文档:

  • id str - Neuro 在启动时分配的作业 ID。
  • exit_code int - 如果作业已完成,则命令返回码。
  • status str - 作业状态之一:`pending`、`running`、`succeeded`、`failed` 或 `unknown`。
  • http_url str - 如果使用了 `job_http_port`,则公开 HTTP 端口的 URL。

NeuroJobStatusSensor

等待作业完成或任何其他状态转换发生。示例用法

from airflow.sensors.neuro import NeuroJobStatusSensor


wait = NeuroJobStatusSensor(
    task_id="wait_close",
    job_id="{{ task_instance.xcom_pull(task_ids='small-deeds')['id'] }}",  # noqa
    poke_interval=5,
    timeout=10 * 60,
)

操作员参数

  • job_id str - 查询状态更新的作业 ID。
  • job_statuses list - 等待的 JobStatus 枚举值列表。
  • neuro_conn_id str - 用于 Neuro 认证的连接名称。默认为 neuro_default

Jinja2模板字段

  • job_id

XCom 导出

不导出任何 XCom 值。

NeuroHook

在某些情况下,您可能需要访问平台的其他功能。这可以通过使用 NeuroHook 来完成。例如

import yarl
from neuromation.api import ResourceNotFound

from airflow.hooks.neuro import NeuroHook
from airflow.operators.python_operator import BranchPythonOperator


def check_model(templates_dict, **kw):
    hook = NeuroHook()
    with hook:
        try:
            hook.run(
                hook.client.storage.stat(
                    yarl.URL("storage:" + templates_dict["model_path"])
                )
            )
            return "process_with_model"
        except ResourceNotFound:
            return "process_without_model"


check_model = BranchPythonOperator(
    task_id="check_model_exists",
    python_callable=check_model,
    provide_context=True,
    templates_dict={"model_path": "{{ var.value.project_home }}/model.pth"},
)

探索 Python SDK 以获取平台的更多功能。

项目详情


下载文件

下载适用于您平台的自定义文件。如果您不确定选择哪一个,请了解有关 安装包 的更多信息。

源分发

neuro-airflow-plugin-0.0.1.tar.gz (18.9 kB 查看哈希值)

上传时间

构建分发

neuro_airflow_plugin-0.0.1-py3-none-any.whl (17.3 kB 查看哈希值)

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面