Neu.ro Airflow插件
项目描述
Neu.ro Airflow插件
此包帮助您从Airflow环境中在
此外,请参阅我们的命令行界面参考和Python API参考。
环境
- Python 3.6+
- apache-airflow >= 1.10.x
- Neuromation >= 20.4.6
安装
此插件被编写为自动与Airflow注册,因此您只需将其安装到您的Python环境中即可
pip install neuro-airflow-plugin
用法
在开始之前,您需要通过命令行获取Neuro令牌
neuro config show-token
并在Airflow中设置Neuro连接(默认为neuro_default
)
airflow connections --add \
--conn_id neuro_default2 \
--conn_type "neuro" \
--conn_extra '{"token": "Put your Token here..."}'
除了token
之外,您还可以在额外的json中提供这些字段
cluster
- 用于计算调度的集群名称。如果没有提供,则默认使用集群。api_url
- Neuro平台的入口URL。仅适用于自定义集群。
您也可以通过UI界面设置连接,只需将相同的JSON文档放入Extra
表单字段中。连接类型无关紧要,因此您可以选择UI允许的任何连接。
有关在Airflow中设置连接的更多信息,请参阅管理连接。
NeuroRunOperator
在Neuro平台上运行作业。示例用法
from airflow.operators.neuro import NeuroRunOperator
run = NeuroRunOperator(
task_id="small-deeds",
job_command="echo 'Big things start from small deeds'",
job_image="ubuntu:{{ var.value.ubuntu_tag }}",
job_resources_preset="{% if var.value.use_large_cpu %}cpu-large{% else %}cpu-small{% endif %}"
)
有关更多用法示例,请参阅存储库中的examples/dags
文件夹。
操作员参数
job_command
str 必需 - 在作业中要执行的命令。如果您需要覆盖镜像的入口点,请参阅job_entrypoint
。job_image
str 必需 - 作业使用的容器镜像。名称可以是托管在外部公共存储库上的Docker镜像名称,或由image://
方案指定的Neuro镜像。job_name
str - 可选的作业名称。请注意,同一用户创建两个具有相同名称的运行作业是被禁止的。job_volumes
list - 描述卷挂载的字符串列表或neuromation.Volume
对象。字符串描述由冒号分隔的三个部分组成:存储URI、挂载路径、挂载模式。例如:storage:my-project:/var/storage:ro
。job_resources_preset
str - 预定义的资源配置(要查看可用值,请运行neuro config show
)job_resources
Resources - 自定义资源配置。有关详细信息,请参阅Python API参考。job_is_preemptible
bool - 作业是否可以在抢占性实例(也称为Spot实例)上运行。仅与自定义资源配置一起使用。job_extshm
bool - 请求扩展'/dev/shm'空间。默认为True
,并且仅与预定义资源配置一起使用。job_tags
list - 标记作业的字符串标签列表。以后可以用于筛选等。job_description
str - 可选的作业描述,格式自由。job_lifespan
float - 可选的作业运行时间限制(以秒为单位)。默认为无限。job_environ
dict - 运行作业时使用的环境变量。仅对值提供Jinja模板支持,而不是对键提供支持,请参阅下面的更多详细信息。job_entrypoint
str - 覆盖容器镜像的ENTRYPOINT。job_http_port
str - 启用HTTP端口转发到指定的容器端口。如果使用它,您可以从Airflow UI中的任务面板上的自定义链接定义中访问它(有关其工作方式,请参阅Airflow文档中的详细信息)。默认禁用。job_http_auth
bool - 在job_http_port
公开的端口上禁用Neuro身份验证。job_tty
bool - 为容器分配一个TTY。job_detach
bool - 启动作业后分离。如果分离,作业日志将无法在Airflow界面中查看,但作业不会消耗Airflow工作器槽位。默认为True
。raise_on_errno
bool - 如果作业返回非零退出代码,则引发错误。如果job_detach
为True
,则忽略。默认为True
。neuro_conn_id
bool - 要用于Neuro身份验证的连接名称。默认为neuro_default
。
另请参阅CLI文档中的neuro run
参考。
Jinja2模板字段
Airflow 支持使用 Jinja 模板字段传递自定义属性和动态定义。此操作符支持以下字段的模板化:
job_command
job_image
job_volumes
job_name
job_resources_preset
job_tags
job_environ
job_entrypoint
job_description
job_http_port
neuro_conn_id
XCom 导出
此操作符导出 2 个 XCom 值:`return_value`(Airflow 中查询的默认值)和 `assigned_job`。两者都是包含以下字段的 JSON 文档:
id
str - Neuro 在启动时分配的作业 ID。exit_code
int - 如果作业已完成,则命令返回码。status
str - 作业状态之一:`pending`、`running`、`succeeded`、`failed` 或 `unknown`。http_url
str - 如果使用了 `job_http_port`,则公开 HTTP 端口的 URL。
NeuroJobStatusSensor
等待作业完成或任何其他状态转换发生。示例用法
from airflow.sensors.neuro import NeuroJobStatusSensor
wait = NeuroJobStatusSensor(
task_id="wait_close",
job_id="{{ task_instance.xcom_pull(task_ids='small-deeds')['id'] }}", # noqa
poke_interval=5,
timeout=10 * 60,
)
操作员参数
job_id
str - 查询状态更新的作业 ID。job_statuses
list - 等待的 JobStatus 枚举值列表。neuro_conn_id
str - 用于 Neuro 认证的连接名称。默认为neuro_default
。
Jinja2模板字段
job_id
XCom 导出
不导出任何 XCom 值。
NeuroHook
在某些情况下,您可能需要访问平台的其他功能。这可以通过使用 NeuroHook 来完成。例如
import yarl
from neuromation.api import ResourceNotFound
from airflow.hooks.neuro import NeuroHook
from airflow.operators.python_operator import BranchPythonOperator
def check_model(templates_dict, **kw):
hook = NeuroHook()
with hook:
try:
hook.run(
hook.client.storage.stat(
yarl.URL("storage:" + templates_dict["model_path"])
)
)
return "process_with_model"
except ResourceNotFound:
return "process_without_model"
check_model = BranchPythonOperator(
task_id="check_model_exists",
python_callable=check_model,
provide_context=True,
templates_dict={"model_path": "{{ var.value.project_home }}/model.pth"},
)
探索 Python SDK 以获取平台的更多功能。
项目详情
下载文件
下载适用于您平台的自定义文件。如果您不确定选择哪一个,请了解有关 安装包 的更多信息。
源分发
neuro-airflow-plugin-0.0.1.tar.gz (18.9 kB 查看哈希值)
构建分发
关闭
哈希值 for neuro_airflow_plugin-0.0.1-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 136af84892fadebe7f04b6cbf834f01419d05b040061611259def6bb5587949e |
|
MD5 | 61b09e9fc0a89d8ef1208f30fd3d2fa4 |
|
BLAKE2b-256 | 47040c7ea05ddc5ef694d72e80ea66b614830823365545482e9a14c2a04b9ff3 |