从任何dbt项目运行python脚本。
项目描述
欢迎来到dbt-fal 👋 使用dbt做更多事情
dbt-fal适配器是✨最简单✨的运行您的dbt Python模型的方式。
从dbt v1.3开始,您现在可以在Python中构建您的dbt模型。这导致了一些曾经难以仅使用SQL构建的有趣用例。以下是一些例子:
- 使用Python统计库计算统计信息
- 构建预测
- 构建其他预测模型,如分类和聚类
这真是太棒了!但是,还有一个问题!Snowflake和Bigquery的开发者体验并不好,Redshift和Postgres没有Python支持。
dbt-fal 为运行与所有其他数据仓库兼容的 Python 模型提供了最佳环境!使用 dbt-fal,您可以
- 在本地构建和测试您的模型
- 将每个模型隔离到其自己的环境中运行,并具有自己的依赖关系
- 【即将推出】在☁️云☁️中运行您的 Python 模型,具有可弹性扩展的 Python 环境。
- 【即将推出】甚至为您的模型添加 GPU,以处理一些更重的负载,例如训练 ML 模型。
入门指南
1. 安装 dbt-fal
pip install dbt-fal[bigquery,snowflake]
在此处添加您当前的仓库
2. 更新您的 profiles.yml
并添加 fal 适配器
jaffle_shop:
target: dev_with_fal
outputs:
dev_with_fal:
type: fal
db_profile: dev_bigquery # This points to your main adapter
dev_bigquery:
type: bigquery
...
不要忘记使用 db_profile
属性指向您的默认适配器。这样 fal 适配器才知道如何连接到您的数据仓库。
3. 运行 dbt run
!
就是这样!真的非常简单 😊
4. 【🚨酷功能警报🚨】使用 dbt-fal 环境管理
如果您需要一些环境管理的帮助(而不是坚持使用 dbt 进程运行的默认环境),您可以在与您的 dbt 项目相同的文件夹中创建一个 fal_project.yml 文件,并拥有“命名环境”
在您的 dbt 项目文件夹中
$ touch fal_project.yml
# Paste the config below
environments:
- name: ml
type: venv
requirements:
- prophet
然后在您的 dbt 模型中
$ vi models/orders_forecast.py
def model(dbt, fal):
dbt.config(fal_environment="ml") # Add this line
df: pd.DataFrame = dbt.ref("orders_daily")
dbt.config(fal_environment="ml")
将为您提供隔离的干净环境来运行,这样您就不会污染您的包空间。
5. 【即将推出™️】将您的计算转移到云中!
让我们知道您是否对此感兴趣。我们正在寻找测试用户。
dbt-fal
命令行工具
使用 dbt-fal
CLI,您可以
- 在 dbt 模型成功或失败时发送 Slack 通知。
- 在模型开始运行之前从外部数据源加载数据。
- 使用
FalDbt
以熟悉的语法:ref('my_dbt_model')
将 dbt 模型下载到 Python 上下文中。 - 以编程方式访问有关您的 dbt 项目的丰富元数据。
访问我们的 文档站点 以深入了解或尝试 深入示例,以了解 fal 如何帮助您使用 dbt 完成更多工作。
1. 安装 dbt-fal
$ pip install dbt-fal[postgres]
2. 进入您的 dbt 项目目录
$ cd ~/src/my_dbt_project
3. 创建一个 Python 脚本:send_slack_message.py
import os
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
CHANNEL_ID = os.getenv("SLACK_BOT_CHANNEL")
SLACK_TOKEN = os.getenv("SLACK_BOT_TOKEN")
client = WebClient(token=SLACK_TOKEN)
message_text = f"Model: {context.current_model.name}. Status: {context.current_model.status}."
try:
response = client.chat_postMessage(
channel=CHANNEL_ID,
text=message_text
)
except SlackApiError as e:
assert e.response["error"]
4. 在您的 schema.yml
中添加一个 meta
部分
models:
- name: historical_ozone_levels
description: Ozone levels
config:
materialized: table
columns:
- name: ozone_level
description: Ozone level
- name: ds
description: Date
meta:
fal:
scripts:
- send_slack_message.py
5. 运行 dbt-fal flow run
$ dbt-fal flow run
# both your dbt models and python scripts are run
6. 或者依次运行 dbt
和 fal
$ dbt run
# Your dbt models are run
$ dbt-fal run
# Your python scripts are run
在 dbt 运行之前运行脚本
使用 pre-hook:
配置选项在模型运行之前运行脚本。
给定以下 schema.yml
models:
- name: boston
description: Ozone levels
config:
materialized: table
meta:
owner: "@meder"
fal:
pre-hook:
- fal_scripts/trigger_fivetran.py
post-hook:
- fal_scripts/slack.py
dbt-fal flow run
将运行 fal_scripts/trigger_fivetran.py
,然后是 boston
dbt 模型,最后是 fal_scripts/slack.py
。如果选择了一个带有选择标记的模型(例如 --select boston
),则始终会与该模型一起运行与其关联的钩子。
$ dbt-fal flow run --select boston
概念
profile.yml 和凭证
fal
集成到 dbt
的 profile.yml
文件中,以访问和从数据仓库中读取数据。一旦您在 profile.yml
文件中为现有的 dbt
工作流程设置了凭证,每次您使用 ref
或 source
来创建数据帧时,fal
都将使用在 profile.yml
文件中指定的凭证进行身份验证。
meta
语法
models:
- name: historical_ozone_levels
...
meta:
owner: "@me"
fal:
post-hook:
- send_slack_message.py
- another_python_script.py
使用 meta
配置下的 fal
和 post-hook
键,让 fal
CLI 知道在哪里查找 Python 脚本。您可以传递上面显示的脚本列表,作为 dbt run
后的 post-hook 操作运行一个或多个脚本。
变量和函数
在 Python 脚本内部,您可以访问一些有用的变量和函数
变量
一个包含通过脚本运行的模型相关信息 context
对象。对于 meta
语法 示例,我们会得到以下内容
context.current_model.name
#= historical_ozone_levels
context.current_model.meta
#= {'owner': '@me'}
context.current_model.meta.get("owner")
#= '@me'
context.current_model.status
# Could be one of
#= 'success'
#= 'error'
#= 'skipped'
context
对象还可以访问与当前模型相关的测试信息。如果之前的 dbt 命令是 test
或 build
,则 context.current_model.test
属性将填充测试列表
context.current_model.tests
#= [CurrentTest(name='not_null', modelname='historical_ozone_levels, column='ds', status='Pass')]
ref
和 source
函数
还有一些来自 dbt
的熟悉函数可供使用
# Refer to dbt models or sources by name and returns it as `pandas.DataFrame`
ref('model_name')
source('source_name', 'table_name')
# You can use it to get the running model data
ref(context.current_model.name)
write_to_model
函数
❗️ 我们建议使用
dbt-fal
适配器将数据写回到您的数据仓库。
还有可能将数据发送回您的数据仓库。这使得获取数据、处理数据并将其上传回 dbt 领域变得容易。
此函数仅在 fal Python 模型中可用,即位于 fal_models
目录内的 Python 脚本,并将 fal-models-paths
添加到您的 dbt_project.yml
name: "jaffle_shop"
# ...
model-paths: ["models"]
# ...
vars:
# Add this to your dbt_project.yml
fal-models-paths: ["fal_models"]
一旦添加,它将自动由 fal 运行,无需在 schema.yml
中添加任何额外配置。
source_df = source('source_name', 'table_name')
ref_df = ref('a_model')
# Your code here
df = ...
# Upload a `pandas.DataFrame` back to the datawarehouse
write_to_model(df)
write_to_model
还接受一个可选的 dtype
参数,允许您指定列的数据类型。它与 DataFrame.to_sql
函数 的 dtype
参数的工作方式相同。
from sqlalchemy.types import Integer
# Upload but specifically create the `value` column with type `integer`
# Can be useful if data has `None` values
write_to_model(df, dtype={'value': Integer()})
将 fal
作为 Python 包导入
您可能希望从 Jupyter Notebook 或其他 Python 脚本中轻松访问 dbt 模型和源。为此,只需导入 fal
包并实例化一个 FalDbt 项目即可。
from fal.dbt import FalDbt
faldbt = FalDbt(profiles_dir="~/.dbt", project_dir="../my_project")
faldbt.list_sources()
# [
# DbtSource(name='results' ...),
# DbtSource(name='ticket_data_sentiment_analysis' ...)
# ...
# ]
faldbt.list_models()
# [
# DbtModel(name='zendesk_ticket_data' ...),
# DbtModel(name='agent_wait_time' ...)
# ...
# ]
sentiments = faldbt.source('results', 'ticket_data_sentiment_analysis')
# pandas.DataFrame
tickets = faldbt.ref('stg_zendesk_ticket_data')
# pandas.DataFrame
我们为什么要构建这个?
我们认为 dbt
非常棒,因为它让数据人员能够利用他们已经熟悉的工具完成更多工作。
这个库将成为我们尝试在 dbt
下游更全面地启用 数据科学工作负载 的基础。鉴于拥有可靠的数据管道是构建预测分析最重要的组成部分,我们正在构建一个与 dbt 集成良好的库。
有任何反馈或需要帮助?
- 加入我们 在 Discord 上的 fal
- 加入 dbt 社区 并进入我们的 #tools-fal 频道
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分布
构建分发版
dbt_fal-1.5.9.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 50f13af042fcaf91285fb8a58e494841de996d6ec9d614d93d8d167cae168077 |
|
MD5 | 8f5418ca93dc169eac12e81cc42ff063 |
|
BLAKE2b-256 | 01234f20974fe054fb7b864da982d56b370ca7fb094c62dd72e5e8d11d194c2d |
dbt_fal-1.5.9-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 81de0baf3d3e19dddbf51a7007fd70c4b0976f1d516ee983fa77493c3ff311fa |
|
MD5 | f1a3397b2cf6484aec8e912991fae07d |
|
BLAKE2b-256 | 284dbd038a65e5ef70218951e94582296dbcfba16c282863bf0d54437b3df270 |