跳转到主要内容

一个允许用户在Airflow中本地运行SQL查询的装饰器。

项目描述

astro

简化工作流制作

Python versions License Development Status PyPI downloads Contributors Commit activity CI codecov

astro 允许使用Python快速且干净地开发{提取、加载、转换}工作流。它帮助DAG作者用更少的代码实现更多功能。它由 Apache Airflow 驱动并由 Astronomer 维护。

:warning: 免责声明 该项目的开发状态为alpha。换句话说,它还不是生产就绪状态。接口可能会更改。我们欢迎alpha用户和勇敢的灵魂来测试它 - 欢迎任何反馈。

安装

Astro 可在 PyPI 获取。使用标准的Python 安装工具

要安装云无关版本的 Astro,请运行

pip install astro-projects

如果使用云提供商,请使用感兴趣的可选依赖项进行安装

pip install astro-projects[amazon,google,snowflake,postgres]

快速入门

安装Astro后,将以下示例dag calculate_popular_movies.py 复制到名为 dags 的本地目录中。

from datetime import datetime
from airflow import DAG
from astro import sql as aql
from astro.sql.table import Table


@aql.transform()
def top_five_animations(input_table: Table):
    return """
        SELECT Title, Rating
        FROM {{input_table}}
        WHERE Genre1=='Animation'
        ORDER BY Rating desc
        LIMIT 5;
    """


with DAG(
    "calculate_popular_movies",
    schedule_interval=None,
    start_date=datetime(2000, 1, 1),
    catchup=False,
) as dag:
    imdb_movies = aql.load_file(
        path="https://raw.githubusercontent.com/astro-projects/astro/main/tests/data/imdb.csv",
        task_id="load_csv",
        output_table=Table(
            table_name="imdb_movies", database="sqlite", conn_id="sqlite_default"
        ),
    )

    top_five_animations(
        input_table=imdb_movies,
        output_table=Table(
            table_name="top_animation", database="sqlite", conn_id="sqlite_default"
        ),
    )

通过运行以下命令设置Airflow本地实例:

export AIRFLOW_HOME=`pwd`
export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True

airflow db init

为示例运行创建一个SQLite数据库并运行DAG:

# The sqlite_default connection has different host for MAC vs. Linux
export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'`

sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
airflow dags test calculate_popular_movies `date -Iseconds`

通过运行以下命令检查由您的第一个Astro DAG计算出的前五部热门动画:

sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"

您应该看到以下输出:

$ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
Toy Story 3 (2010)|8.3
Inside Out (2015)|8.2
How to Train Your Dragon (2010)|8.1
Zootopia (2016)|8.1
How to Train Your Dragon 2 (2014)|7.9

需求

因为 astro 依赖于 Task Flow API,并且它依赖于Apache Airflow >= 2.1.0。

支持的技术

数据库 文件类型 文件位置
Google BigQuery CSV Amazon S3
Postgres JSON 文件系统
Snowflake NDJSON Google GCS
SQLite Parquet

可用操作

astro 当前可用的操作总结。更多详情请参阅参考指南

  • load_file:将指定文件加载到SQL表中
  • transform:对源表应用SQL选择语句并将结果保存到目标表中
  • truncate:从SQL表中删除所有记录
  • run_raw_sql:运行任何SQL语句而不处理其输出
  • append:在没有冲突的情况下,将源SQL表中的行插入到目标SQL表中
  • merge:根据冲突将源SQL表中的行插入到目标SQL表中
    • ignore:不添加已存在的行
    • update:用新行替换现有行
  • save_file:将SQL表行导出到目标文件
  • dataframe:将给定的SQL表导出到内存中的Pandas数据框
  • render:给定包含SQL语句的目录,在DAG中动态创建转换任务

文档

文档正在完善中,我们旨在遵循Diátaxis系统。

  • 教程:对 astro 的动手介绍
  • 如何指南:完成特定任务的简单分步用户指南
  • 参考指南:命令、模块、类和方法
  • 解释:设计项目时的关键决策的说明和讨论。

变更日志

我们遵循语义版本控制进行发布。请参阅变更日志以获取最新更改。

发布管理

有关我们的发布理念和步骤的更多信息,请参阅此处

贡献指南

欢迎所有贡献,包括错误报告、错误修复、文档改进、增强功能和想法。

阅读贡献指南以获取如何贡献的详细概述。

作为此项目的贡献者和维护者,您应遵守贡献者行为准则

许可证

Apache许可证2.0

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

astro-projects-0.8.3.tar.gz (42.0 kB 查看哈希值)

上传时间

构建分布

astro_projects-0.8.3-py3-none-any.whl (54.0 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面