跳转到主要内容

DuckDB (duckdb.org) 为 Apache Airflow 提供程序

项目描述

airflow-provider-duckdb

为 Airflow 提供的 DuckDB 提供程序。此提供程序暴露了一个钩子/连接,该连接返回一个 DuckDB 连接。

这适用于本地或 MotherDuck 连接。

安装

pip install airflow-provider-duckdb

连接

连接类型是 duckdb。它支持设置以下参数

Airflow 字段名 Airflow UI 标签 描述
host 本地数据库文件路径 本地文件路径。为内存数据库留空(无密码)。
schema MotherDuck 数据库名称 MotherDuck 数据库名称。留空为默认。
password 妈妈鸭服务令牌 妈妈鸭服务令牌。留空为本地数据库。

这些已在 Airflow UI 中重新标记以提高清晰度。

例如,如果您想连接到本地文件

Airflow 字段名 Airflow UI 标签
host 本地数据库文件路径 /path/to/file.db
schema MotherDuck 数据库名称 (留空)
password 妈妈鸭服务令牌 (留空)

如果您想连接到妈妈鸭数据库

Airflow 字段名 Airflow UI 标签
host 本地数据库文件路径 (留空)
schema MotherDuck 数据库名称 <YOUR_DB_NAME>,或留空为默认
password 妈妈鸭服务令牌 <YOUR_SERVICE_TOKEN>

用法

import pandas as pd
import pendulum

from airflow.decorators import dag, task
from duckdb_provider.hooks.duckdb_hook import DuckDBHook


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def duckdb_transform():
    @task
    def create_df() -> pd.DataFrame:
        """
        Create a dataframe with some sample data
        """
        df = pd.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [4, 5, 6],
                "c": [7, 8, 9],
            }
        )
        return df

    @task
    def simple_select(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to select a subset of the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # execute a simple query
        res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()

        return res

    @task
    def add_col(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to add a column to the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # add a column
        conn.execute("CREATE TABLE tb AS SELECT *, a + b AS d FROM df")

        # get the table
        return conn.execute("SELECT * FROM tb").df()

    @task
    def aggregate(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to aggregate the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # aggregate
        return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()

    create_df_res = create_df()
    simple_select_res = simple_select(create_df_res)
    add_col_res = add_col(simple_select_res)
    aggregate_res = aggregate(add_col_res)


duckdb_transform()

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪一个,请了解有关安装包的更多信息。

源代码分布

airflow-provider-duckdb-0.2.0.tar.gz (7.5 kB 查看哈希值)

上传时间 源代码

构建分布

airflow_provider_duckdb-0.2.0-py3-none-any.whl (8.5 kB 查看哈希值)

上传时间 Python 3

由以下支持