跳转到主要内容

Astro SDK允许使用Python和SQL快速、干净地开发{提取、加载、转换}工作流程,由Apache Airflow提供支持。

项目描述

astro

工作流变得简单

Python versions License Development Status PyPI downloads Contributors Commit activity pre-commit.ci status CI codecov

Astro Python SDK 是一个用于在 Apache Airflow 中快速开发提取、转换和加载工作流的 Python SDK。它允许您将工作流表示为一组数据依赖关系,而不必担心顺序和任务。Astro Python SDK 由 Astronomer 维护。

先决条件

  • Apache Airflow >= 2.1.0。

安装

Astro Python SDK 可在 PyPI 上找到。使用标准的 Python 安装工具

要安装无云版本的 SDK,请运行

pip install astro-sdk-python

您还可以安装与流行云提供商一起使用 SDK 的依赖项

pip install astro-sdk-python[amazon,google,snowflake,postgres]

快速入门

  1. 确保您的 Airflow 环境设置正确,通过运行以下命令

    export AIRFLOW_HOME=`pwd`
    export AIRFLOW__CORE__XCOM_BACKEND=astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend
    export AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV=true
    airflow db init
    

    注意: AIRFLOW__CORE__ENABLE_XCOM_PICKLING 对于 astro-sdk-python 已不再需要启用。此功能现在已弃用,因为我们的自定义 xcom 后端处理序列化。

    应仅将 AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV 用于本地开发。有关如何在非本地环境中设置此设置的详细信息,请参阅 XCom 后端文档

    目前,自定义 XCom 后端仅限于可序列化为 json 的数据类型。由于 DataFrame 不可序列化为 json,我们需要启用 XCom pickling 来存储 DataFrame。

    pickle 使用的数据格式是 Python 特定的。这有一个优点,即没有外部标准(如 JSON 或 XDR,无法表示指针共享)强加的限制;然而,这意味着非 Python 程序可能无法重建 pickled Python 对象。

    更多信息:请参阅 启用_xcom_picklingpickle

  2. 为示例创建一个 SQLite 数据库

    # The sqlite_default connection has different host for MAC vs. Linux
    export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'`
    sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
    
  3. 将以下工作流复制到名为 calculate_popular_movies.py 的文件中,并将其添加到您的 Airflow 项目的 dags 目录中

    https://github.com/astronomer/astro-sdk/blob/d5aa768b2d4bca72ef98f8d533fe3f99624b172f/example_dags/calculate_popular_movies.py#L1-L37

    或者,您也可以下载 calculate_popular_movies.py

     curl -O https://raw.githubusercontent.com/astronomer/astro-sdk/main/python-sdk/example_dags/calculate_popular_movies.py
    
  4. 运行示例 DAG

    airflow dags test calculate_popular_movies `date -Iseconds`
    
  5. 通过运行以下命令检查您的 DAG 的结果

    sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
    

    您应该看到以下输出

    $ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
    Toy Story 3 (2010)|8.3
    Inside Out (2015)|8.2
    How to Train Your Dragon (2010)|8.1
    Zootopia (2016)|8.1
    How to Train Your Dragon 2 (2014)|7.9
    

支持的技术

数据库
Databricks Delta
Google BigQuery
Postgres
Snowflake
SQLite
Amazon Redshift
Microsoft SQL
DuckDB
文件类型
CSV
JSON
NDJSON
Parquet
文件存储
Amazon S3
文件系统
Google GCS
Google Drive
SFTP
FTP
Azure WASB
Azure WASBS

可用操作

以下是在 SDK 中可用的关键功能

  • load_file: 将指定文件加载到 SQL 表中
  • transform: 将 SQL select 语句应用于源表,并将结果保存到目标表
  • drop_table: 删除 SQL 表
  • run_raw_sql: 运行任何 SQL 语句,不处理其输出
  • append: 如果没有冲突,将源 SQL 表的行插入目标 SQL 表
  • merge: 根据冲突情况将源 SQL 表的行插入目标 SQL 表
    • ignore: 不添加已存在的行
    • update: 用新行替换现有行
  • export_file: 将 SQL 表的行导出到目标文件
  • dataframe: 将指定的 SQL 表导出到内存中的 Pandas 数据框

有关可用运算符的完整列表,请参阅 SDK 参考文档

文档

该文档仍在进行中--我们旨在遵循 Diátaxis 系统

  • 入门教程: Astro Python SDK 的动手介绍
  • 如何指南: 完成特定任务的简单分步用户指南
  • 参考指南: 命令、模块、类和方法
  • 解释: 阐明和讨论设计项目时的关键决策

变更日志

Astro Python SDK 遵循语义版本控制进行发布。请检查 变更日志 以了解最新更改。

版本管理

有关我们的发布理念和步骤的更多信息,请参阅 版本管理

贡献指南

欢迎所有贡献,包括错误报告、错误修复、文档改进、增强和想法。

阅读 贡献指南 了解如何贡献的详细概述。

贡献者和维护者应遵守 贡献者行为准则

许可证

Apache 许可证 2.0

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适合您平台文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

astro_sdk_python-1.8.1.tar.gz (111.8 kB 查看哈希值)

上传时间

构建分布

astro_sdk_python-1.8.1-py3-none-any.whl (157.2 kB 查看哈希值)

上传时间 Python 3

支持者