跳转到主要内容

Apache Airflow API (稳定版)

项目描述

Apache Airflow Python客户端

概述

为了便于管理,Apache Airflow支持其对象的各种REST API端点。本节提供了API设计、方法和支持用例的概述。

大多数端点接受JSON作为输入并返回JSON响应。这意味着您通常需要将以下头部添加到您的请求中

Content-type: application/json
Accept: application/json

资源

术语资源指的是Airflow元数据中的单一类型对象。API通过其端点的对应资源进行划分。资源的名称通常是复数形式,并采用驼峰式表示。例如:dagRuns

资源名称用作端点URL的一部分,以及在API参数和响应中。

CRUD操作

该平台支持大多数资源的创建读取更新删除操作。以下列出了这些操作的标准以及它们的参数。

某些端点具有特殊行为,作为例外。

创建

要创建资源,通常需要提交一个包含资源所需元数据的HTTP POST请求,该请求包含在请求体中。成功时,响应返回包含资源元数据的201 Created响应代码,包括其内部的id,该id包含在响应体中。

读取

可以使用HTTP GET请求读取资源或列出多个资源。

可以通过在请求参数中提交资源的id来读取特定资源。成功时,响应通常会返回一个200 OK响应代码,其中包含资源元数据,该元数据包含在响应体中。

如果GET请求未包含特定资源的id,则被视为列表请求。成功时,响应通常会返回一个200 OK响应代码,其中包含一个对象,该对象包含资源元数据列表,该列表包含在响应体中。

在读取资源时,通常可用一些常见的查询参数。例如:

v1/connections?limit=25&offset=25
查询参数 类型 描述
limit 整数 要获取的对象的最大数量。默认情况下通常为25。
offset 整数 开始返回对象的偏移量。与limit查询参数一起使用。

更新

更新资源需要资源id,通常使用HTTP PATCH请求进行,其中包含要修改的字段,这些字段包含在请求体中。成功时,响应通常会返回一个200 OK响应代码,其中包含关于修改后资源的详细信息,该信息包含在响应体中。

删除

删除资源需要资源id,通常通过HTTP DELETE请求执行。成功时,响应通常会返回一个204 No Content响应代码。

约定

  • 资源名称是复数形式,并以camelCase表示。

  • URL参数名称和字段名称之间保持名称一致性。

  • 字段名称采用snake_case格式。

{
    \"name\": \"string\",
    \"slots\": 0,
    \"occupied_slots\": 0,
    \"used_slots\": 0,
    \"queued_slots\": 0,
    \"open_slots\": 0
}

更新掩码

更新掩码作为查询参数在修补端点中可用。它用于通知API您想要更新哪些字段。使用update_mask可以更容易地通过帮助服务器知道要更新对象的哪些字段,而不是更新所有字段来更新对象。更新请求会忽略任何未在字段掩码中指定的字段,并保留它们的当前值。

示例

import requests

resource = requests.get("/resource/my-id").json()
resource["my_field"] = "new-value"
requests.patch("/resource/my-id?update_mask=my_field", data=json.dumps(resource))

版本化和端点生命周期

  • API版本化与Apache Airflow的具体版本不同步。
  • API设计为向后兼容。
  • API的任何更改都将首先进入弃用阶段。

尝试API

您可以使用第三方客户端,例如curlHTTPiePostmanInsomnia rest客户端来测试Apache Airflow API。

请注意,您需要传递凭据数据。

例如,以下是使用curl暂停DAG的方法,当使用基本认证时

curl -X PATCH 'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused' \\
-H 'Content-Type: application/json' \\
--user \"username:password\" \\
-d '{
    \"is_paused\": true
}'

使用图形工具,如PostmanInsomnia,可以直接导入API规范

  1. 通过点击本文档顶部的下载按钮下载API规范。
  2. 在您选择的图形工具中导入JSON规范。
  • Postman中,您可以点击顶部的导入按钮
  • Insomnia中,您只需将文件拖放到UI上即可

请注意,使用Postman,您还可以通过选择一个请求并点击代码按钮来生成代码片段。

启用CORS

跨源资源共享(CORS)是一种浏览器安全功能,它限制了在浏览器中运行的脚本发起的HTTP请求。

有关启用/配置CORS的详细信息,请参阅启用CORS

身份验证

为了满足许多组织的需求,Airflow支持许多身份验证方法,甚至可以添加您自己的方法。

如果您想检查当前设置哪个身份验证后端,可以使用以下示例中的airflow config get-value api auth_backends命令。

$ airflow config get-value api auth_backends
airflow.api.auth.backend.basic_auth

默认情况下,拒绝所有请求。

有关配置身份验证的详细信息,请参阅API授权

错误

我们遵循在RFC 7807中提出的错误响应格式,也称为HTTP API的问题详细信息。与我们的正常API响应一样,您的客户端必须准备好优雅地处理响应的额外成员。

未认证

这表示请求尚未应用,因为缺乏针对目标资源的有效认证凭据。请检查您是否有有效的凭据。

权限被拒绝

此响应表示服务器理解了请求,但拒绝授权,因为它缺乏对资源的足够权利。当您没有执行您所执行的操作的必要权限时会发生这种情况。您需要获取适当的权限以解决此错误。

请求错误

此响应表示服务器无法或不会处理请求,因为某些内容被认为是客户端错误(例如,请求语法错误、无效请求消息框架或欺骗性请求路由)。要解决此问题,请确保您的语法正确。

找不到

此客户端错误响应表示服务器找不到请求的资源。

方法不允许

表示服务器知道请求方法,但目标资源不支持该方法。

不可接受

根据请求中接收到的主动协商头字段,目标资源没有当前表示形式,该表示形式会被用户代理接受,并且服务器不愿意提供默认表示形式。

已存在

由于与目标资源当前状态的冲突,请求无法完成,例如,它尝试创建的资源已经存在。

未知

这意味着服务器遇到了意外条件,这阻止了它完成请求。

此Python包是由OpenAPI Generator项目自动生成的

  • API版本:2.9.0
  • 包版本:2.9.0
  • 构建包:org.openapitools.codegen.languages.PythonClientCodegen

有关更多信息,请访问https://airflow.org.cn

需求。

Python >=3.8

安装 & 使用

pip安装

您可以使用标准的Python安装工具安装客户端。它在PyPI上托管,具有apache-airflow-client包ID,因此获取最新版本的最简单方法是运行

pip install apache-airflow-client

如果Python包托管在存储库中,您可以直接使用以下方式安装

pip install git+https://github.com/apache/airflow-client-python.git

导入检查

然后导入包

import airflow_client.client

入门

请按照安装过程进行操作,然后运行以下命令

import time
import airflow_client.client
from pprint import pprint
from airflow_client.client.api import config_api
from airflow_client.client.model.config import Config
from airflow_client.client.model.error import Error

# Defining the host is optional and defaults to /api/v1
# See configuration.py for a list of all supported configuration parameters.
configuration = client.Configuration(host="/api/v1")

# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.

# Configure HTTP basic authorization: Basic
configuration = client.Configuration(username="YOUR_USERNAME", password="YOUR_PASSWORD")


# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = config_api.ConfigApi(api_client)

    try:
        # Get current configuration
        api_response = api_instance.get_config()
        pprint(api_response)
    except client.ApiException as e:
        print("Exception when calling ConfigApi->get_config: %s\n" % e)

API端点文档

所有URI均相对于/api/v1

方法 HTTP请求 描述
ConfigApi get_config GET /config 获取当前配置
ConnectionApi delete_connection 删除 /connections/{connection_id} 删除连接
ConnectionApi get_connection GET /connections/{connection_id} 获取连接
ConnectionApi get_connections GET /connections 列出连接
ConnectionApi patch_connection PATCH /connections/{connection_id} 更新连接
ConnectionApi post_connection POST /connections 创建连接
ConnectionApi test_connection POST /connections/test 测试连接
DAGApi delete_dag DELETE /dags/{dag_id} 删除DAG
DAGApi get_dag GET /dags/{dag_id} 获取DAG的基本信息
DAGApi get_dag_details GET /dags/{dag_id}/details 获取DAG的简化表示
DAGApi get_dag_source GET /dagSources/{file_token} 获取源代码
DAGApi get_dags GET /dags 列出DAGs
DAGApi get_task GET /dags/{dag_id}/tasks/{task_id} 获取任务的简化表示
DAGApi get_tasks GET /dags/{dag_id}/tasks 获取DAG的任务
DAGApi patch_dag PATCH /dags/{dag_id} 更新DAG
DAGApi patch_dags PATCH /dags 更新DAGs
DAGApi post_clear_task_instances POST /dags/{dag_id}/clearTaskInstances 清除一组任务实例
DAGApi post_set_task_instances_state POST /dags/{dag_id}/updateTaskInstancesState 设置任务实例的状态
DAGRunApi clear_dag_run POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear 清除DAG运行
DAGRunApi delete_dag_run DELETE /dags/{dag_id}/dagRuns/{dag_run_id} 删除DAG运行
DAGRunApi get_dag_run GET /dags/{dag_id}/dagRuns/{dag_run_id} 获取DAG运行
DAGRunApi get_dag_runs GET /dags/{dag_id}/dagRuns 列出DAG运行
DAGRunApi get_dag_runs_batch POST /dags/~/dagRuns/list 列出DAG运行(批量)
DAGRunApi get_upstream_dataset_events GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents 获取DAG运行的集合事件
DAGRunApi post_dag_run POST /dags/{dag_id}/dagRuns 触发新的DAG运行
DAGRunApi set_dag_run_note PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote 更新DagRun备注。
DAGRunApi update_dag_run_state PATCH /dags/{dag_id}/dagRuns/{dag_run_id} 修改DAG运行
DagWarningApi get_dag_warnings GET /dagWarnings 列出dag警告
DatasetApi get_dataset GET /datasets/{uri} 获取数据集
DatasetApi get_dataset_events GET /datasets/events 获取数据集事件
DatasetApi get_datasets GET /datasets 列出数据集
DatasetApi get_upstream_dataset_events GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents 获取DAG运行的集合事件
EventLogApi get_event_log GET /eventLogs/{event_log_id} 获取日志条目
EventLogApi get_event_logs GET /eventLogs 列出日志条目
ImportErrorApi get_import_error GET /importErrors/{import_error_id} 获取导入错误
ImportErrorApi get_import_errors GET /importErrors 列出导入错误
MonitoringApi get_health GET /health 获取实例状态
MonitoringApi get_version GET /version 获取版本信息
PermissionApi get_permissions GET /permissions 列出权限
PluginApi get_plugins GET /plugins 获取加载的插件列表
PoolApi delete_pool DELETE /pools/{pool_name} 删除池
PoolApi get_pool GET /pools/{pool_name} 获取池
PoolApi get_pools GET /pools 列出池
PoolApi patch_pool PATCH /pools/{pool_name} 更新一个池
PoolApi post_pool POST /pools 创建一个池
ProviderApi get_providers GET /providers 列出提供者
RoleApi delete_role DELETE /roles/{role_name} 删除一个角色
RoleApi get_role GET /roles/{role_name} 获取一个角色
RoleApi get_roles GET /roles 列出角色
RoleApi patch_role PATCH /roles/{role_name} 更新一个角色
RoleApi post_role POST /roles 创建一个角色
TaskInstanceApi get_extra_links GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links 列出额外链接
TaskInstanceApi get_log GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number} 获取日志
TaskInstanceApi get_mapped_task_instance GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} 获取一个映射任务实例
TaskInstanceApi get_mapped_task_instances GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped 列出映射任务实例
TaskInstanceApi get_task_instance GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} 获取一个任务实例
TaskInstanceApi get_task_instances GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances 列出任务实例
TaskInstanceApi get_task_instances_batch POST /dags//dagRuns//taskInstances/list 列出任务实例(批量)
TaskInstanceApi patch_mapped_task_instance PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} 更新映射任务实例的状态
TaskInstanceApi patch_task_instance PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} 更新任务实例的状态
TaskInstanceApi set_mapped_task_instance_note PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/setNote 更新TaskInstance备注。
TaskInstanceApi set_task_instance_note PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/setNote 更新TaskInstance备注。
UserApi delete_user DELETE /users/{username} 删除一个用户
UserApi get_user GET /users/{username} 获取一个用户
UserApi get_users GET /users 列出用户
UserApi patch_user PATCH /users/{username} 更新一个用户
UserApi post_user POST /users 创建一个用户
VariableApi delete_variable DELETE /variables/{variable_key} 删除一个变量
VariableApi get_variable GET /variables/{variable_key} 获取一个变量
VariableApi get_variables GET /variables 列出变量
VariableApi patch_variable PATCH /variables/{variable_key} 更新一个变量
VariableApi post_variables POST /variables 创建一个变量
XComApi get_xcom_entries GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries 列出XCom条目
XComApi get_xcom_entry GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} 获取一个XCom条目

模型文档

授权文档

默认情况下,生成的客户端支持三种身份验证方案

  • 基本
  • GoogleOpenID
  • Kerberos

但是,您可以通过在OpenAPI规范的“安全”部分添加自己的方案来生成具有您自己方案的客户端和文档。您可以使用Breeze CLI通过在“breeze release-management prepare-python-client”命令中添加“--security-schemes”选项来完成此操作。

基本“烟雾”测试

您可以使用基本烟雾测试来检查客户端是否正常工作 - 我们有一个简单的测试脚本,该脚本使用API运行测试。要这样做,您需要

  • 按照上述说明安装apache-airflow-client
  • 安装rich Python包
  • 下载test_python_client.py文件
  • 确保您有一个正在运行的测试airflow安装。不要在您的生产部署上进行实验
  • 配置您的airflow webserver以启用基本身份验证 在您的airflow.cfg[api]部分设置
[api]
auth_backend = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth

您也可以通过环境变量设置它:export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth

  • 配置您的airflow webserver以加载示例dags 在您的airflow.cfg[core]部分设置
[core]
load_examples = True

您也可以通过环境变量设置它:export AIRFLOW__CORE__LOAD_EXAMPLES=True

  • 可选地公开配置(注意!这是一个危险的设置)。脚本将愉快地使用默认设置运行,但如果您想查看配置,则需要公开它。在您的airflow.cfg[webserver]部分设置
[webserver]
expose_config = True

您也可以通过环境变量设置它:export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

  • test_python_client.py文件中配置您的主机/ip/用户/密码
import airflow_client

# Configure HTTP basic authorization: Basic
configuration = airflow_client.client.Configuration(
    host="http://localhost:8080/api/v1", username="admin", password="admin"
)
  • 运行调度器(或您已设置的独立dag文件处理器的dag文件处理器)进行几轮解析(您可以将--num-runs参数传递给它或将其在后台保持运行)。脚本依赖于示例DAG序列化到数据库中,这仅在调度器运行时core/load_examples设置为True时才会发生。

  • 运行webserver - 可在您要运行的测试脚本的host/port上访问。确保它有足够的时间初始化。

运行python test_python_client.py,您应该看到彩色输出显示连接尝试和状态。

大型OpenAPI文档的说明

如果OpenAPI文档很大,客户端.apis和客户端.models中的导入可能会因RecursionError失败,指示已超过最大递归限制。在这种情况下,有一些解决方案

解决方案 1:为 api 和模型使用特定导入,例如

  • from airflow_client.client.api.default_api import DefaultApi
  • from airflow_client.client.model.pet import Pet

解决方案 2:在导入包之前,调整最大递归限制,如下所示

import sys

sys.setrecursionlimit(1500)
import airflow_client.client
from airflow_client.client.apis import *
from airflow_client.client.models import *

作者

dev@airflow.apache.org

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源代码分发

apache_airflow_client-2.10.0.tar.gz (273.3 kB 查看哈希值)

上传时间 源代码

构建分发

apache_airflow_client-2.10.0-py3-none-any.whl (1.5 MB 查看哈希值)

上传时间 Python 3

支持