DuckDB (duckdb.org) 为 Apache Airflow 提供程序
项目描述
airflow-provider-duckdb
为 Airflow 提供的 DuckDB 提供程序。此提供程序暴露了一个钩子/连接,该连接返回一个 DuckDB 连接。
这适用于本地或 MotherDuck 连接。
安装
pip install airflow-provider-duckdb
连接
连接类型是 duckdb
。它支持设置以下参数
Airflow 字段名 | Airflow UI 标签 | 描述 |
---|---|---|
host |
本地数据库文件路径 | 本地文件路径。为内存数据库留空(无密码)。 |
schema |
MotherDuck 数据库名称 | MotherDuck 数据库名称。留空为默认。 |
password |
妈妈鸭服务令牌 | 妈妈鸭服务令牌。留空为本地数据库。 |
这些已在 Airflow UI 中重新标记以提高清晰度。
例如,如果您想连接到本地文件
Airflow 字段名 | Airflow UI 标签 | 值 |
---|---|---|
host |
本地数据库文件路径 | /path/to/file.db |
schema |
MotherDuck 数据库名称 | (留空) |
password |
妈妈鸭服务令牌 | (留空) |
如果您想连接到妈妈鸭数据库
Airflow 字段名 | Airflow UI 标签 | 值 |
---|---|---|
host |
本地数据库文件路径 | (留空) |
schema |
MotherDuck 数据库名称 | <YOUR_DB_NAME> ,或留空为默认 |
password |
妈妈鸭服务令牌 | <YOUR_SERVICE_TOKEN> |
用法
import pandas as pd
import pendulum
from airflow.decorators import dag, task
from duckdb_provider.hooks.duckdb_hook import DuckDBHook
@dag(
schedule=None,
start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup=False,
)
def duckdb_transform():
@task
def create_df() -> pd.DataFrame:
"""
Create a dataframe with some sample data
"""
df = pd.DataFrame(
{
"a": [1, 2, 3],
"b": [4, 5, 6],
"c": [7, 8, 9],
}
)
return df
@task
def simple_select(df: pd.DataFrame) -> pd.DataFrame:
"""
Use DuckDB to select a subset of the data
"""
hook = DuckDBHook.get_hook('duckdb_default')
conn = hook.get_conn()
# execute a simple query
res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()
return res
@task
def add_col(df: pd.DataFrame) -> pd.DataFrame:
"""
Use DuckDB to add a column to the data
"""
hook = DuckDBHook.get_hook('duckdb_default')
conn = hook.get_conn()
# add a column
conn.execute("CREATE TABLE tb AS SELECT *, a + b AS d FROM df")
# get the table
return conn.execute("SELECT * FROM tb").df()
@task
def aggregate(df: pd.DataFrame) -> pd.DataFrame:
"""
Use DuckDB to aggregate the data
"""
hook = DuckDBHook.get_hook('duckdb_default')
conn = hook.get_conn()
# aggregate
return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()
create_df_res = create_df()
simple_select_res = simple_select(create_df_res)
add_col_res = add_col(simple_select_res)
aggregate_res = aggregate(add_col_res)
duckdb_transform()
项目详情
关闭
哈希值 for airflow_provider_duckdb-0.2.0-py3-none-any.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 874590e069e143574251fc6e30c6d5cd737d6f375170f53f5ae74ee4fe7c4afd |
|
MD5 | 611aea4fe52439b243cbdf18a40b5eff |
|
BLAKE2b-256 | 5a1ad687f8e34299ce49512b290ac7aa69e37db3d4b5c64e94892f87cc27ac0b |