Databricks SDK for Python (Beta)
项目描述
Databricks SDK for Python (Beta)
Python 数据库 SDK 包括用于加速 Databricks Lakehouse 开发的功能,支持所有公开的 Databricks REST API 操作。SDK 的内部 HTTP 客户端功能强大,通过智能重试处理不同级别的失败。
内容
入门指南
- 请通过
pip install databricks-sdk
安装 Databricks SDK for Python 并实例化WorkspaceClient
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for c in w.clusters.list():
print(c.cluster_name)
Python 数据库 SDK for Python 兼容 Python 3.7 (直到 2023 年 6 月)、3.8、3.9、3.10 和 3.11。
注意:从版本 13.1 开始,Databricks Runtime 包含 Python SDK 的捆绑版本。
强烈建议升级到最新版本,您可以在笔记本单元中运行以下命令来实现
%pip install --upgrade databricks-sdk
随后
dbutils.library.restartPython()
代码示例
Python 数据库 SDK for Python 随附一系列示例,展示了如何使用库处理各种常见用例,包括
这些示例和其他更多示例位于 examples/
目录的 Github 存储库。
其他使用 SDK 的示例包括
- Unity Catalog 自动迁移 严重依赖 Python SDK 来处理 Databricks API。
- ip-access-list-analyzer 检查并从 IP 访问列表中删除无效条目。
身份验证
如果您使用 Databricks 配置配置文件 或 Databricks 特定的 环境变量 进行 Databricks 身份验证,则开始使用 Databricks 工作空间的唯一代码片段如下,指示 Python 数据库 SDK 使用其 默认身份验证流程
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w. # press <TAB> for autocompletion
Python 数据库 SDK 的常规名称是 w
,它是 workspace
的简称。
本节内容
默认身份验证流程
如果您运行 Databricks Terraform Provider、Databricks SDK for Go、Databricks CLI 或其他语言目标的 Databricks SDK,它们很可能都能很好地协同工作。默认情况下,Python 数据库 SDK 尝试以下 身份验证 方法,按以下顺序,直到成功
- Databricks 原生身份验证
- Azure 原生身份验证
- 如果 SDK 到此为止仍不成功,它将返回一个身份验证错误并停止运行。
您可以通过设置 auth_type
参数来指示 Python 数据库 SDK 使用特定的身份验证方法,有关说明请参阅以下各节。
对于每种身份验证方法,SDK 将按以下顺序在以下位置搜索兼容的身份验证凭据。一旦 SDK 找到它可以使用的一组凭据,它就会停止搜索
-
硬编码到配置参数中的凭据。
:warning: 注意:Databricks不推荐将凭据硬编码到参数中,因为它们可能在版本控制系统中以明文形式暴露。请使用环境变量或配置配置文件代替。
-
Databricks特定环境变量中的凭据。
-
对于Databricks原生身份验证,凭据位于默认文件位置(Linux或macOS为
~
,Windows为%USERPROFILE%
)的.databrickscfg
文件的DEFAULT
配置配置文件中。 -
对于Azure原生身份验证,SDK会根据需要通过Azure CLI搜索凭据。
根据Databricks身份验证方法,SDK使用以下信息。以下列出的是WorkspaceClient
和AccountClient
参数(它们对应于.databrickscfg
文件字段),它们的描述以及任何相应的环境变量。
Databricks 原生身份验证
默认情况下,Databricks Python SDK最初尝试Databricks令牌身份验证(auth_type='pat'
参数)。如果SDK失败,它将尝试Databricks基本(用户名/密码)身份验证(auth_type="basic"
参数)。
- 对于Databricks令牌身份验证,您必须提供
host
和token
;或它们的或环境变量或.databrickscfg
文件字段等效项。 - 对于Databricks基本身份验证,您必须提供
host
、username
和password
(用于AWS工作空间级操作);或host
、account_id
、username
和password
(用于AWS、Azure或GCP账户级操作);或它们的或环境变量或.databrickscfg
文件字段等效项。
参数 | 描述 | 环境变量 |
---|---|---|
host |
(字符串) Databricks主机URL,用于Databricks工作空间端点或Databricks账户端点。 | DATABRICKS_HOST |
account_id |
(字符串) Databricks账户ID,用于Databricks账户端点。只有当Host 是以下之一时才有效:https://accounts.cloud.databricks.com/ (AWS)、https://accounts.azuredatabricks.net/ (Azure)或https://accounts.gcp.databricks.com/ (GCP)。 |
DATABRICKS_ACCOUNT_ID |
token |
(字符串) Databricks个人访问令牌(PAT)(AWS、Azure和GCP)或Azure Active Directory(Azure AD)令牌(Azure)。 | DATABRICKS_TOKEN |
username |
(字符串) Databricks基本身份验证中的Databricks用户名部分。只有当Host 是*.cloud.databricks.com (AWS)时才可能。 |
DATABRICKS_USERNAME |
password |
(字符串) Databricks基本身份验证中的Databricks密码部分。只有当Host 是*.cloud.databricks.com (AWS)时才可能。 |
DATABRICKS_PASSWORD |
例如,要使用Databricks令牌身份验证
from databricks.sdk import WorkspaceClient
w = WorkspaceClient(host=input('Databricks Workspace URL: '), token=input('Token: '))
Azure 原生身份验证
默认情况下,Databricks Python SDK首先尝试Azure客户端密钥身份验证(auth_type='azure-client-secret'
参数)。如果SDK失败,它将尝试Azure CLI身份验证(auth_type='azure-cli'
参数)。请参阅管理服务主体。
Databricks Python SDK会获取Azure CLI令牌,如果您已通过在机器上运行az login
作为Azure用户进行过身份验证。请参阅使用Azure CLI获取用户的Azure AD令牌。
要作为Azure Active Directory(Azure AD)服务主体进行身份验证,您必须提供以下之一。另请参阅将服务主体添加到您的Azure Databricks账户
azure_workspace_resource_id
、azure_client_secret
、azure_client_id
和azure_tenant_id
;或者它们的环境内置变量或.databrickscfg
文件字段等效。azure_workspace_resource_id
和azure_use_msi
;或者它们的环境内置变量或.databrickscfg
文件字段等效。
参数 | 描述 | 环境变量 |
---|---|---|
azure_workspace_resource_id |
(字符串) Azure Databricks 工作空间的 Azure 资源管理器 ID,用于交换 Databricks 主机 URL。 | DATABRICKS_AZURE_RESOURCE_ID |
azure_use_msi |
(布尔值) 使用 Azure 管理服务标识(MSI)无密码身份验证流程进行服务主体的 true 。 此功能尚未在 Python 的 Databricks SDK 中实现。 |
ARM_USE_MSI |
azure_client_secret |
(字符串) Azure AD 服务主体的客户端密钥。 | ARM_CLIENT_SECRET |
azure_client_id |
(字符串) Azure AD 服务主体的应用程序 ID。 | ARM_CLIENT_ID |
azure_tenant_id |
(字符串) Azure AD 服务主体的租户 ID。 | ARM_TENANT_ID |
azure_environment |
(字符串) 某组 API 端点使用的 Azure 环境类型(例如 Public、UsGov、China 和 Germany)。默认为 PUBLIC 。 |
ARM_ENVIRONMENT |
例如,要使用 Azure 客户端密钥身份验证
from databricks.sdk import WorkspaceClient
w = WorkspaceClient(host=input('Databricks Workspace URL: '),
azure_workspace_resource_id=input('Azure Resource ID: '),
azure_tenant_id=input('AAD Tenant ID: '),
azure_client_id=input('AAD Client ID: '),
azure_client_secret=input('AAD Client Secret: '))
请参阅更多示例在此文档中。
Google Cloud Platform 原生身份验证
默认情况下,Python 的 Databricks SDK 首先尝试 GCP 凭据身份验证(auth_type='google-credentials'
参数)。如果 SDK 不成功,它将尝试 Google Cloud Platform(GCP)ID 身份验证(auth_type='google-id'
参数)。
Python 的 Databricks SDK 在 Google 默认应用程序凭据(DAC)流的范围内获取 OAuth 令牌。这意味着如果您在开发机器上运行了 gcloud auth application-default login
,或者在计算上启动了应用程序,该应用程序允许模拟在 google_service_account
中指定的 Google Cloud 服务帐户。身份验证应该能够直接工作。请参阅 创建和管理服务帐户。
要作为 Google Cloud 服务帐户进行身份验证,您必须提供以下之一
host
和google_credentials
;或者它们的环境内置变量或.databrickscfg
文件字段等效。host
和google_service_account
;或者它们的环境内置变量或.databrickscfg
文件字段等效。
参数 | 描述 | 环境变量 |
---|---|---|
google_credentials |
(字符串) GCP 服务帐户凭据 JSON 或这些凭据在本地文件系统上的位置。 | GOOGLE_CREDENTIALS |
google_service_account |
(字符串) 用于默认应用程序凭据流中模拟的 Google Cloud Platform(GCP)服务帐户电子邮件,无需密码。 | DATABRICKS_GOOGLE_SERVICE_ACCOUNT |
例如,要使用 Google ID 身份验证
from databricks.sdk import WorkspaceClient
w = WorkspaceClient(host=input('Databricks Workspace URL: '),
google_service_account=input('Google Service Account: '))
覆盖 .databrickscfg
对于 Databricks 原生身份验证,您可以通过以下方式覆盖使用 .databrickscfg
的默认行为
参数 | 描述 | 环境变量 |
---|---|---|
profile |
(字符串) 在 .databrickscfg 中指定的连接配置文件,用于替代 DEFAULT 。 |
DATABRICKS_CONFIG_PROFILE |
config_file |
(字符串) Databricks CLI 凭据文件的非默认位置。 | DATABRICKS_CONFIG_FILE |
例如,要使用名为 MYPROFILE
的配置文件而不是 DEFAULT
from databricks.sdk import WorkspaceClient
w = WorkspaceClient(profile='MYPROFILE')
# Now call the Databricks workspace APIs as desired...
额外的身份验证配置选项
对于所有身份验证方法,您可以在客户端参数中按以下方式覆盖默认行为
参数 | 描述 | 环境变量 |
---|---|---|
auth_type |
(字符串) 当环境内存在多个身份验证属性时,使用此参数指定的身份验证类型。此参数还保留当前选定的身份验证。 | DATABRICKS_AUTH_TYPE |
http_timeout_seconds |
(整数) HTTP 超时秒数。默认为 60。 | (None) |
retry_timeout_seconds |
(整数) 保持重试 HTTP 请求的秒数。默认为 300(5分钟)。 | (None) |
debug_truncate_bytes |
(整数) 在此限制以上的调试日志中截断 JSON 字段。默认为 96。 | DATABRICKS_DEBUG_TRUNCATE_BYTES |
debug_headers |
(布尔值) 使用 true 来调试应用程序发出的请求的 HTTP 标头。默认为 false ,因为标头包含敏感数据,例如访问令牌。 |
DATABRICKS_DEBUG_HEADERS |
rate_limit |
(整数) 向 Databricks REST API 发送请求的最大请求数量每秒。 | DATABRICKS_RATE_LIMIT |
例如,要开启调试 HTTP 标头
from databricks.sdk import WorkspaceClient
w = WorkspaceClient(debug_headers=True)
# Now call the Databricks workspace APIs as desired...
长时间运行的操作
当你调用长时间运行的操作时,SDK 提供了一个高级 API 来 触发 这些操作并 等待 相关实体达到正确状态或返回错误信息(如果失败)。所有长时间运行的操作都返回通用的 Wait
实例,具有 result()
方法以获取长时间运行操作的结果,一旦完成。Databricks Python SDK 为每个方法选择最合理的默认超时时间,但有时你可能需要为 result()
方法的 timeout
参数提供 datetime.timedelta()
作为值。
Databricks API 中有许多长时间运行的操作,例如管理
- 集群,
- 命令执行
- 作业
- 库
- Delta Live Tables 管道
- Databricks SQL 仓库。
例如,在 Clusters API 中,一旦你创建了一个集群,你就会收到一个集群 ID,该集群处于 PENDING
状态。同时,Databricks 在后台负责从云提供商预配虚拟机。集群仅在 RUNNING
状态下可使用,因此你必须等待达到该状态。
另一个例子是运行作业或修复运行的 API:运行开始后,运行处于 PENDING
状态。作业只有在处于 TERMINATED
或 SKIPPED
状态时才被认为已完成。如果你长时间运行的操作超时并发生错误,你可能还需要错误消息。有时你可能想配置一个除了默认的 20 分钟以外的自定义超时时间。
在以下示例中,w.clusters.create
仅在集群处于 RUNNING
状态时返回 ClusterInfo
,否则将在 10 分钟后超时
import datetime
import logging
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
info = w.clusters.create_and_wait(cluster_name='Created cluster',
spark_version='12.0.x-scala2.12',
node_type_id='m5d.large',
autotermination_minutes=10,
num_workers=1,
timeout=datetime.timedelta(minutes=10))
logging.info(f'Created: {info}')
请查看 examples/starting_job_and_waiting.py
以了解更高级的使用方法
import datetime
import logging
import time
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.jobs as j
w = WorkspaceClient()
# create a dummy file on DBFS that just sleeps for 10 seconds
py_on_dbfs = f'/home/{w.current_user.me().user_name}/sample.py'
with w.dbfs.open(py_on_dbfs, write=True, overwrite=True) as f:
f.write(b'import time; time.sleep(10); print("Hello, World!")')
# trigger one-time-run job and get waiter object
waiter = w.jobs.submit(run_name=f'py-sdk-run-{time.time()}', tasks=[
j.RunSubmitTaskSettings(
task_key='hello_world',
new_cluster=j.BaseClusterInfo(
spark_version=w.clusters.select_spark_version(long_term_support=True),
node_type_id=w.clusters.select_node_type(local_disk=True),
num_workers=1
),
spark_python_task=j.SparkPythonTask(
python_file=f'dbfs:{py_on_dbfs}'
),
)
])
logging.info(f'starting to poll: {waiter.run_id}')
# callback, that receives a polled entity between state updates
def print_status(run: j.Run):
statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
logging.info(f'workflow intermediate status: {", ".join(statuses)}')
# If you want to perform polling in a separate thread, process, or service,
# you can use w.jobs.wait_get_run_job_terminated_or_skipped(
# run_id=waiter.run_id,
# timeout=datetime.timedelta(minutes=15),
# callback=print_status) to achieve the same results.
#
# Waiter interface allows for `w.jobs.submit(..).result()` simplicity in
# the scenarios, where you need to block the calling thread for the job to finish.
run = waiter.result(timeout=datetime.timedelta(minutes=15),
callback=print_status)
logging.info(f'job finished: {run.run_page_url}')
分页响应
在平台侧,Databricks API 使用不同的等待来处理分页
- 一些 API 遵循偏移量加限制分页
- 一些从 0 开始偏移,而另一些从 1 开始
- 一些使用基于游标的迭代
- 其他的一些只在一个响应中返回所有结果
Databricks Python SDK 在 Iterator[T]
抽象下隐藏了这种复杂性,其中多页结果 yield
项目。Python 类型帮助自动完成单个项目字段。
import logging
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for repo in w.repos.list():
logging.info(f'Found repo: {repo.path}')
请查看 examples/last_job_runs.py
以了解更高级的使用方法
import logging
from collections import defaultdict
from datetime import datetime, timezone
from databricks.sdk import WorkspaceClient
latest_state = {}
all_jobs = {}
durations = defaultdict(list)
w = WorkspaceClient()
for job in w.jobs.list():
all_jobs[job.job_id] = job
for run in w.jobs.list_runs(job_id=job.job_id, expand_tasks=False):
durations[job.job_id].append(run.run_duration)
if job.job_id not in latest_state:
latest_state[job.job_id] = run
continue
if run.end_time < latest_state[job.job_id].end_time:
continue
latest_state[job.job_id] = run
summary = []
for job_id, run in latest_state.items():
summary.append({
'job_name': all_jobs[job_id].settings.name,
'last_status': run.state.result_state,
'last_finished': datetime.fromtimestamp(run.end_time/1000, timezone.utc),
'average_duration': sum(durations[job_id]) / len(durations[job_id])
})
for line in sorted(summary, key=lambda s: s['last_finished'], reverse=True):
logging.info(f'Latest: {line}')
使用 OAuth 的单一登录 (SSO)
带有 PKCE 的授权码流
对于在服务器上运行的常规 Web 应用程序,建议使用授权码流来获取访问令牌和刷新令牌。这种方法被认为是安全的,因为访问令牌直接传输到托管应用程序的服务器,而不通过用户的 Web 浏览器,从而降低泄露风险。
为了增强授权码流的安全性,可以采用PKCE(代码交换的证明密钥)机制。使用PKCE时,调用应用生成一个称为代码验证器的秘密,由授权服务器进行验证。应用还创建代码验证器的变换值,称为代码挑战,并通过HTTPS发送以获取授权码。通过拦截授权码,恶意攻击者无法在没有代码验证器的情况下将其交换为令牌。
所提供的示例样例是一个Python3脚本,它使用Flask Web框架和Databricks Python SDK来演示如何实现带有PKCE安全性的OAuth授权码流。它可以用于构建一个应用程序,每个用户都可以使用他们的身份访问Databricks资源。该脚本可以带或不带自定义OAuth应用程序的客户端和秘密凭据执行。
Databricks Python SDK公开了oauth_client.initiate_consent()
辅助函数来获取用户重定向URL并启动PKCE状态验证。应用开发者应将RefreshableCredentials
持久化在webapp会话中,并通过RefreshableCredentials.from_dict(oauth_client, session['creds'])
辅助函数恢复。
适用于AWS和Azure。目前不支持GCP。
from databricks.sdk.oauth import OAuthClient
oauth_client = OAuthClient(host='<workspace-url>',
client_id='<oauth client ID>',
redirect_url=f'http://host.domain/callback',
scopes=['clusters'])
import secrets
from flask import Flask, render_template_string, request, redirect, url_for, session
APP_NAME = 'flask-demo'
app = Flask(APP_NAME)
app.secret_key = secrets.token_urlsafe(32)
@app.route('/callback')
def callback():
from databricks.sdk.oauth import Consent
consent = Consent.from_dict(oauth_client, session['consent'])
session['creds'] = consent.exchange_callback_parameters(request.args).as_dict()
return redirect(url_for('index'))
@app.route('/')
def index():
if 'creds' not in session:
consent = oauth_client.initiate_consent()
session['consent'] = consent.as_dict()
return redirect(consent.auth_url)
from databricks.sdk import WorkspaceClient
from databricks.sdk.oauth import SessionCredentials
credentials_provider = SessionCredentials.from_dict(oauth_client, session['creds'])
workspace_client = WorkspaceClient(host=oauth_client.host,
product=APP_NAME,
credentials_provider=credentials_provider)
return render_template_string('...', w=workspace_client)
开发机上的本地脚本的SSO
对于在开发者工作站上运行的应用程序,Databricks Python SDK提供auth_type='external-browser'
实用程序,该实用程序为用户打开浏览器以通过SSO流程。Azure支持目前仍处于早期实验阶段。
from databricks.sdk import WorkspaceClient
host = input('Enter Databricks host: ')
w = WorkspaceClient(host=host, auth_type='external-browser')
clusters = w.clusters.list()
for cl in clusters:
print(f' - {cl.cluster_name} is {cl.state}')
创建自定义OAuth应用程序
为了使用Databricks Python SDK进行OAuth,您应使用account_client.custom_app_integration.create
API。
import logging, getpass
from databricks.sdk import AccountClient
account_client = AccountClient(host='https://accounts.cloud.databricks.com',
account_id=input('Databricks Account ID: '),
username=input('Username: '),
password=getpass.getpass('Password: '))
logging.info('Enrolling all published apps...')
account_client.o_auth_enrollment.create(enable_all_published_apps=True)
status = account_client.o_auth_enrollment.get()
logging.info(f'Enrolled all published apps: {status}')
custom_app = account_client.custom_app_integration.create(
name='awesome-app',
redirect_urls=[f'https://host.domain/path/to/callback'],
confidential=True)
logging.info(f'Created new custom app: '
f'--client_id {custom_app.client_id} '
f'--client_secret {custom_app.client_secret}')
用户代理请求归属
Databricks Python SDK使用User-Agent
头将请求元数据包含在每个请求中。默认情况下,这包括Python SDK的版本、应用程序使用的Python语言的版本以及底层操作系统。要静态添加额外的元数据,您可以在databricks.sdk.useragent
模块中使用with_partner()
和with_product()
函数。with_partner()
可用于合作伙伴表明使用Databricks SDK for Go的代码应归因于特定的合作伙伴。可以一次性注册多个合作伙伴。合作伙伴名称可以包含任何数字、字母、.
、-
、_
或+
。
from databricks.sdk import useragent
useragent.with_product("partner-abc")
useragent.with_partner("partner-xyz")
with_product()
可用于定义使用Databricks Python SDK构建的产品名称和版本。产品名称具有与上述合作伙伴名称相同的限制,而产品版本必须是有效的SemVer。后续对with_product()
的调用将用新的用户指定产品替换原始产品。
from databricks.sdk import useragent
useragent.with_product("databricks-example-product", "1.2.0")
如果同时定义了环境变量DATABRICKS_SDK_UPSTREAM
和DATABRICKS_SDK_UPSTREAM_VERSION
,这些也将包含在User-Agent
头中。
如果需要指定上述接口不支持的其他元数据,您可以使用with_user_agent_extra()
函数将任意键值对注册到用户代理中。允许关联与同一键的多个值。键具有与上述合作伙伴名称相同的限制。值必须是上述描述或SemVer字符串。
可以将额外的User-Agent
信息与不同的DatabricksConfig
实例关联。要向特定的DatabricksConfig
实例添加元数据,请使用with_user_agent_extra()
方法。
错误处理
Python 数据库SDK提供了强大的错误处理机制,允许开发者捕获和处理API错误。当发生错误时,SDK将抛出一个包含错误信息的异常,例如HTTP状态码、错误信息和错误详情。开发者可以在代码中捕获这些异常并进行适当处理。
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import ResourceDoesNotExist
w = WorkspaceClient()
try:
w.clusters.get(cluster_id='1234-5678-9012')
except ResourceDoesNotExist as e:
print(f'Cluster not found: {e}')
SDK处理不同服务之间错误响应的不一致性,为开发者提供一个一致的接口。只需捕获适当的异常类型并按需处理错误。Databricks API返回的错误定义在databricks/sdk/errors/platform.py。
日志记录
Python 数据库SDK无缝集成了标准的Python日志设施。这允许开发者轻松启用并自定义他们的Databricks Python项目的日志记录。要在Databricks Python项目中启用调试日志记录,可以参考以下示例
import logging, sys
logging.basicConfig(stream=sys.stderr,
level=logging.INFO,
format='%(asctime)s [%(name)s][%(levelname)s] %(message)s')
logging.getLogger('databricks.sdk').setLevel(logging.DEBUG)
from databricks.sdk import WorkspaceClient
w = WorkspaceClient(debug_truncate_bytes=1024, debug_headers=False)
for cluster in w.clusters.list():
logging.info(f'Found cluster: {cluster.cluster_name}')
在上面的代码片段中,导入了日志模块,并使用basicConfig()
方法将日志级别设置为DEBUG
。这将启用调试级别的日志记录。开发者可以根据需要调整日志级别以控制日志输出的详细程度。SDK将所有请求和响应记录到标准错误输出,使用>
表示请求,使用<
表示响应。在某些情况下,由于大小限制,请求或响应可能被截断。如果发生这种情况,日志消息将包含文本... (XXX额外元素)
以指示请求或响应已被截断。要增加截断限制,开发者可以设置debug_truncate_bytes
配置属性或DATABRICKS_DEBUG_TRUNCATE_BYTES
环境变量。为了保护敏感数据,如认证令牌、密码或任何HTTP头,SDK将在日志输出中将这些值自动替换为**REDACTED**
。开发者可以通过将debug_headers
配置属性设置为True
来禁用此红字。
2023-03-22 21:19:21,702 [databricks.sdk][DEBUG] GET /api/2.0/clusters/list
< 200 OK
< {
< "clusters": [
< {
< "autotermination_minutes": 60,
< "cluster_id": "1109-115255-s1w13zjj",
< "cluster_name": "DEFAULT Test Cluster",
< ... truncated for brevity
< },
< "... (47 additional elements)"
< ]
< }
总的来说,Python 数据库SDK提供的日志记录功能可以是一个强大的监控和故障排除工具。开发者可以使用SDK提供的各种日志记录方法和配置选项来自定义日志输出以满足他们的特定需求。
与dbutils
交互
您可以通过访问WorkspaceClient
上的dbutils
属性来使用dbutils
的客户端实现。大多数dbutils.fs
操作和dbutils.secrets
都在Databricks SDK中以Python原生方式实现。非SDK实现仍然需要一个Databricks集群,您必须通过cluster_id
配置属性或DATABRICKS_CLUSTER_ID
环境变量指定它。如果集群没有运行,不要担心:Python 数据库SDK内部调用w.clusters.ensure_cluster_is_running()
。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
dbutils = w.dbutils
files_in_root = dbutils.fs.ls('/')
print(f'number of files in root: {len(files_in_root)}')
或者,您可以从databricks.sdk.runtime
模块导入dbutils
,但您必须确保所有配置已经存在于环境变量中
from databricks.sdk.runtime import dbutils
for secret_scope in dbutils.secrets.listScopes():
for secret_metadata in dbutils.secrets.list(secret_scope.name):
print(f'found {secret_metadata.key} secret in {secret_scope.name} scope')
接口稳定性
Databricks正在积极稳定Python 数据库SDK的接口。所有服务的API客户端都是从与主平台同步的规范文件生成的。强烈建议您固定确切的依赖版本并阅读变更日志,其中Databricks记录了更改。Databricks可能会有一些已记录的向后不兼容的更改,例如将一些类型名称重命名为提高一致性。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。