跳转到主要内容

由Astronomer构建的Apache Airflow提供程序包,用于与Anyscale集成。

项目描述

astro-provider-anyscale

本存储库提供了一套工具,用于将Anyscale与Apache Airflow集成,使您能够在Airflow工作流中编排Anyscale作业和服务。它包括一个自定义钩子、两个操作符和两个触发器,专门用于管理和监控Anyscale作业和服务。

组件

钩子

  • AnyscaleHook:促进Airflow与Anyscale之间的通信。它使用Anyscale API与Anyscale平台交互,提供提交作业、查询状态和管理服务的方法。

操作符

  • SubmitAnyscaleJob:此操作符将作业提交到Anyscale。它接受作业配置参数,例如入口点、图像URI和计算配置。该操作符使用AnyscaleHook处理提交过程。
  • RolloutAnyscaleService:类似于作业提交操作符,此操作符旨在管理Anyscale上的服务。它可以用于部署新服务或更新现有服务,利用AnyscaleHook处理与Anyscale API的所有交互。

触发器

  • AnyscaleJobTrigger:监控通过SubmitAnyscaleJob操作符提交的异步作业的状态。它确保Airflow任务在作业完成之前等待,然后才能在DAG中继续。
  • AnyscaleServiceTrigger:与AnyscaleJobTrigger类似,但专注于服务部署过程。它检查正在部署或更新的服务状态,并在完成时将控制权返回给Airflow。

Anyscale集成配置详情

要集成Airflow与Anyscale,您需要提供一些配置细节

  • Anyscale API令牌:通过使用anyscale cli或通过Anyscale控制台获取您的API令牌。

  • 计算配置(可选):如果您想约束自动扩展,可以通过以下方式指定将执行您的Ray脚本的计算集群

    • 通过compute_config输入参数动态提供,或
    • 在Anyscale中创建计算配置,并使用生成的ID作为compute_config_id参数。
  • 图像URI:指定操作符要使用的Docker镜像。确保您的镜像在您的Anyscale账户中可访问。注意,您还可以指定一个容器文件,用于动态构建镜像。

用法

使用以下命令安装Anyscale提供程序

pip install astro-provider-anyscale

Airflow连接配置

要集成Airflow与Anyscale,配置一个具有唯一名称的Airflow连接,并将密码设置为通过Anyscale控制台收集的API令牌。

  1. 访问Airflow Web UI

    • 打开Airflow Web界面,并使用您的Airflow凭据登录。
  2. 在Airflow中创建新的连接

    • 转到“管理”选项卡,并从下拉菜单中选择“连接”。
    • 单击“添加新记录”按钮以创建新的连接。
  3. 配置连接

    • Conn Id:输入连接的唯一标识符,例如anyscale_conn
    • Conn Type:选择Anyscale
    • Password:粘贴您从Anyscale控制台复制的API令牌。
  4. 保存连接

    • 填写所需详细信息后,单击表单底部的“保存”按钮以保存新连接。

代码示例

以下脚本是配置和使用Airflow DAG中的SubmitAnyscaleJob操作符的示例

from pathlib import Path
from datetime import datetime, timedelta
from airflow import DAG
from anyscale_provider.operators.anyscale import SubmitAnyscaleJob

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 2),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"

# Constants
FOLDER_PATH = Path(__file__).parent / "example_dags/ray_scripts"

dag = DAG(
    "sample_anyscale_job_workflow",
    default_args=default_args,
    description="A DAG to interact with Anyscale triggered manually",
    schedule=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

submit_anyscale_job = SubmitAnyscaleJob(
    task_id="submit_anyscale_job",
    conn_id=ANYSCALE_CONN_ID,
    name="AstroJob",
    image_uri="anyscale/image/airflow-integration-testing:1",
    compute_config="airflow-integration-testing:1",
    working_dir=str(FOLDER_PATH),
    entrypoint="python ray-job.py",
    requirements=["requests", "pandas", "numpy", "torch"],
    max_retries=1,
    job_timeout_seconds=3000,
    poll_interval=30,
    dag=dag,
)


# Defining the task sequence
submit_anyscale_job

以下脚本使用RolloutAnyscaleService操作符在Anyscale上部署服务

import uuid
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from anyscale_provider.hooks.anyscale import AnyscaleHook
from anyscale_provider.operators.anyscale import RolloutAnyscaleService

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 2),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"
SERVICE_NAME = f"AstroService-CICD-{uuid.uuid4()}"

dag = DAG(
    "sample_anyscale_service_workflow",
    default_args=default_args,
    description="A DAG to interact with Anyscale triggered manually",
    schedule=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

deploy_anyscale_service = RolloutAnyscaleService(
    task_id="rollout_anyscale_service",
    conn_id=ANYSCALE_CONN_ID,
    name=SERVICE_NAME,
    image_uri="anyscale/image/airflow-integration-testing:1",
    compute_config="airflow-integration-testing:1",
    working_dir="https://github.com/anyscale/docs_examples/archive/refs/heads/main.zip",
    applications=[{"import_path": "sentiment_analysis.app:model"}],
    requirements=["transformers", "requests", "pandas", "numpy", "torch"],
    in_place=False,
    canary_percent=None,
    service_rollout_timeout_seconds=600,
    poll_interval=30,
    dag=dag,
)


def terminate_service():
    hook = AnyscaleHook(conn_id=ANYSCALE_CONN_ID)
    result = hook.terminate_service(service_name=SERVICE_NAME, time_delay=5)
    print(result)


terminate_anyscale_service = PythonOperator(
    task_id="initialize_anyscale_hook",
    python_callable=terminate_service,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,
)

# Defining the task sequence
deploy_anyscale_service >> terminate_anyscale_service

遥测

此提供程序默认收集遥测数据。

收集哪些数据?

更具体地说,它为发送到Anyscale API的每个请求添加了一个标题,如下所示:{"X-Anyscale-Source" = "airflow"}。遥测不会也不会收集任何个人数据或敏感信息。

这些数据如何使用?

这些数据将被Anyscale工程团队使用,以更好地了解提供商的使用情况,并指导未来的开发。

如何取消遥测数据收集?

要禁用使用统计信息收集,请在您的airflow.cfg文件中将配置anyscale.telemetry_enabled设置为False。

更新日志


我们遵循语义版本控制进行发布。检查CHANGELOG.rst以获取最新更改。

贡献指南


欢迎所有贡献,包括错误报告、错误修复、文档改进和增强。

有关如何贡献的详细概述,请参阅贡献指南

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

astro_provider_anyscale-1.0.1.tar.gz (15.8 kB 查看哈希)

上传时间 源代码

构建分发

astro_provider_anyscale-1.0.1-py3-none-any.whl (17.9 kB 查看哈希)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页