由Astronomer构建的Apache Airflow提供程序包,用于与Anyscale集成。
项目描述
astro-provider-anyscale
本存储库提供了一套工具,用于将Anyscale与Apache Airflow集成,使您能够在Airflow工作流中编排Anyscale作业和服务。它包括一个自定义钩子、两个操作符和两个触发器,专门用于管理和监控Anyscale作业和服务。
组件
钩子
- AnyscaleHook:促进Airflow与Anyscale之间的通信。它使用Anyscale API与Anyscale平台交互,提供提交作业、查询状态和管理服务的方法。
操作符
- SubmitAnyscaleJob:此操作符将作业提交到Anyscale。它接受作业配置参数,例如入口点、图像URI和计算配置。该操作符使用
AnyscaleHook
处理提交过程。 - RolloutAnyscaleService:类似于作业提交操作符,此操作符旨在管理Anyscale上的服务。它可以用于部署新服务或更新现有服务,利用
AnyscaleHook
处理与Anyscale API的所有交互。
触发器
- AnyscaleJobTrigger:监控通过
SubmitAnyscaleJob
操作符提交的异步作业的状态。它确保Airflow任务在作业完成之前等待,然后才能在DAG中继续。 - AnyscaleServiceTrigger:与
AnyscaleJobTrigger
类似,但专注于服务部署过程。它检查正在部署或更新的服务状态,并在完成时将控制权返回给Airflow。
Anyscale集成配置详情
要集成Airflow与Anyscale,您需要提供一些配置细节
-
Anyscale API令牌:通过使用anyscale cli或通过Anyscale控制台获取您的API令牌。
-
计算配置(可选):如果您想约束自动扩展,可以通过以下方式指定将执行您的Ray脚本的计算集群
- 通过
compute_config
输入参数动态提供,或 - 在Anyscale中创建计算配置,并使用生成的ID作为
compute_config_id
参数。
- 通过
-
图像URI:指定操作符要使用的Docker镜像。确保您的镜像在您的Anyscale账户中可访问。注意,您还可以指定一个容器文件,用于动态构建镜像。
用法
使用以下命令安装Anyscale提供程序
pip install astro-provider-anyscale
Airflow连接配置
要集成Airflow与Anyscale,配置一个具有唯一名称的Airflow连接,并将密码设置为通过Anyscale控制台收集的API令牌。
-
访问Airflow Web UI
- 打开Airflow Web界面,并使用您的Airflow凭据登录。
-
在Airflow中创建新的连接
- 转到“管理”选项卡,并从下拉菜单中选择“连接”。
- 单击“添加新记录”按钮以创建新的连接。
-
配置连接
- Conn Id:输入连接的唯一标识符,例如
anyscale_conn
。 - Conn Type:选择
Anyscale
。 - Password:粘贴您从Anyscale控制台复制的API令牌。
- Conn Id:输入连接的唯一标识符,例如
-
保存连接
- 填写所需详细信息后,单击表单底部的“保存”按钮以保存新连接。
代码示例
以下脚本是配置和使用Airflow DAG中的SubmitAnyscaleJob
操作符的示例
from pathlib import Path
from datetime import datetime, timedelta
from airflow import DAG
from anyscale_provider.operators.anyscale import SubmitAnyscaleJob
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 4, 2),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"
# Constants
FOLDER_PATH = Path(__file__).parent / "example_dags/ray_scripts"
dag = DAG(
"sample_anyscale_job_workflow",
default_args=default_args,
description="A DAG to interact with Anyscale triggered manually",
schedule=None, # This DAG is not scheduled, only triggered manually
catchup=False,
)
submit_anyscale_job = SubmitAnyscaleJob(
task_id="submit_anyscale_job",
conn_id=ANYSCALE_CONN_ID,
name="AstroJob",
image_uri="anyscale/image/airflow-integration-testing:1",
compute_config="airflow-integration-testing:1",
working_dir=str(FOLDER_PATH),
entrypoint="python ray-job.py",
requirements=["requests", "pandas", "numpy", "torch"],
max_retries=1,
job_timeout_seconds=3000,
poll_interval=30,
dag=dag,
)
# Defining the task sequence
submit_anyscale_job
以下脚本使用RolloutAnyscaleService
操作符在Anyscale上部署服务
import uuid
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
from anyscale_provider.hooks.anyscale import AnyscaleHook
from anyscale_provider.operators.anyscale import RolloutAnyscaleService
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 4, 2),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"
SERVICE_NAME = f"AstroService-CICD-{uuid.uuid4()}"
dag = DAG(
"sample_anyscale_service_workflow",
default_args=default_args,
description="A DAG to interact with Anyscale triggered manually",
schedule=None, # This DAG is not scheduled, only triggered manually
catchup=False,
)
deploy_anyscale_service = RolloutAnyscaleService(
task_id="rollout_anyscale_service",
conn_id=ANYSCALE_CONN_ID,
name=SERVICE_NAME,
image_uri="anyscale/image/airflow-integration-testing:1",
compute_config="airflow-integration-testing:1",
working_dir="https://github.com/anyscale/docs_examples/archive/refs/heads/main.zip",
applications=[{"import_path": "sentiment_analysis.app:model"}],
requirements=["transformers", "requests", "pandas", "numpy", "torch"],
in_place=False,
canary_percent=None,
service_rollout_timeout_seconds=600,
poll_interval=30,
dag=dag,
)
def terminate_service():
hook = AnyscaleHook(conn_id=ANYSCALE_CONN_ID)
result = hook.terminate_service(service_name=SERVICE_NAME, time_delay=5)
print(result)
terminate_anyscale_service = PythonOperator(
task_id="initialize_anyscale_hook",
python_callable=terminate_service,
trigger_rule=TriggerRule.ALL_DONE,
dag=dag,
)
# Defining the task sequence
deploy_anyscale_service >> terminate_anyscale_service
遥测
此提供程序默认收集遥测数据。
收集哪些数据?
更具体地说,它为发送到Anyscale API的每个请求添加了一个标题,如下所示:{"X-Anyscale-Source" = "airflow"}
。遥测不会也不会收集任何个人数据或敏感信息。
这些数据如何使用?
这些数据将被Anyscale工程团队使用,以更好地了解提供商的使用情况,并指导未来的开发。
如何取消遥测数据收集?
要禁用使用统计信息收集,请在您的airflow.cfg
文件中将配置anyscale.telemetry_enabled
设置为False。
更新日志
我们遵循语义版本控制进行发布。检查CHANGELOG.rst以获取最新更改。
贡献指南
欢迎所有贡献,包括错误报告、错误修复、文档改进和增强。
有关如何贡献的详细概述,请参阅贡献指南
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码分发
构建分发
astro_provider_anyscale-1.0.1.tar.gz的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a381080e6ec21137d70ada2bd4a9648c4eccab22bc5a9bfa42c7c2a10d3e95ca |
|
MD5 | 5678e4e9b660bda5732da9513a2ee266 |
|
BLAKE2b-256 | e4646ca0bd0719f13caa5806e1ed3cb9fa90fbae5c467ed8e40a6d6ac2e78ee8 |
astro_provider_anyscale-1.0.1-py3-none-any.whl的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | b150b0e34dcf2ab5741304a358dffb6466540afa5c6a49d259e3508dbe1d5fe0 |
|
MD5 | a3b94cd1168ed2355353cff028da0983 |
|
BLAKE2b-256 | a4cd401a2edd17cce5c769be7842917b0b3cced86ac21aa5177670ed2673c882 |