跳转到主要内容

从任何dbt项目运行python脚本。

项目描述

欢迎来到dbt-fal 👋 使用dbt做更多事情

dbt-fal适配器是✨最简单✨的运行您的dbt Python模型的方式。

从dbt v1.3开始,您现在可以在Python中构建您的dbt模型。这导致了一些曾经难以仅使用SQL构建的有趣用例。以下是一些例子:

  • 使用Python统计库计算统计信息
  • 构建预测
  • 构建其他预测模型,如分类和聚类

这真是太棒了!但是,还有一个问题!Snowflake和Bigquery的开发者体验并不好,Redshift和Postgres没有Python支持。

dbt-fal 为运行与所有其他数据仓库兼容的 Python 模型提供了最佳环境!使用 dbt-fal,您可以

  • 在本地构建和测试您的模型
  • 将每个模型隔离到其自己的环境中运行,并具有自己的依赖关系
  • 【即将推出】在☁️云☁️中运行您的 Python 模型,具有可弹性扩展的 Python 环境。
  • 【即将推出】甚至为您的模型添加 GPU,以处理一些更重的负载,例如训练 ML 模型。

入门指南

1. 安装 dbt-fal

pip install dbt-fal[bigquery,snowflake] 在此处添加您当前的仓库

2. 更新您的 profiles.yml 并添加 fal 适配器

jaffle_shop:
  target: dev_with_fal
  outputs:
    dev_with_fal:
      type: fal
      db_profile: dev_bigquery # This points to your main adapter
    dev_bigquery:
      type: bigquery
      ...

不要忘记使用 db_profile 属性指向您的默认适配器。这样 fal 适配器才知道如何连接到您的数据仓库。

3. 运行 dbt run

就是这样!真的非常简单 😊

4. 【🚨酷功能警报🚨】使用 dbt-fal 环境管理

如果您需要一些环境管理的帮助(而不是坚持使用 dbt 进程运行的默认环境),您可以在与您的 dbt 项目相同的文件夹中创建一个 fal_project.yml 文件,并拥有“命名环境”

在您的 dbt 项目文件夹中

$ touch fal_project.yml

# Paste the config below
environments:
  - name: ml
    type: venv
    requirements:
      - prophet

然后在您的 dbt 模型中

$ vi models/orders_forecast.py

def model(dbt, fal):
    dbt.config(fal_environment="ml") # Add this line

    df: pd.DataFrame = dbt.ref("orders_daily")

dbt.config(fal_environment="ml") 将为您提供隔离的干净环境来运行,这样您就不会污染您的包空间。

5. 【即将推出™️】将您的计算转移到云中!

让我们知道您是否对此感兴趣。我们正在寻找测试用户。

dbt-fal 命令行工具

使用 dbt-fal CLI,您可以

访问我们的 文档站点 以深入了解或尝试 深入示例,以了解 fal 如何帮助您使用 dbt 完成更多工作。

1. 安装 dbt-fal

$ pip install dbt-fal[postgres]

2. 进入您的 dbt 项目目录

$ cd ~/src/my_dbt_project

3. 创建一个 Python 脚本:send_slack_message.py

import os
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

CHANNEL_ID = os.getenv("SLACK_BOT_CHANNEL")
SLACK_TOKEN = os.getenv("SLACK_BOT_TOKEN")

client = WebClient(token=SLACK_TOKEN)
message_text = f"Model: {context.current_model.name}. Status: {context.current_model.status}."

try:
    response = client.chat_postMessage(
        channel=CHANNEL_ID,
        text=message_text
    )
except SlackApiError as e:
    assert e.response["error"]

4. 在您的 schema.yml 中添加一个 meta 部分

models:
  - name: historical_ozone_levels
    description: Ozone levels
    config:
      materialized: table
    columns:
      - name: ozone_level
        description: Ozone level
      - name: ds
        description: Date
    meta:
      fal:
        scripts:
          - send_slack_message.py

5. 运行 dbt-fal flow run

$ dbt-fal flow run
# both your dbt models and python scripts are run

6. 或者依次运行 dbtfal

$ dbt run
# Your dbt models are run

$ dbt-fal run
# Your python scripts are run

在 dbt 运行之前运行脚本

使用 pre-hook: 配置选项在模型运行之前运行脚本。

给定以下 schema.yml

models:
  - name: boston
    description: Ozone levels
    config:
      materialized: table
    meta:
      owner: "@meder"
      fal:
        pre-hook:
          - fal_scripts/trigger_fivetran.py
        post-hook:
          - fal_scripts/slack.py

dbt-fal flow run 将运行 fal_scripts/trigger_fivetran.py,然后是 boston dbt 模型,最后是 fal_scripts/slack.py。如果选择了一个带有选择标记的模型(例如 --select boston),则始终会与该模型一起运行与其关联的钩子。

$ dbt-fal flow run --select boston

概念

profile.yml 和凭证

fal 集成到 dbtprofile.yml 文件中,以访问和从数据仓库中读取数据。一旦您在 profile.yml 文件中为现有的 dbt 工作流程设置了凭证,每次您使用 refsource 来创建数据帧时,fal 都将使用在 profile.yml 文件中指定的凭证进行身份验证。

meta 语法

models:
  - name: historical_ozone_levels
    ...
    meta:
      owner: "@me"
      fal:
        post-hook:
          - send_slack_message.py
          - another_python_script.py

使用 meta 配置下的 falpost-hook 键,让 fal CLI 知道在哪里查找 Python 脚本。您可以传递上面显示的脚本列表,作为 dbt run 后的 post-hook 操作运行一个或多个脚本。

变量和函数

在 Python 脚本内部,您可以访问一些有用的变量和函数

变量

一个包含通过脚本运行的模型相关信息 context 对象。对于 meta 语法 示例,我们会得到以下内容

context.current_model.name
#= historical_ozone_levels

context.current_model.meta
#= {'owner': '@me'}

context.current_model.meta.get("owner")
#= '@me'

context.current_model.status
# Could be one of
#= 'success'
#= 'error'
#= 'skipped'

context 对象还可以访问与当前模型相关的测试信息。如果之前的 dbt 命令是 testbuild,则 context.current_model.test 属性将填充测试列表

context.current_model.tests
#= [CurrentTest(name='not_null', modelname='historical_ozone_levels, column='ds', status='Pass')]

refsource 函数

还有一些来自 dbt 的熟悉函数可供使用

# Refer to dbt models or sources by name and returns it as `pandas.DataFrame`
ref('model_name')
source('source_name', 'table_name')

# You can use it to get the running model data
ref(context.current_model.name)

write_to_model 函数

❗️ 我们建议使用 dbt-fal 适配器将数据写回到您的数据仓库。

还有可能将数据发送回您的数据仓库。这使得获取数据、处理数据并将其上传回 dbt 领域变得容易。

此函数仅在 fal Python 模型中可用,即位于 fal_models 目录内的 Python 脚本,并将 fal-models-paths 添加到您的 dbt_project.yml

name: "jaffle_shop"
# ...
model-paths: ["models"]
# ...

vars:
  # Add this to your dbt_project.yml
  fal-models-paths: ["fal_models"]

一旦添加,它将自动由 fal 运行,无需在 schema.yml 中添加任何额外配置。

source_df = source('source_name', 'table_name')
ref_df = ref('a_model')

# Your code here
df = ...

# Upload a `pandas.DataFrame` back to the datawarehouse
write_to_model(df)

write_to_model 还接受一个可选的 dtype 参数,允许您指定列的数据类型。它与 DataFrame.to_sql 函数dtype 参数的工作方式相同。

from sqlalchemy.types import Integer
# Upload but specifically create the `value` column with type `integer`
# Can be useful if data has `None` values
write_to_model(df, dtype={'value': Integer()})

fal 作为 Python 包导入

您可能希望从 Jupyter Notebook 或其他 Python 脚本中轻松访问 dbt 模型和源。为此,只需导入 fal 包并实例化一个 FalDbt 项目即可。

from fal.dbt import FalDbt
faldbt = FalDbt(profiles_dir="~/.dbt", project_dir="../my_project")

faldbt.list_sources()
# [
#    DbtSource(name='results' ...),
#    DbtSource(name='ticket_data_sentiment_analysis' ...)
#    ...
# ]

faldbt.list_models()
# [
#    DbtModel(name='zendesk_ticket_data' ...),
#    DbtModel(name='agent_wait_time' ...)
#    ...
# ]


sentiments = faldbt.source('results', 'ticket_data_sentiment_analysis')
# pandas.DataFrame
tickets = faldbt.ref('stg_zendesk_ticket_data')
# pandas.DataFrame

我们为什么要构建这个?

我们认为 dbt 非常棒,因为它让数据人员能够利用他们已经熟悉的工具完成更多工作。

这个库将成为我们尝试在 dbt 下游更全面地启用 数据科学工作负载 的基础。鉴于拥有可靠的数据管道是构建预测分析最重要的组成部分,我们正在构建一个与 dbt 集成良好的库。

有任何反馈或需要帮助?

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

dbt_fal-1.5.9.tar.gz (96.6 kB 查看哈希值)

上传时间

构建分发版

dbt_fal-1.5.9-py3-none-any.whl (123.5 kB 查看哈希值)

上传时间 Python 3

由以下支持