跳转到主要内容

BigQuery的SQLAlchemy方言

项目描述

GA pypi versions

SQLAlchemy 方言

快速入门

为了使用这个库,您首先需要完成以下步骤

  1. 选择或创建一个云平台项目。

  2. [可选] 为您的项目启用计费。

  3. 启用 BigQuery 存储API。

  4. 设置身份验证。

安装

使用 pip 在 virtualenv 中安装此库。 virtualenv 是一个用于创建隔离的 Python 环境的工具。它解决的基本问题是依赖项和版本,以及间接的权限。

使用 virtualenv,您可以在不需要系统安装权限的情况下安装此库,并且不会与已安装的系统依赖项冲突。

支持的 Python 版本

Python >= 3.8

不支持的 Python 版本

Python <= 3.7.

Mac/Linux

pip install virtualenv
virtualenv <your-env>
source <your-env>/bin/activate
<your-env>/bin/pip install sqlalchemy-bigquery

Windows

pip install virtualenv
virtualenv <your-env>
<your-env>\Scripts\activate
<your-env>\Scripts\pip.exe install sqlalchemy-bigquery

处理大型数据集时的安装

当处理大型数据集时,您还可以通过安装 bqstorage 依赖项来提高速度。请参阅上述关于创建虚拟环境说明,然后使用 bqstorage 扩展安装 sqlalchemy-bigquery

source <your-env>/bin/activate
<your-env>/bin/pip install sqlalchemy-bigquery[bqstorage]

用法

SQLAlchemy

from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *
engine = create_engine('bigquery://project')
table = Table('dataset.table', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=table().scalar()))

项目

project in bigquery://project 用于使用特定的项目 ID 实例化 BigQuery 客户端。要从环境中推断项目,请使用 bigquery:// – 不包含 project

身份验证

遵循 Google Cloud 库指南 进行身份验证。

或者,您可以选择以下两种方法中的任意一种

  • create_engine() 中提供服务帐户 JSON 文件的路径,使用 credentials_path 参数

# provide the path to a service account JSON file
engine = create_engine('bigquery://', credentials_path='/path/to/keyfile.json')
  • create_engine() 中作为 Python 字典传递凭据,使用 credentials_info 参数

# provide credentials as a Python dictionary
credentials_info = {
    "type": "service_account",
    "project_id": "your-service-account-project-id"
},
engine = create_engine('bigquery://', credentials_info=credentials_info)

位置

要指定数据集的位置,请将 location 传递给 create_engine()

engine = create_engine('bigquery://project', location="asia-northeast1")

表名

要从非默认项目或数据集中查询表,请使用以下格式的 SQLAlchemy 模式名称: [project.]dataset,例如

# If neither dataset nor project are the default
sample_table_1 = Table('natality', schema='bigquery-public-data.samples')
# If just dataset is not the default
sample_table_2 = Table('natality', schema='bigquery-public-data')

批量大小

默认情况下,arraysize 设置为 5000arraysize 用于设置获取结果时的批量大小。要更改它,请将 arraysize 传递给 create_engine()

engine = create_engine('bigquery://project', arraysize=1000)

数据集.list_tables 的页面大小

默认情况下,list_tables_page_size 设置为 1000list_tables_page_size 用于设置 dataset.list_tables 操作的最大结果数。要更改它,请将 list_tables_page_size 传递给 create_engine()

engine = create_engine('bigquery://project', list_tables_page_size=100)

添加默认数据集

如果您想让 Client 使用默认数据集,请将其指定为连接字符串的“数据库”部分。

engine = create_engine('bigquery://project/dataset')

当使用默认数据集时,不要在表名中包含数据集名称,例如

table = Table('table_name')

请注意,指定默认数据集不会限制使用原始查询时查询的执行范围到该特定数据集,例如

# Set default dataset to dataset_a
engine = create_engine('bigquery://project/dataset_a')

# This will still execute and return rows from dataset_b
engine.execute('SELECT * FROM dataset_b.table').fetchall()

连接字符串参数

存在许多情况,您不能直接调用 create_engine,例如在使用类似 Flask SQLAlchemy 的工具时。对于这种情况,或者当您希望 Client 有一个 default_query_job_config 时,您可以在连接字符串的查询中传递许多参数。

此库使用 credentials_pathcredentials_infocredentials_base64locationarraysizelist_tables_page_size 参数,其余参数用于创建 QueryJobConfig

请注意,如果您想使用查询字符串,使用三个斜杠会更加可靠,因此 'bigquery:///?a=b' 将可靠地工作,但 'bigquery://?a=b' 可能会被解释为具有 ?a=b 的“数据库”,这取决于解析连接字符串的系统。

以下是所有受支持参数的示例。任何不存在的参数要么是针对旧版 sql(该库不支持),要么过于复杂而未实现。

engine = create_engine(
    'bigquery://some-project/some-dataset' '?'
    'credentials_path=/some/path/to.json' '&'
    'location=some-location' '&'
    'arraysize=1000' '&'
    'list_tables_page_size=100' '&'
    'clustering_fields=a,b,c' '&'
    'create_disposition=CREATE_IF_NEEDED' '&'
    'destination=different-project.different-dataset.table' '&'
    'destination_encryption_configuration=some-configuration' '&'
    'dry_run=true' '&'
    'labels=a:b,c:d' '&'
    'maximum_bytes_billed=1000' '&'
    'priority=INTERACTIVE' '&'
    'schema_update_options=ALLOW_FIELD_ADDITION,ALLOW_FIELD_RELAXATION' '&'
    'use_query_cache=true' '&'
    'write_disposition=WRITE_APPEND'
)

如果您希望在连接 URI 中包含完整的凭据,可以将凭据 JSON 文件进行 base64 编码,并将编码后的字符串提供给 credentials_base64 参数。

engine = create_engine(
    'bigquery://some-project/some-dataset' '?'
    'credentials_base64=eyJrZXkiOiJ2YWx1ZSJ9Cg==' '&'
    'location=some-location' '&'
    'arraysize=1000' '&'
    'list_tables_page_size=100' '&'
    'clustering_fields=a,b,c' '&'
    'create_disposition=CREATE_IF_NEEDED' '&'
    'destination=different-project.different-dataset.table' '&'
    'destination_encryption_configuration=some-configuration' '&'
    'dry_run=true' '&'
    'labels=a:b,c:d' '&'
    'maximum_bytes_billed=1000' '&'
    'priority=INTERACTIVE' '&'
    'schema_update_options=ALLOW_FIELD_ADDITION,ALLOW_FIELD_RELAXATION' '&'
    'use_query_cache=true' '&'
    'write_disposition=WRITE_APPEND'
)

要创建 base64 编码的字符串,您可以使用命令行工具 base64openssl base64python -m base64

或者,您可以使用在线生成器(如 www.base64encode.org <https://www.base64encode.org>)粘贴您的凭据 JSON 文件进行编码。

提供您自己的 BigQuery 客户端

上述连接字符串参数允许您影响用于执行查询的 BigQuery 客户端实例的创建方式。如果您需要额外的控制,您可以提供您自己的 BigQuery 客户端。

from google.cloud import bigquery

custom_bq_client = bigquery.Client(...)

engine = create_engine(
    'bigquery://some-project/some-dataset?user_supplied_client=True',
        connect_args={'client': custom_bq_client},
)

创建表

为表添加元数据

table = Table('mytable', ...,
    bigquery_description='my table description',
    bigquery_friendly_name='my table friendly name',
    bigquery_default_rounding_mode="ROUND_HALF_EVEN",
    bigquery_expiration_timestamp=datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"),
)

为列添加元数据

Column('mycolumn', doc='my column description')

创建一个分组的表

table = Table('mytable', ..., bigquery_clustering_fields=["a", "b", "c"])

创建一个以时间单位分区的表

from google.cloud import bigquery

table = Table('mytable', ...,
    bigquery_time_partitioning=bigquery.TimePartitioning(
        field="mytimestamp",
        type_="MONTH",
        expiration_ms=1000 * 60 * 60 * 24 * 30 * 6, # 6 months
    ),
    bigquery_require_partition_filter=True,
)

创建一个以数据加载时间分区的表

from google.cloud import bigquery

table = Table('mytable', ...,
    bigquery_time_partitioning=bigquery.TimePartitioning(),
    bigquery_require_partition_filter=True,
)

创建一个整数范围分区的表

from google.cloud import bigquery

table = Table('mytable', ...,
    bigquery_range_partitioning=bigquery.RangePartitioning(
        field="zipcode",
        range_=bigquery.PartitionRange(start=0, end=100000, interval=10),
    ),
    bigquery_require_partition_filter=True,
)

线程和进程

由于此客户端使用 grpc 库,因此可以在线程之间安全地共享实例。

在进程池的情况下,最佳实践是在 multiprocessing.pool.Poolmultiprocessing.Processos.fork 调用之后创建客户端实例。

项目详细信息


下载文件

下载适用于您平台的文件。如果您不确定选择哪一个,请了解更多关于安装包的信息。

源代码分发

sqlalchemy_bigquery-1.12.0.tar.gz (114.0 kB 查看散列值)

上传时间 源代码

构建分发

sqlalchemy_bigquery-1.12.0-py2.py3-none-any.whl (38.3 kB 查看散列值)

上传时间 Python 2 Python 3

支持者