Apache Airflow API (稳定版)
项目描述
Apache Airflow Python客户端
概述
为了便于管理,Apache Airflow支持其对象的各种REST API端点。本节提供了API设计、方法和支持用例的概述。
大多数端点接受JSON
作为输入并返回JSON
响应。这意味着您通常需要将以下头部添加到您的请求中
Content-type: application/json
Accept: application/json
资源
术语资源
指的是Airflow元数据中的单一类型对象。API通过其端点的对应资源进行划分。资源的名称通常是复数形式,并采用驼峰式表示。例如:dagRuns
。
资源名称用作端点URL的一部分,以及在API参数和响应中。
CRUD操作
该平台支持大多数资源的创建、读取、更新和删除操作。以下列出了这些操作的标准以及它们的参数。
某些端点具有特殊行为,作为例外。
创建
要创建资源,通常需要提交一个包含资源所需元数据的HTTP POST
请求,该请求包含在请求体中。成功时,响应返回包含资源元数据的201 Created
响应代码,包括其内部的id
,该id
包含在响应体中。
读取
可以使用HTTP GET
请求读取资源或列出多个资源。
可以通过在请求参数中提交资源的id
来读取特定资源。成功时,响应通常会返回一个200 OK
响应代码,其中包含资源元数据,该元数据包含在响应体中。
如果GET
请求未包含特定资源的id
,则被视为列表请求。成功时,响应通常会返回一个200 OK
响应代码,其中包含一个对象,该对象包含资源元数据列表,该列表包含在响应体中。
在读取资源时,通常可用一些常见的查询参数。例如:
v1/connections?limit=25&offset=25
查询参数 | 类型 | 描述 |
---|---|---|
limit | 整数 | 要获取的对象的最大数量。默认情况下通常为25。 |
offset | 整数 | 开始返回对象的偏移量。与limit查询参数一起使用。 |
更新
更新资源需要资源id
,通常使用HTTP PATCH
请求进行,其中包含要修改的字段,这些字段包含在请求体中。成功时,响应通常会返回一个200 OK
响应代码,其中包含关于修改后资源的详细信息,该信息包含在响应体中。
删除
删除资源需要资源id
,通常通过HTTP DELETE
请求执行。成功时,响应通常会返回一个204 No Content
响应代码。
约定
-
资源名称是复数形式,并以camelCase表示。
-
URL参数名称和字段名称之间保持名称一致性。
-
字段名称采用snake_case格式。
{
\"name\": \"string\",
\"slots\": 0,
\"occupied_slots\": 0,
\"used_slots\": 0,
\"queued_slots\": 0,
\"open_slots\": 0
}
更新掩码
更新掩码作为查询参数在修补端点中可用。它用于通知API您想要更新哪些字段。使用update_mask
可以更容易地通过帮助服务器知道要更新对象的哪些字段,而不是更新所有字段来更新对象。更新请求会忽略任何未在字段掩码中指定的字段,并保留它们的当前值。
示例
import requests
resource = requests.get("/resource/my-id").json()
resource["my_field"] = "new-value"
requests.patch("/resource/my-id?update_mask=my_field", data=json.dumps(resource))
版本化和端点生命周期
- API版本化与Apache Airflow的具体版本不同步。
- API设计为向后兼容。
- API的任何更改都将首先进入弃用阶段。
尝试API
您可以使用第三方客户端,例如curl、HTTPie、Postman或Insomnia rest客户端来测试Apache Airflow API。
请注意,您需要传递凭据数据。
例如,以下是使用curl暂停DAG的方法,当使用基本认证时
curl -X PATCH 'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused' \\
-H 'Content-Type: application/json' \\
--user \"username:password\" \\
-d '{
\"is_paused\": true
}'
使用图形工具,如Postman或Insomnia,可以直接导入API规范
- 通过点击本文档顶部的下载按钮下载API规范。
- 在您选择的图形工具中导入JSON规范。
- 在Postman中,您可以点击顶部的导入按钮
- 在Insomnia中,您只需将文件拖放到UI上即可
请注意,使用Postman,您还可以通过选择一个请求并点击代码按钮来生成代码片段。
启用CORS
跨源资源共享(CORS)是一种浏览器安全功能,它限制了在浏览器中运行的脚本发起的HTTP请求。
有关启用/配置CORS的详细信息,请参阅启用CORS。
身份验证
为了满足许多组织的需求,Airflow支持许多身份验证方法,甚至可以添加您自己的方法。
如果您想检查当前设置哪个身份验证后端,可以使用以下示例中的airflow config get-value api auth_backends
命令。
$ airflow config get-value api auth_backends
airflow.api.auth.backend.basic_auth
默认情况下,拒绝所有请求。
有关配置身份验证的详细信息,请参阅API授权。
错误
我们遵循在RFC 7807中提出的错误响应格式,也称为HTTP API的问题详细信息。与我们的正常API响应一样,您的客户端必须准备好优雅地处理响应的额外成员。
未认证
这表示请求尚未应用,因为缺乏针对目标资源的有效认证凭据。请检查您是否有有效的凭据。
权限被拒绝
此响应表示服务器理解了请求,但拒绝授权,因为它缺乏对资源的足够权利。当您没有执行您所执行的操作的必要权限时会发生这种情况。您需要获取适当的权限以解决此错误。
请求错误
此响应表示服务器无法或不会处理请求,因为某些内容被认为是客户端错误(例如,请求语法错误、无效请求消息框架或欺骗性请求路由)。要解决此问题,请确保您的语法正确。
找不到
此客户端错误响应表示服务器找不到请求的资源。
方法不允许
表示服务器知道请求方法,但目标资源不支持该方法。
不可接受
根据请求中接收到的主动协商头字段,目标资源没有当前表示形式,该表示形式会被用户代理接受,并且服务器不愿意提供默认表示形式。
已存在
由于与目标资源当前状态的冲突,请求无法完成,例如,它尝试创建的资源已经存在。
未知
这意味着服务器遇到了意外条件,这阻止了它完成请求。
此Python包是由OpenAPI Generator项目自动生成的
- API版本:2.9.0
- 包版本:2.9.0
- 构建包:org.openapitools.codegen.languages.PythonClientCodegen
有关更多信息,请访问https://airflow.org.cn
需求。
Python >=3.8
安装 & 使用
pip安装
您可以使用标准的Python安装工具安装客户端。它在PyPI上托管,具有apache-airflow-client
包ID,因此获取最新版本的最简单方法是运行
pip install apache-airflow-client
如果Python包托管在存储库中,您可以直接使用以下方式安装
pip install git+https://github.com/apache/airflow-client-python.git
导入检查
然后导入包
import airflow_client.client
入门
请按照安装过程进行操作,然后运行以下命令
import time
import airflow_client.client
from pprint import pprint
from airflow_client.client.api import config_api
from airflow_client.client.model.config import Config
from airflow_client.client.model.error import Error
# Defining the host is optional and defaults to /api/v1
# See configuration.py for a list of all supported configuration parameters.
configuration = client.Configuration(host="/api/v1")
# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below, use the example that
# satisfies your auth use case.
# Configure HTTP basic authorization: Basic
configuration = client.Configuration(username="YOUR_USERNAME", password="YOUR_PASSWORD")
# Enter a context with an instance of the API client
with client.ApiClient(configuration) as api_client:
# Create an instance of the API class
api_instance = config_api.ConfigApi(api_client)
try:
# Get current configuration
api_response = api_instance.get_config()
pprint(api_response)
except client.ApiException as e:
print("Exception when calling ConfigApi->get_config: %s\n" % e)
API端点文档
所有URI均相对于/api/v1
类 | 方法 | HTTP请求 | 描述 |
---|---|---|---|
ConfigApi | get_config | GET /config | 获取当前配置 |
ConnectionApi | delete_connection | 删除 /connections/{connection_id} | 删除连接 |
ConnectionApi | get_connection | GET /connections/{connection_id} | 获取连接 |
ConnectionApi | get_connections | GET /connections | 列出连接 |
ConnectionApi | patch_connection | PATCH /connections/{connection_id} | 更新连接 |
ConnectionApi | post_connection | POST /connections | 创建连接 |
ConnectionApi | test_connection | POST /connections/test | 测试连接 |
DAGApi | delete_dag | DELETE /dags/{dag_id} | 删除DAG |
DAGApi | get_dag | GET /dags/{dag_id} | 获取DAG的基本信息 |
DAGApi | get_dag_details | GET /dags/{dag_id}/details | 获取DAG的简化表示 |
DAGApi | get_dag_source | GET /dagSources/{file_token} | 获取源代码 |
DAGApi | get_dags | GET /dags | 列出DAGs |
DAGApi | get_task | GET /dags/{dag_id}/tasks/{task_id} | 获取任务的简化表示 |
DAGApi | get_tasks | GET /dags/{dag_id}/tasks | 获取DAG的任务 |
DAGApi | patch_dag | PATCH /dags/{dag_id} | 更新DAG |
DAGApi | patch_dags | PATCH /dags | 更新DAGs |
DAGApi | post_clear_task_instances | POST /dags/{dag_id}/clearTaskInstances | 清除一组任务实例 |
DAGApi | post_set_task_instances_state | POST /dags/{dag_id}/updateTaskInstancesState | 设置任务实例的状态 |
DAGRunApi | clear_dag_run | POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear | 清除DAG运行 |
DAGRunApi | delete_dag_run | DELETE /dags/{dag_id}/dagRuns/{dag_run_id} | 删除DAG运行 |
DAGRunApi | get_dag_run | GET /dags/{dag_id}/dagRuns/{dag_run_id} | 获取DAG运行 |
DAGRunApi | get_dag_runs | GET /dags/{dag_id}/dagRuns | 列出DAG运行 |
DAGRunApi | get_dag_runs_batch | POST /dags/~/dagRuns/list | 列出DAG运行(批量) |
DAGRunApi | get_upstream_dataset_events | GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents | 获取DAG运行的集合事件 |
DAGRunApi | post_dag_run | POST /dags/{dag_id}/dagRuns | 触发新的DAG运行 |
DAGRunApi | set_dag_run_note | PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote | 更新DagRun备注。 |
DAGRunApi | update_dag_run_state | PATCH /dags/{dag_id}/dagRuns/{dag_run_id} | 修改DAG运行 |
DagWarningApi | get_dag_warnings | GET /dagWarnings | 列出dag警告 |
DatasetApi | get_dataset | GET /datasets/{uri} | 获取数据集 |
DatasetApi | get_dataset_events | GET /datasets/events | 获取数据集事件 |
DatasetApi | get_datasets | GET /datasets | 列出数据集 |
DatasetApi | get_upstream_dataset_events | GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents | 获取DAG运行的集合事件 |
EventLogApi | get_event_log | GET /eventLogs/{event_log_id} | 获取日志条目 |
EventLogApi | get_event_logs | GET /eventLogs | 列出日志条目 |
ImportErrorApi | get_import_error | GET /importErrors/{import_error_id} | 获取导入错误 |
ImportErrorApi | get_import_errors | GET /importErrors | 列出导入错误 |
MonitoringApi | get_health | GET /health | 获取实例状态 |
MonitoringApi | get_version | GET /version | 获取版本信息 |
PermissionApi | get_permissions | GET /permissions | 列出权限 |
PluginApi | get_plugins | GET /plugins | 获取加载的插件列表 |
PoolApi | delete_pool | DELETE /pools/{pool_name} | 删除池 |
PoolApi | get_pool | GET /pools/{pool_name} | 获取池 |
PoolApi | get_pools | GET /pools | 列出池 |
PoolApi | patch_pool | PATCH /pools/{pool_name} | 更新一个池 |
PoolApi | post_pool | POST /pools | 创建一个池 |
ProviderApi | get_providers | GET /providers | 列出提供者 |
RoleApi | delete_role | DELETE /roles/{role_name} | 删除一个角色 |
RoleApi | get_role | GET /roles/{role_name} | 获取一个角色 |
RoleApi | get_roles | GET /roles | 列出角色 |
RoleApi | patch_role | PATCH /roles/{role_name} | 更新一个角色 |
RoleApi | post_role | POST /roles | 创建一个角色 |
TaskInstanceApi | get_extra_links | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links | 列出额外链接 |
TaskInstanceApi | get_log | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number} | 获取日志 |
TaskInstanceApi | get_mapped_task_instance | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | 获取一个映射任务实例 |
TaskInstanceApi | get_mapped_task_instances | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped | 列出映射任务实例 |
TaskInstanceApi | get_task_instance | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | 获取一个任务实例 |
TaskInstanceApi | get_task_instances | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances | 列出任务实例 |
TaskInstanceApi | get_task_instances_batch | POST /dags/ |
列出任务实例(批量) |
TaskInstanceApi | patch_mapped_task_instance | PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | 更新映射任务实例的状态 |
TaskInstanceApi | patch_task_instance | PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | 更新任务实例的状态 |
TaskInstanceApi | set_mapped_task_instance_note | PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/setNote | 更新TaskInstance备注。 |
TaskInstanceApi | set_task_instance_note | PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/setNote | 更新TaskInstance备注。 |
UserApi | delete_user | DELETE /users/{username} | 删除一个用户 |
UserApi | get_user | GET /users/{username} | 获取一个用户 |
UserApi | get_users | GET /users | 列出用户 |
UserApi | patch_user | PATCH /users/{username} | 更新一个用户 |
UserApi | post_user | POST /users | 创建一个用户 |
VariableApi | delete_variable | DELETE /variables/{variable_key} | 删除一个变量 |
VariableApi | get_variable | GET /variables/{variable_key} | 获取一个变量 |
VariableApi | get_variables | GET /variables | 列出变量 |
VariableApi | patch_variable | PATCH /variables/{variable_key} | 更新一个变量 |
VariableApi | post_variables | POST /variables | 创建一个变量 |
XComApi | get_xcom_entries | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | 列出XCom条目 |
XComApi | get_xcom_entry | GET /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | 获取一个XCom条目 |
模型文档
- 操作
- 操作集合
- 操作集合全部
- 资源操作
- 基本DAG运行
- 类引用
- 清除DAG运行
- 清除任务实例
- 集合信息
- 颜色
- 配置
- 配置选项
- 配置部分
- 连接
- 连接全部
- 连接集合
- 连接集合全部
- 连接集合项
- 连接测试
- cron表达式
- DAG
- DAG集合
- DAG集合全部
- DAG详情
- DAG详情全部
- DAG运行
- DAG运行集合
- DAG运行集合全部
- DagScheduleDatasetReference
- DAG状态
- DAG警告
- DAG警告集合
- DAG警告集合全部
- 数据集
- 数据集集合
- 数据集集合全部
- 数据集事件
- 数据集事件集合
- 数据集事件集合全部
- 错误
- 事件日志
- 事件日志集合
- 事件日志集合全部
- 额外链接
- 额外链接集合
- 健康信息
- 健康状态
- 导入错误
- 导入错误集合
- 导入错误集合全部
- 内联响应200
- InlineResponse2001
- 工作
- 列表DagRuns表单
- 列表TaskInstance表单
- 元数据库状态
- 插件集合
- 插件集合全部
- 插件集合项目
- 池
- 池集合
- 池集合全部
- 提供商
- 提供商集合
- 相对差分
- 资源
- 角色
- 角色集合
- 角色集合全部
- SLAMiss
- 调度间隔
- 调度器状态
- 设置DagRun备注
- 设置TaskInstance备注
- 标签
- 任务
- 任务集合
- 任务额外链接
- 任务实例
- 任务实例集合
- 任务实例集合全部
- 任务实例引用
- 任务实例引用集合
- 任务出口数据集引用
- 任务状态
- 时间差分
- 触发器
- 触发规则
- 更新DagRun状态
- 更新TaskInstance
- 更新TaskInstances状态
- 用户
- 用户全部
- 用户集合
- 用户集合全部
- 用户集合项目
- 用户集合项目角色
- 变量
- 变量全部
- 变量集合
- 变量集合全部
- 变量集合项目
- 版本信息
- 权重规则
- XCom
- XCom全部
- XCom集合
- XCom集合全部
- XCom集合项目
授权文档
默认情况下,生成的客户端支持三种身份验证方案
- 基本
- GoogleOpenID
- Kerberos
但是,您可以通过在OpenAPI规范的“安全”部分添加自己的方案来生成具有您自己方案的客户端和文档。您可以使用Breeze CLI通过在“breeze release-management prepare-python-client”命令中添加“--security-schemes”选项来完成此操作。
基本“烟雾”测试
您可以使用基本烟雾测试来检查客户端是否正常工作 - 我们有一个简单的测试脚本,该脚本使用API运行测试。要这样做,您需要
- 按照上述说明安装
apache-airflow-client
包 - 安装
rich
Python包 - 下载test_python_client.py文件
- 确保您有一个正在运行的测试airflow安装。不要在您的生产部署上进行实验
- 配置您的airflow webserver以启用基本身份验证 在您的
airflow.cfg
的[api]
部分设置
[api]
auth_backend = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth
您也可以通过环境变量设置它:export AIRFLOW__API__AUTH_BACKENDS=airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth
- 配置您的airflow webserver以加载示例dags 在您的
airflow.cfg
的[core]
部分设置
[core]
load_examples = True
您也可以通过环境变量设置它:export AIRFLOW__CORE__LOAD_EXAMPLES=True
- 可选地公开配置(注意!这是一个危险的设置)。脚本将愉快地使用默认设置运行,但如果您想查看配置,则需要公开它。在您的
airflow.cfg
的[webserver]
部分设置
[webserver]
expose_config = True
您也可以通过环境变量设置它:export AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
- 在
test_python_client.py
文件中配置您的主机/ip/用户/密码
import airflow_client
# Configure HTTP basic authorization: Basic
configuration = airflow_client.client.Configuration(
host="http://localhost:8080/api/v1", username="admin", password="admin"
)
-
运行调度器(或您已设置的独立dag文件处理器的dag文件处理器)进行几轮解析(您可以将--num-runs参数传递给它或将其在后台保持运行)。脚本依赖于示例DAG序列化到数据库中,这仅在调度器运行时
core/load_examples
设置为True时才会发生。 -
运行webserver - 可在您要运行的测试脚本的host/port上访问。确保它有足够的时间初始化。
运行python test_python_client.py
,您应该看到彩色输出显示连接尝试和状态。
大型OpenAPI文档的说明
如果OpenAPI文档很大,客户端.apis和客户端.models中的导入可能会因RecursionError失败,指示已超过最大递归限制。在这种情况下,有一些解决方案
解决方案 1:为 api 和模型使用特定导入,例如
from airflow_client.client.api.default_api import DefaultApi
from airflow_client.client.model.pet import Pet
解决方案 2:在导入包之前,调整最大递归限制,如下所示
import sys
sys.setrecursionlimit(1500)
import airflow_client.client
from airflow_client.client.apis import *
from airflow_client.client.models import *
作者
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源代码分发
构建分发
apache_airflow_client-2.10.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 48641a47ecf2f48d9f62efd10c4607bb617a1d8bb2ec997a0892c151484269a3 |
|
MD5 | 6d6c62089f5e8e413d8191e4b78be3e2 |
|
BLAKE2b-256 | 4ae15f4fb1013bdf92e3770e2be438289ff299eaf259b7eefa0df5cc7e0bbd40 |
哈希值 for apache_airflow_client-2.10.0-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 6a5884648a57f1eecb6ab7dd586daf99859cc65e12405c8abd0f8b86154b7cfc |
|
MD5 | 227ee6adebfe7e80c73a7212a42da72c |
|
BLAKE2b-256 | fb84765e93bdc5b33da1bd41c191ceccd201b337530e49d4d67b4e7074b689f0 |