跳转到主要内容

一个用于安全连接到您的Google Cloud AlloyDB实例的Python客户端库。

项目描述

alloydb-python-connector image

AlloyDB Python Connector

CI pypi python

AlloyDB Python Connector 是一个用于与Python语言一起使用的AlloyDB连接器库。

使用AlloyDB连接器提供以下好处

  • IAM授权:使用IAM权限控制谁/什么可以连接到您的AlloyDB实例

  • 增强安全性:使用强大、更新的TLS 1.3加密和客户端连接器与服务器端代理之间的身份验证,独立于数据库协议。

  • 方便性:无需使用和分发SSL证书,也无需管理防火墙或源/目标IP地址。

  • (可选) IAM数据库身份验证:支持AlloyDB的自动IAM数据库身份验证功能。

AlloyDB Python Connector是一个与数据库驱动程序一起使用的软件包。目前支持的驱动程序有

安装

您可以使用pip install安装此库

pg8000

pip install "google-cloud-alloydb-connector[pg8000]"

请参阅同步驱动程序使用以获取详细信息。

asyncpg

pip install "google-cloud-alloydb-connector[asyncpg]"

请参阅异步驱动程序使用以获取详细信息。

APIs 和服务

此包需要以下内容才能成功连接:

  • 具有AlloyDB 客户端角色或等效角色的 IAM 实体(用户、服务帐户等)。使用 IAM 实体的凭证来授权连接到 AlloyDB 实例。

  • 在您的 Google Cloud 项目中启用AlloyDB API。默认情况下,API 将在关联 IAM 实体的项目中调用。

凭证

此库使用应用程序默认凭证(ADC)策略来解决凭证。请参阅这些说明(Google Cloud 应用程序与本地开发、IAM 用户与服务帐户凭证)或查阅google.auth包。

用法

此包提供了一些用于授权和加密连接的函数。这些函数与您的数据库驱动程序一起使用以连接到您的 AlloyDB 实例。

AlloyDB 支持通过公共 IP 地址和私有、内部 IP 地址以及私有服务连接(PSC)进行网络连接。默认情况下,此包将尝试通过私有 IP 连接进行连接。这样做时,此包必须在连接到托管您的 AlloyDB 私有 IP 地址的VPC 网络的环境中运行。

有关更多详细信息,请参阅配置 AlloyDB 连接

同步驱动程序使用

要使用连接器连接到 AlloyDB,请初始化一个 Connector 对象,并使用正确的输入参数调用其 connect 方法。

Connector 本身通过调用其 connect 方法创建数据库连接对象,但不管理数据库连接池。因此,建议您与可以创建连接池的库(例如SQLAlchemy)一起使用连接器。这将允许连接保持打开并重用,从而减少连接开销和所需的连接数量。

在以下 Connectorconnect 方法中,将您的 AlloyDB 实例 URI 作为第一个位置参数,将数据库驱动程序的名称作为第二个位置参数。插入其余的连接关键字参数,如 userpassworddb 等。

要使用 SQLAlchemy 与此连接器一起使用,请使用 creator 参数为 sqlalchemy.create_engine

from google.cloud.alloydb.connector import Connector
import sqlalchemy

# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
    conn = connector.connect(
        "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
        "pg8000",
        user="my-user",
        password="my-password",
        db="my-db-name"
    )
    return conn

# create connection pool
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

然后,可以使用返回的连接池引擎进行查询和修改数据库。

# insert statement
insert_stmt = sqlalchemy.text(
    "INSERT INTO my_table (id, title) VALUES (:id, :title)",
)

with pool.connect() as db_conn:
    # insert into database
    db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})

    # query database
    result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()

    # commit transaction (SQLAlchemy v2.X.X is commit as you go)
    db_conn.commit()

    # Do something with the results
    for row in result:
        print(row)

要关闭 Connector 对象的后台资源,请按如下方式调用其 close() 方法

connector.close()

同步上下文管理器

Connector 对象还可以用作上下文管理器,以自动关闭和清理资源,无需显式调用 connector.close()

连接器作为上下文管理器

from google.cloud.alloydb.connector import Connector
import sqlalchemy

# helper function to return SQLAlchemy connection pool
def init_connection_pool(connector: Connector) -> sqlalchemy.engine.Engine:
    # function used to generate database connection
    def getconn():
        conn = connector.connect(
            "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
            "pg8000",
            user="my-user",
            password="my-password",
            db="my-db-name"
        )
        return conn

    # create connection pool
    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    return pool

# initialize Connector as context manager
with Connector() as connector:
    # initialize connection pool
    pool = init_connection_pool(connector)
    # insert statement
    insert_stmt = sqlalchemy.text(
        "INSERT INTO my_table (id, title) VALUES (:id, :title)",
    )
    
    # interact with AlloyDB database using connection pool
    with pool.connect() as db_conn:
        # insert into database
        db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})
    
        # commit transaction (SQLAlchemy v2.X.X is commit as you go)
        db_conn.commit()
    
        # query database
        result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()
    
        # Do something with the results
        for row in result:
            print(row)

异步驱动程序使用

AlloyDB 连接器与 asyncio 兼容,以提高数据库连接的速度和效率,通过并发性进行改进。《AsyncConnector》目前支持以下 asyncio 数据库驱动程序:

import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
    # initialize Connector object for connections to AlloyDB
    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect(
            "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
            "asyncpg",
            user="my-user",
            password="my-password",
            db="my-db-name"
            # ... additional database driver args
        )
        return conn

    # The AlloyDB Python Connector can be used along with SQLAlchemy using the
    # 'async_creator' argument to 'create_async_engine'
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
    )
    return pool

async def main():
    connector = AsyncConnector()

    # initialize connection pool
    pool = await init_connection_pool(connector)

    # example query
    async with pool.connect() as conn:
        await conn.execute(sqlalchemy.text("SELECT NOW()"))

    # dispose of connection pool
    await pool.dispose()

    # close Connector
    await connector.close()

有关 asyncpg.Connection 的附加参数的更多详细信息,请访问官方文档

异步上下文管理器

AsyncConnector 还可以用作异步上下文管理器,无需显式调用 connector.close() 来清理资源。

import asyncio
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
    # initialize Connector object for connections to AlloyDB
    async def getconn() -> asyncpg.Connection:
            conn: asyncpg.Connection = await connector.connect(
                "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
                "asyncpg",
                user="my-user",
                password="my-password",
                db="my-db-name"
                # ... additional database driver args
            )
            return conn

    # The AlloyDB Python Connector can be used along with SQLAlchemy using the
    # 'async_creator' argument to 'create_async_engine'
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
    )
    return pool

async def main():
    # initialize Connector object for connections to AlloyDB
    async with AsyncConnector() as connector:
        # initialize connection pool
        pool = await init_connection_pool(connector)

        # example query
        async with pool.connect() as conn:
            await conn.execute(sqlalchemy.text("SELECT NOW()"))

        # dispose of connection pool
        await pool.dispose()

自动 IAM 数据库身份验证

Python 连接器支持自动 IAM 数据库身份验证

请确保配置您的 AlloyDB 实例以允许 IAM 身份验证(启用 IAM 身份验证),并(添加 IAM 数据库用户)

可以使用将 enable_iam_auth 参数设置为 True 的自动 IAM 数据库身份验证来配置 ConnectorAsyncConnector 以连接到 AlloyDB 实例。

当配置用于 IAM 身份验证的 connector.connect 调用时,可以省略 password 字段,而 user 字段应格式如下:

  • 对于 IAM 用户帐户,这是用户的电子邮件地址。
  • 对于服务帐户,这是不带 .gserviceaccount.com 域后缀的服务帐户的电子邮件。

例如,要使用服务帐户 test-sa@test-project.iam.gserviceaccount.com 进行 IAM 身份验证连接

connector.connect(
    "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
    "pg8000",  # asyncpg for AsyncConnector
    user="test-sa@test-project.iam",
    db="my-db-name",
    enable_iam_auth=True,
)

配置懒加载刷新(Cloud Run、Cloud Functions 等)

可以将连接器的 refresh_strategy 参数设置为 "lazy",以配置 Python 连接器按需懒惰检索连接信息。否则,将运行后台刷新周期以定期检索连接信息。此设置在 CPU 可能超出请求上下文的环境(例如 Cloud Run、Cloud Functions 等)中很有用。

要设置刷新策略,初始化 Connector 时设置 refresh_strategy 关键字参数。

connector = Connector(refresh_strategy="lazy")

指定 IP 地址类型

AlloyDB Python 连接器默认会尝试连接到实例的私有 IP。要更改此设置,例如通过公共 IP 地址或私有服务连接 (PSC) 连接到 AlloyDB,请在初始化 Connector() 或调用 connector.connect() 时设置 ip_type 关键字参数。

ip_type 的可能值是 "PRIVATE"(默认值)、"PUBLIC""PSC"

示例

from google.cloud.alloydb.connector import Connector

import sqlalchemy

# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn():
  return connector.connect(
    "projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
    "pg8000",
    user="my-user",
    password="my-password",
    db="my-db-name",
    ip_type="PUBLIC",  # use public IP
  )

# create connection pool
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

# use connection pool...
connector.close()

调试日志

AlloyDB Python 连接器使用标准的 Python 日志模块 以支持调试日志。

将以下代码添加到您的应用程序中,以启用与 AlloyDB Python 连接器的调试日志

import logging

logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logger = logging.getLogger(name="google.cloud.alloydb.connector")
logger.setLevel(logging.DEBUG)

有关配置日志的更多详细信息,请参阅Python 日志文档

支持策略

主要版本生命周期

本项目使用语义版本控制,并采用以下生命周期来支持主要版本

活跃 - 活跃版本将获得所有新功能和安全修复(不会引入破坏性更改)。新主要版本保证至少活跃 1 年。已弃用 - 已弃用的版本将继续接收安全和关键错误修复,但不会接收新功能。已弃用的版本将公开支持 1 年。不受支持 - 已弃用超过 1 年的任何主要版本被视为公开不受支持。

支持的 Python 版本

我们遵循 Google Cloud Libraries for Python 使用的Python 版本支持策略。支持的 Python 版本更改将视为次要更改,并列在发行说明中。

发布频率

本项目旨在达到最低每月发布频率。如果没有添加新功能或修复,则发布带有最新依赖项的新补丁版本。

贡献

我们欢迎外部贡献。有关如何最佳贡献的详细信息,请参阅我们的 贡献指南

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

google_cloud_alloydb_connector-1.4.0.tar.gz (32.9 kB 查看哈希值)

上传时间 源代码

构建分发

google_cloud_alloydb_connector-1.4.0-py2.py3-none-any.whl (42.1 kB 查看哈希值)

上传时间 Python 2 Python 3

支持者: