一个允许用户在Airflow中本地运行SQL查询的装饰器。
项目描述
astro
简化工作流制作
astro 允许使用Python快速且干净地开发{提取、加载、转换}工作流。它帮助DAG作者用更少的代码实现更多功能。它由 Apache Airflow 驱动并由 Astronomer 维护。
:warning: 免责声明 该项目的开发状态为alpha。换句话说,它还不是生产就绪状态。接口可能会更改。我们欢迎alpha用户和勇敢的灵魂来测试它 - 欢迎任何反馈。
安装
Astro 可在 PyPI 获取。使用标准的Python 安装工具。
要安装云无关版本的 Astro,请运行
pip install astro-projects
如果使用云提供商,请使用感兴趣的可选依赖项进行安装
pip install astro-projects[amazon,google,snowflake,postgres]
快速入门
安装Astro后,将以下示例dag calculate_popular_movies.py
复制到名为 dags
的本地目录中。
from datetime import datetime
from airflow import DAG
from astro import sql as aql
from astro.sql.table import Table
@aql.transform()
def top_five_animations(input_table: Table):
return """
SELECT Title, Rating
FROM {{input_table}}
WHERE Genre1=='Animation'
ORDER BY Rating desc
LIMIT 5;
"""
with DAG(
"calculate_popular_movies",
schedule_interval=None,
start_date=datetime(2000, 1, 1),
catchup=False,
) as dag:
imdb_movies = aql.load_file(
path="https://raw.githubusercontent.com/astro-projects/astro/main/tests/data/imdb.csv",
task_id="load_csv",
output_table=Table(
table_name="imdb_movies", database="sqlite", conn_id="sqlite_default"
),
)
top_five_animations(
input_table=imdb_movies,
output_table=Table(
table_name="top_animation", database="sqlite", conn_id="sqlite_default"
),
)
通过运行以下命令设置Airflow本地实例:
export AIRFLOW_HOME=`pwd`
export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True
airflow db init
为示例运行创建一个SQLite数据库并运行DAG:
# The sqlite_default connection has different host for MAC vs. Linux
export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'`
sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
airflow dags test calculate_popular_movies `date -Iseconds`
通过运行以下命令检查由您的第一个Astro DAG计算出的前五部热门动画:
sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
您应该看到以下输出:
$ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
Toy Story 3 (2010)|8.3
Inside Out (2015)|8.2
How to Train Your Dragon (2010)|8.1
Zootopia (2016)|8.1
How to Train Your Dragon 2 (2014)|7.9
需求
因为 astro 依赖于 Task Flow API,并且它依赖于Apache Airflow >= 2.1.0。
支持的技术
数据库 | 文件类型 | 文件位置 |
---|---|---|
Google BigQuery | CSV | Amazon S3 |
Postgres | JSON | 文件系统 |
Snowflake | NDJSON | Google GCS |
SQLite | Parquet |
可用操作
astro 当前可用的操作总结。更多详情请参阅参考指南。
load_file
:将指定文件加载到SQL表中transform
:对源表应用SQL选择语句并将结果保存到目标表中truncate
:从SQL表中删除所有记录run_raw_sql
:运行任何SQL语句而不处理其输出append
:在没有冲突的情况下,将源SQL表中的行插入到目标SQL表中merge
:根据冲突将源SQL表中的行插入到目标SQL表中- ignore:不添加已存在的行
- update:用新行替换现有行
save_file
:将SQL表行导出到目标文件dataframe
:将给定的SQL表导出到内存中的Pandas数据框render
:给定包含SQL语句的目录,在DAG中动态创建转换任务
文档
文档正在完善中,我们旨在遵循Diátaxis系统。
- 教程:对 astro 的动手介绍
- 如何指南:完成特定任务的简单分步用户指南
- 参考指南:命令、模块、类和方法
- 解释:设计项目时的关键决策的说明和讨论。
变更日志
我们遵循语义版本控制进行发布。请参阅变更日志以获取最新更改。
发布管理
有关我们的发布理念和步骤的更多信息,请参阅此处
贡献指南
欢迎所有贡献,包括错误报告、错误修复、文档改进、增强功能和想法。
阅读贡献指南以获取如何贡献的详细概述。
作为此项目的贡献者和维护者,您应遵守贡献者行为准则。
许可证
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
astro-projects-0.8.3.tar.gz (42.0 kB 查看哈希值)
构建分布
astro_projects-0.8.3-py3-none-any.whl (54.0 kB 查看哈希值)