SQLAlchemy方言集成到Cloud Spanner数据库
项目描述
Spanner方言为SQLAlchemy提供了一个API接口,旨在使用SQLAlchemy API控制Cloud Spanner数据库。该方言建立在Spanner DB API之上,该API遵循PEP-249。
已知的限制列在此处。所有支持的功能都已经过测试和验证,可以在测试配置中正常工作。可能存在尚未被测试的配置和数据模型变化,并表现出意外行为。请通过创建新问题报告您可能遇到的问题。
快速入门
为了使用此软件包,您首先需要完成以下步骤
安装
软件包的稳定发布版本可在PyPi上找到
pip install sqlalchemy-spanner
要安装软件包的开发版本,请克隆其Git存储库
git clone https://github.com/googleapis/python-spanner-sqlalchemy.git
然后从包的setup.py文件安装软件包
python setup.py install
在设置过程中,方言将注册到入口点。
最小应用程序
数据库URL
为了连接数据库,必须在连接创建步骤使用其URL。SQLAlchemy 1.3和1.4版本在方言前缀部分有细微差别。
# for SQLAlchemy 1.3:
spanner:///projects/project-id/instances/instance-id/databases/database-id
# for SQLAlchemy 1.4:
spanner+spanner:///projects/project-id/instances/instance-id/databases/database-id
要直接传递自定义客户端对象以供使用,请按以下方式创建引擎:
engine = create_engine(
"spanner+spanner:///projects/project-id/instances/instance-id/databases/database-id",
connect_args={'client': spanner.Client(project="project-id")}
)
创建一个表
from sqlalchemy import (
Column,
Integer,
MetaData,
String,
Table,
create_engine,
)
engine = create_engine(
"spanner:///projects/project-id/instances/instance-id/databases/database-id"
)
metadata = MetaData(bind=engine)
user = Table(
"users",
metadata,
Column("user_id", Integer, primary_key=True),
Column("user_name", String(16), nullable=False),
)
metadata.create_all(engine)
插入一行
import uuid
from sqlalchemy import (
MetaData,
Table,
create_engine,
)
engine = create_engine(
"spanner:///projects/project-id/instances/instance-id/databases/database-id"
)
user = Table("users", MetaData(bind=engine), autoload=True)
user_id = uuid.uuid4().hex[:6].lower()
with engine.begin() as connection:
connection.execute(user.insert(), {"user_id": user_id, "user_name": "Full Name"})
读取
from sqlalchemy import MetaData, Table, create_engine, select
engine = create_engine(
"spanner:///projects/project-id/instances/instance-id/databases/database-id"
)
table = Table("users", MetaData(bind=engine), autoload=True)
with engine.begin() as connection:
for row in connection.execute(select(["*"], from_obj=table)).fetchall():
print(row)
迁移
SQLAlchemy使用Alembic工具来组织数据库迁移。
Spanner方言不提供默认的迁移环境,需要用户自己编写。需要注意的是,应显式设置alembic_version表,不要将迁移修订ID用作主键。
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
version_table_pk=False, # don't use primary key in the versions table
)
由于Spanner限制修改主键值,不将version_table_pk标志设置为False可能会导致迁移问题。如果alembic_versions表已经创建了主键,将标志设置为False将不会起作用,因为标志只应用于表创建。
请注意,Spanner中的DDL语句不是事务性的。在迁移失败的情况下,它们不会被自动回滚。此外,Spanner鼓励使用autocommit_block()来防止DDL因架构修改而终止迁移事务。
功能和限制
交错表
team = Table(
"team",
metadata,
Column("team_id", Integer, primary_key=True),
Column("team_name", String(16), nullable=False),
)
team.create(engine)
client = Table(
"client",
metadata,
Column("team_id", Integer, primary_key=True),
Column("client_id", Integer, primary_key=True),
Column("client_name", String(16), nullable=False),
spanner_interleave_in="team",
spanner_interleave_on_delete_cascade=True,
)
client.add_is_dependent_on(team)
client.create(engine)
注意:交错表之间存在依赖关系,因此必须先创建父表。在创建具有此功能的表时,请确保在子表上调用add_is_dependent_on(),以请求SQLAlchemy在创建子表之前创建父表。
唯一约束
Cloud Spanner不支持直接创建唯一约束。为了实现列值的唯一性,应使用唯一索引。
而不是直接创建唯一约束
Table(
'table',
metadata,
Column('col1', Integer),
UniqueConstraint('col1', name='uix_1')
)
创建一个唯一索引
Table(
'table',
metadata,
Column('col1', Integer),
Index("uix_1", "col1", unique=True),
)
自动提交模式
Spanner方言支持SERIALIZABLE和AUTOCOMMIT隔离级别。SERIALIZABLE是默认级别,其中事务需要手动提交。AUTOCOMMIT模式对应于在查询执行时自动提交查询。
隔离级别更改示例
from sqlalchemy import create_engine
eng = create_engine("spanner:///projects/project-id/instances/instance-id/databases/database-id")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
自动事务重试
在默认的SERIALIZABLE模式下,事务可能会因Aborted异常而失败。这是一种暂时性错误,通常发生在防止并发修改导致数据损坏的情况下。尽管原始事务变得不可操作,但简单的查询重试可以解决问题。
然而,这可能需要手动重复执行一系列操作,这些操作是在失败的交易中执行的。为了简化,Spanner连接API跟踪当前交易中执行的所有操作及其结果校验和。如果交易因Aborted异常而失败,连接API将自动启动一个新交易,并重新运行所有跟踪的操作,检查其结果是否与原始交易中的结果相同。如果数据已更改且结果不同,则交易将被丢弃,因为没有方法可以自动重试。
在AUTOCOMMIT模式下,自动事务重试机制被禁用,因为每个操作都是即时提交的,并且不会发生Aborted异常。
自增ID
由于性能原因,Cloud Spanner不支持自增ID机制(请参阅更多详情)。我们建议您使用Python的uuid模块来生成主键字段,以避免创建单调递增的键。
尽管不鼓励这样做,但如果您确实需要此功能,可以手动模拟如下
with engine.begin() as connection:
top_id = connection.execute(
select([user.c.user_id]).order_by(user.c.user_id.desc()).limit(1)
).fetchone()
next_id = top_id[0] + 1 if top_id else 1
connection.execute(user.insert(), {"user_id": next_id})
查询提示
Spanner方言支持查询提示,这提供了设置额外查询执行参数的能力。用法示例
session = Session(engine)
Base = declarative_base()
class User(Base):
"""Data model."""
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
query = session.query(User)
query = query.with_hint(
selectable=User, text="@{FORCE_INDEX=index_name}"
)
query = query.filter(User.name.in_(["val1", "val2"]))
query.statement.compile(session.bind)
只读事务
默认情况下,由Spanner连接产生的交易处于读写模式。然而,某些应用程序需要授予用户/方法只读访问权限的能力;在这些情况下,Spanner方言支持read_only执行选项,该选项将连接切换到只读模式
with engine.connect().execution_options(read_only=True) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
请注意,执行选项是按需应用的 - 在execute()方法调用之前。
在事务进行时无法更改连接的只读/读写模式 - 您必须首先提交或回滚它。
陈旧读取
要使用Spanner的陈旧读取与SQLAlchemy一起,您可以调整连接执行选项,以获得期望的陈旧值。例如
# maximum staleness
with engine.connect().execution_options(
read_only=True,
staleness={"max_staleness": datetime.timedelta(seconds=5)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
# exact staleness
with engine.connect().execution_options(
read_only=True,
staleness={"exact_staleness": datetime.timedelta(seconds=5)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
# min read timestamp
with engine.connect().execution_options(
read_only=True,
staleness={"min_read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
# read timestamp
with engine.connect().execution_options(
read_only=True,
staleness={"read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)}
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
请注意,在将连接返回到连接池时,将丢弃设置的选项。
请求优先级
为了在Cloud Spanner中使用请求优先级功能,SQLAlchemy提供了一个execution_options参数
from google.cloud.spanner_v1 import RequestOptions
with engine.connect().execution_options(
request_priority=RequestOptions.Priority.PRIORITY_MEDIUM
) as connection:
connection.execute(select(["*"], from_obj=table)).fetchall()
DDL和事务
DDL语句是在常规事务机制之外执行的,这意味着DDL语句在正常事务回滚时不会回滚。
删除表
默认情况下,Cloud Spanner不删除具有二级索引和/或外键约束的表。然而,在Spanner方言为SQLAlchemy中,这个限制被省略了 - 如果您尝试删除的表有索引/外键,它们将在删除表之前自动删除。
数据类型
数据类型表映射SQLAlchemy类型到Cloud Spanner类型
SQLAlchemy |
Spanner |
---|---|
INTEGER |
INT64 |
BIGINT |
INT64 |
DECIMAL |
NUMERIC |
FLOAT |
FLOAT64 |
TEXT |
STRING |
ARRAY |
ARRAY |
BINARY |
BYTES |
VARCHAR |
STRING |
CHAR |
STRING |
BOOLEAN |
BOOL |
DATETIME |
TIMESTAMP |
NUMERIC |
NUMERIC |
其他限制
不支持WITH RECURSIVE语句。
不支持命名架构。
不支持临时表。
数值类型维度(缩放和精度)是常量。请参阅文档。
最佳实践
当调用 SQLAlchemy 函数时,会建立一个数据库的新连接并获取 Spanner 会话对象。在无连接执行的情况下,这些获取操作在每次 execute() 调用时都会执行,这可能导致显著的延迟。为了避免在每次 execute() 调用时启动 Spanner 会话,建议以连接绑定方式编写代码。一旦显式启动 Connection() 对象,它将获取 Spanner 会话对象并用于此 Connection() 对象上的所有后续调用。
非最优的无连接使用
# execute() is called on object, which is not a Connection() object
insert(user).values(user_id=1, user_name="Full Name").execute()
最优的连接绑定使用
with engine.begin() as connection:
# execute() is called on a Connection() object
connection.execute(user.insert(), {"user_id": 1, "user_name": "Full Name"})
自 SQLAlchemy 2.0 开始,无连接的使用方式已被弃用,并将很快被移除(请参阅 SQLAlchemy 文档)。
运行测试
Spanner 方言包含一个合规性、迁移和单元测试套件。要运行测试,可以使用 nox 包命令。
# Run the whole suite $ nox # Run a particular test session $ nox -s migration_test
在 Spanner 模拟器上运行测试
当在模拟器上执行时,方言测试套件可以在 Spanner 模拟器 上运行。一些与数据类型的 NULL 值相关的测试会被跳过。
贡献
欢迎并向本库贡献。请报告问题、提交功能请求和发送拉取请求。有关如何开始的信息,请参阅 CONTRIBUTING。
请注意,该项目不是作为 Cloud Spanner 产品的一部分由 Google 正式支持的。
请注意,该项目以贡献者行为准则发布。通过参与本项目,您同意遵守其条款。有关更多信息,请参阅 行为准则。
项目详情
下载文件
为您的平台下载文件。如果您不确定要选择哪一个,请了解更多关于 安装包 的信息。