Astro SDK允许使用Python和SQL快速、干净地开发{提取、加载、转换}工作流程,由Apache Airflow提供支持。
项目描述
astro
工作流变得简单
Astro Python SDK 是一个用于在 Apache Airflow 中快速开发提取、转换和加载工作流的 Python SDK。它允许您将工作流表示为一组数据依赖关系,而不必担心顺序和任务。Astro Python SDK 由 Astronomer 维护。
先决条件
- Apache Airflow >= 2.1.0。
安装
Astro Python SDK 可在 PyPI 上找到。使用标准的 Python 安装工具。
要安装无云版本的 SDK,请运行
pip install astro-sdk-python
您还可以安装与流行云提供商一起使用 SDK 的依赖项
pip install astro-sdk-python[amazon,google,snowflake,postgres]
快速入门
-
确保您的 Airflow 环境设置正确,通过运行以下命令
export AIRFLOW_HOME=`pwd` export AIRFLOW__CORE__XCOM_BACKEND=astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend export AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV=true airflow db init
注意:
AIRFLOW__CORE__ENABLE_XCOM_PICKLING
对于astro-sdk-python
已不再需要启用。此功能现在已弃用,因为我们的自定义 xcom 后端处理序列化。应仅将
AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV
用于本地开发。有关如何在非本地环境中设置此设置的详细信息,请参阅 XCom 后端文档。目前,自定义 XCom 后端仅限于可序列化为 json 的数据类型。由于 DataFrame 不可序列化为 json,我们需要启用 XCom pickling 来存储 DataFrame。
pickle 使用的数据格式是 Python 特定的。这有一个优点,即没有外部标准(如 JSON 或 XDR,无法表示指针共享)强加的限制;然而,这意味着非 Python 程序可能无法重建 pickled Python 对象。
更多信息:请参阅 启用_xcom_pickling 和 pickle
-
为示例创建一个 SQLite 数据库
# The sqlite_default connection has different host for MAC vs. Linux export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'` sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
-
将以下工作流复制到名为
calculate_popular_movies.py
的文件中,并将其添加到您的 Airflow 项目的dags
目录中或者,您也可以下载
calculate_popular_movies.py
curl -O https://raw.githubusercontent.com/astronomer/astro-sdk/main/python-sdk/example_dags/calculate_popular_movies.py
-
运行示例 DAG
airflow dags test calculate_popular_movies `date -Iseconds`
-
通过运行以下命令检查您的 DAG 的结果
sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
您应该看到以下输出
$ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit" Toy Story 3 (2010)|8.3 Inside Out (2015)|8.2 How to Train Your Dragon (2010)|8.1 Zootopia (2016)|8.1 How to Train Your Dragon 2 (2014)|7.9
支持的技术
数据库 |
---|
Databricks Delta |
Google BigQuery |
Postgres |
Snowflake |
SQLite |
Amazon Redshift |
Microsoft SQL |
DuckDB |
文件类型 |
---|
CSV |
JSON |
NDJSON |
Parquet |
文件存储 |
---|
Amazon S3 |
文件系统 |
Google GCS |
Google Drive |
SFTP |
FTP |
Azure WASB |
Azure WASBS |
可用操作
以下是在 SDK 中可用的关键功能
load_file
: 将指定文件加载到 SQL 表中transform
: 将 SQL select 语句应用于源表,并将结果保存到目标表drop_table
: 删除 SQL 表run_raw_sql
: 运行任何 SQL 语句,不处理其输出append
: 如果没有冲突,将源 SQL 表的行插入目标 SQL 表merge
: 根据冲突情况将源 SQL 表的行插入目标 SQL 表ignore
: 不添加已存在的行update
: 用新行替换现有行
export_file
: 将 SQL 表的行导出到目标文件dataframe
: 将指定的 SQL 表导出到内存中的 Pandas 数据框
有关可用运算符的完整列表,请参阅 SDK 参考文档。
文档
该文档仍在进行中--我们旨在遵循 Diátaxis 系统
变更日志
Astro Python SDK 遵循语义版本控制进行发布。请检查 变更日志 以了解最新更改。
版本管理
有关我们的发布理念和步骤的更多信息,请参阅 版本管理。
贡献指南
欢迎所有贡献,包括错误报告、错误修复、文档改进、增强和想法。
阅读 贡献指南 了解如何贡献的详细概述。
贡献者和维护者应遵守 贡献者行为准则。
许可证
项目详情
下载文件
下载适合您平台文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分布
astro_sdk_python-1.8.1.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 89f2559e6ae07e051850b6b7267febd0aefa503504dce5fee85a575afbd3f4c1 |
|
MD5 | eed61eb329c50af602800053b1c1ea35 |
|
BLAKE2b-256 | e03a555cacd6478edd5cb6966358d159466ad325fcf5570e70225a528ca6418f |
astro_sdk_python-1.8.1-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ef2c64c54a1676e73bcb95f094237d76874440775375ab1ebc26626d16f61aaf |
|
MD5 | 5ea6800e8986a4e1e81aec8fae8a2d50 |
|
BLAKE2b-256 | 662420e00334ae3343c8df1a21a27830a89bd138bfdf650517ce95a1bd32c174 |