跳转到主要内容

SQLAlchemy方言集成到Cloud Spanner数据库

项目描述

Spanner方言为SQLAlchemy提供了一个API接口,旨在使用SQLAlchemy API控制Cloud Spanner数据库。该方言建立在Spanner DB API之上,该API遵循PEP-249

已知的限制列在此处。所有支持的功能都已经过测试和验证,可以在测试配置中正常工作。可能存在尚未被测试的配置和数据模型变化,并表现出意外行为。请通过创建新问题报告您可能遇到的问题。

快速入门

为了使用此软件包,您首先需要完成以下步骤

  1. 选择或创建一个云平台项目。

  2. 为您的项目启用计费。

  3. 启用Google Cloud Spanner API。

  4. 设置身份验证。

安装

软件包的稳定发布版本可在PyPi上找到

pip install sqlalchemy-spanner

要安装软件包的开发版本,请克隆其Git存储库

git clone https://github.com/googleapis/python-spanner-sqlalchemy.git

然后从包的setup.py文件安装软件包

python setup.py install

在设置过程中,方言将注册到入口点。

最小应用程序

数据库URL

为了连接数据库,必须在连接创建步骤使用其URL。SQLAlchemy 1.3和1.4版本在方言前缀部分有细微差别。

# for SQLAlchemy 1.3:
spanner:///projects/project-id/instances/instance-id/databases/database-id

# for SQLAlchemy 1.4:
spanner+spanner:///projects/project-id/instances/instance-id/databases/database-id

要直接传递自定义客户端对象以供使用,请按以下方式创建引擎:

engine = create_engine(
    "spanner+spanner:///projects/project-id/instances/instance-id/databases/database-id",
    connect_args={'client': spanner.Client(project="project-id")}
)

创建一个表

from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
)

engine = create_engine(
    "spanner:///projects/project-id/instances/instance-id/databases/database-id"
)
metadata = MetaData(bind=engine)

user = Table(
    "users",
    metadata,
    Column("user_id", Integer, primary_key=True),
    Column("user_name", String(16), nullable=False),
)

metadata.create_all(engine)

插入一行

import uuid

from sqlalchemy import (
    MetaData,
    Table,
    create_engine,
)

engine = create_engine(
    "spanner:///projects/project-id/instances/instance-id/databases/database-id"
)
user = Table("users", MetaData(bind=engine), autoload=True)
user_id = uuid.uuid4().hex[:6].lower()

with engine.begin() as connection:
    connection.execute(user.insert(), {"user_id": user_id, "user_name": "Full Name"})

读取

from sqlalchemy import MetaData, Table, create_engine, select

engine = create_engine(
    "spanner:///projects/project-id/instances/instance-id/databases/database-id"
)
table = Table("users", MetaData(bind=engine), autoload=True)

with engine.begin() as connection:
    for row in connection.execute(select(["*"], from_obj=table)).fetchall():
        print(row)

迁移

SQLAlchemy使用Alembic工具来组织数据库迁移。

Spanner方言不提供默认的迁移环境,需要用户自己编写。需要注意的是,应显式设置alembic_version表,不要将迁移修订ID用作主键。

with connectable.connect() as connection:
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        version_table_pk=False,  # don't use primary key in the versions table
    )

由于Spanner限制修改主键值,不将version_table_pk标志设置为False可能会导致迁移问题。如果alembic_versions表已经创建了主键,将标志设置为False将不会起作用,因为标志只应用于表创建。

请注意,Spanner中的DDL语句不是事务性的。在迁移失败的情况下,它们不会被自动回滚。此外,Spanner鼓励使用autocommit_block()来防止DDL因架构修改而终止迁移事务。

警告!
迁移脚本可以生成大量的DDL语句。如果每个语句都单独执行,可能会出现性能问题。为了避免这种情况,强烈建议使用Alembic批处理上下文功能,将DDL语句打包成语句组。

功能和限制

交错表

Cloud Spanner方言为Table构造函数包含两个特定于方言的参数,这些参数有助于定义交错关系:spanner_interleave_in - 父表名spanner_inverleave_on_delete_cascade - 一个标志,指定是否必须使用ON DELETE CASCADE语句来指定交错关系
交错关系定义的示例
team = Table(
    "team",
    metadata,
    Column("team_id", Integer, primary_key=True),
    Column("team_name", String(16), nullable=False),
)
team.create(engine)

client = Table(
    "client",
    metadata,
    Column("team_id", Integer, primary_key=True),
    Column("client_id", Integer, primary_key=True),
    Column("client_name", String(16), nullable=False),
    spanner_interleave_in="team",
    spanner_interleave_on_delete_cascade=True,
)
client.add_is_dependent_on(team)

client.create(engine)

注意:交错表之间存在依赖关系,因此必须先创建父表。在创建具有此功能的表时,请确保在子表上调用add_is_dependent_on(),以请求SQLAlchemy在创建子表之前创建父表。

唯一约束

Cloud Spanner不支持直接创建唯一约束。为了实现列值的唯一性,应使用唯一索引。

而不是直接创建唯一约束

Table(
    'table',
    metadata,
    Column('col1', Integer),
    UniqueConstraint('col1', name='uix_1')
)

创建一个唯一索引

Table(
    'table',
    metadata,
    Column('col1', Integer),
    Index("uix_1", "col1", unique=True),
)

自动提交模式

Spanner方言支持SERIALIZABLEAUTOCOMMIT隔离级别。SERIALIZABLE是默认级别,其中事务需要手动提交。AUTOCOMMIT模式对应于在查询执行时自动提交查询。

隔离级别更改示例

from sqlalchemy import create_engine

eng = create_engine("spanner:///projects/project-id/instances/instance-id/databases/database-id")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

自动事务重试

在默认的SERIALIZABLE模式下,事务可能会因Aborted异常而失败。这是一种暂时性错误,通常发生在防止并发修改导致数据损坏的情况下。尽管原始事务变得不可操作,但简单的查询重试可以解决问题。

然而,这可能需要手动重复执行一系列操作,这些操作是在失败的交易中执行的。为了简化,Spanner连接API跟踪当前交易中执行的所有操作及其结果校验和。如果交易因Aborted异常而失败,连接API将自动启动一个新交易,并重新运行所有跟踪的操作,检查其结果是否与原始交易中的结果相同。如果数据已更改且结果不同,则交易将被丢弃,因为没有方法可以自动重试。

AUTOCOMMIT模式下,自动事务重试机制被禁用,因为每个操作都是即时提交的,并且不会发生Aborted异常。

自增ID

由于性能原因,Cloud Spanner不支持自增ID机制(请参阅更多详情)。我们建议您使用Python的uuid模块来生成主键字段,以避免创建单调递增的键。

尽管不鼓励这样做,但如果您确实需要此功能,可以手动模拟如下

with engine.begin() as connection:
    top_id = connection.execute(
        select([user.c.user_id]).order_by(user.c.user_id.desc()).limit(1)
    ).fetchone()
    next_id = top_id[0] + 1 if top_id else 1

    connection.execute(user.insert(), {"user_id": next_id})

查询提示

Spanner方言支持查询提示,这提供了设置额外查询执行参数的能力。用法示例

session = Session(engine)

Base = declarative_base()

class User(Base):
    """Data model."""

    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


query = session.query(User)
query = query.with_hint(
    selectable=User, text="@{FORCE_INDEX=index_name}"
)
query = query.filter(User.name.in_(["val1", "val2"]))
query.statement.compile(session.bind)

只读事务

默认情况下,由Spanner连接产生的交易处于读写模式。然而,某些应用程序需要授予用户/方法只读访问权限的能力;在这些情况下,Spanner方言支持read_only执行选项,该选项将连接切换到只读模式

with engine.connect().execution_options(read_only=True) as connection:
    connection.execute(select(["*"], from_obj=table)).fetchall()

请注意,执行选项是按需应用的 - 在execute()方法调用之前。

在事务进行时无法更改连接的只读/读写模式 - 您必须首先提交或回滚它。

陈旧读取

要使用Spanner的陈旧读取与SQLAlchemy一起,您可以调整连接执行选项,以获得期望的陈旧值。例如

# maximum staleness
with engine.connect().execution_options(
    read_only=True,
    staleness={"max_staleness": datetime.timedelta(seconds=5)}
) as connection:
    connection.execute(select(["*"], from_obj=table)).fetchall()
# exact staleness
with engine.connect().execution_options(
    read_only=True,
    staleness={"exact_staleness": datetime.timedelta(seconds=5)}
) as connection:
    connection.execute(select(["*"], from_obj=table)).fetchall()
# min read timestamp
with engine.connect().execution_options(
    read_only=True,
    staleness={"min_read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
) as connection:
    connection.execute(select(["*"], from_obj=table)).fetchall()
# read timestamp
with engine.connect().execution_options(
    read_only=True,
    staleness={"read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
) as connection:
    connection.execute(select(["*"], from_obj=table)).fetchall()

请注意,在将连接返回到连接池时,将丢弃设置的选项。

请求优先级

为了在Cloud Spanner中使用请求优先级功能,SQLAlchemy提供了一个execution_options参数

from google.cloud.spanner_v1 import RequestOptions

with engine.connect().execution_options(
    request_priority=RequestOptions.Priority.PRIORITY_MEDIUM
) as connection:
    connection.execute(select(["*"], from_obj=table)).fetchall()

DDL和事务

DDL语句是在常规事务机制之外执行的,这意味着DDL语句在正常事务回滚时不会回滚。

删除表

默认情况下,Cloud Spanner不删除具有二级索引和/或外键约束的表。然而,在Spanner方言为SQLAlchemy中,这个限制被省略了 - 如果您尝试删除的表有索引/外键,它们将在删除表之前自动删除。

数据类型

数据类型表映射SQLAlchemy类型到Cloud Spanner类型

SQLAlchemy

Spanner

INTEGER

INT64

BIGINT

INT64

DECIMAL

NUMERIC

FLOAT

FLOAT64

TEXT

STRING

ARRAY

ARRAY

BINARY

BYTES

VARCHAR

STRING

CHAR

STRING

BOOLEAN

BOOL

DATETIME

TIMESTAMP

NUMERIC

NUMERIC

其他限制

  • 不支持WITH RECURSIVE语句。

  • 不支持命名架构。

  • 不支持临时表。

  • 数值类型维度(缩放和精度)是常量。请参阅文档

最佳实践

当调用 SQLAlchemy 函数时,会建立一个数据库的新连接并获取 Spanner 会话对象。在无连接执行的情况下,这些获取操作在每次 execute() 调用时都会执行,这可能导致显著的延迟。为了避免在每次 execute() 调用时启动 Spanner 会话,建议以连接绑定方式编写代码。一旦显式启动 Connection() 对象,它将获取 Spanner 会话对象并用于此 Connection() 对象上的所有后续调用。

非最优的无连接使用

# execute() is called on object, which is not a Connection() object
insert(user).values(user_id=1, user_name="Full Name").execute()

最优的连接绑定使用

with engine.begin() as connection:
    # execute() is called on a Connection() object
    connection.execute(user.insert(), {"user_id": 1, "user_name": "Full Name"})

自 SQLAlchemy 2.0 开始,无连接的使用方式已被弃用,并将很快被移除(请参阅 SQLAlchemy 文档)。

运行测试

Spanner 方言包含一个合规性、迁移和单元测试套件。要运行测试,可以使用 nox 包命令。

# Run the whole suite
$ nox

# Run a particular test session
$ nox -s migration_test

在 Spanner 模拟器上运行测试

当在模拟器上执行时,方言测试套件可以在 Spanner 模拟器 上运行。一些与数据类型的 NULL 值相关的测试会被跳过。

贡献

欢迎并向本库贡献。请报告问题、提交功能请求和发送拉取请求。有关如何开始的信息,请参阅 CONTRIBUTING

请注意,该项目不是作为 Cloud Spanner 产品的一部分由 Google 正式支持的。

请注意,该项目以贡献者行为准则发布。通过参与本项目,您同意遵守其条款。有关更多信息,请参阅 行为准则

项目详情


下载文件

为您的平台下载文件。如果您不确定要选择哪一个,请了解更多关于 安装包 的信息。

源分布

sqlalchemy_spanner-1.7.0.tar.gz (77.8 kB 查看哈希值)

上传时间

构建分布

sqlalchemy_spanner-1.7.0-py3-none-any.whl (27.7 kB 查看哈希值)

上传时间 Python 3

由以下机构支持