跳转到主要内容

Cloud SQL Python Connector 是一个库,可用于与数据库驱动程序配合使用,允许具有足够权限的用户连接到 Cloud SQL 数据库,而无需手动允许 IP 或管理 SSL 证书。

项目描述

cloud-sql-python-connector image

Cloud SQL Python Connector

Open In Colab CI pypi PyPI download month python

Cloud SQL Python Connector 是专为与 Python 语言一起使用而设计的 Cloud SQL 连接器。使用 Cloud SQL 连接器提供对 Cloud SQL Auth Proxy 的本地替代方案,同时提供以下好处

  • IAM 授权: 使用 IAM 权限控制谁/什么可以连接到您的 Cloud SQL 实例
  • 增强安全性: 使用强大的、更新的 TLS 1.3 加密和客户端连接器与服务器端代理之间的身份验证,独立于数据库协议。
  • 方便性: 移除了使用和分发 SSL 证书的要求,以及管理防火墙或源/目标 IP 地址。
  • (可选) IAM 数据库身份验证: 提供对 Cloud SQL 的自动 IAM DB AuthN 功能 的支持。

Cloud SQL Python Connector 是一个与数据库驱动程序一起使用的包。目前支持以下驱动程序

安装

您可以使用 pip install 命令安装此库,并根据您的数据库方言指定驱动程序。

MySQL

pip install "cloud-sql-python-connector[pymysql]"

Postgres

对于Postgres方言,支持两种不同的数据库驱动程序

pg8000

pip install "cloud-sql-python-connector[pg8000]"

asyncpg

pip install "cloud-sql-python-connector[asyncpg]"

SQL Server

pip install "cloud-sql-python-connector[pytds]"

API和服务

此包需要以下内容才能成功建立Cloud SQL连接

  • 具有 Cloud SQL客户端 角色的IAM主体(用户、服务帐户等)。此IAM主体将用于 凭据
  • 在您的Google Cloud项目中启用 Cloud SQL管理员API。默认情况下,API将在与IAM主体关联的项目中调用。

凭据

此库使用 应用程序默认凭据(ADC) 策略来解决凭据。请参阅 这些说明 以设置您的ADC(Google Cloud应用程序与本地开发、IAM用户与服务帐户凭据),或咨询 google.auth 包。

要显式设置凭据的特定来源,请参阅下面的 配置连接器

用法

此包提供了一些用于授权和加密连接的函数。这些函数与您的数据库驱动程序一起使用,以连接到您的Cloud SQL实例。

您的Cloud SQL实例的实例连接名始终为 "project:region:instance" 格式。

如何使用此连接器

要使用连接器连接到Cloud SQL,请初始化一个 Connector 对象,并使用适当的输入参数调用其 connect 方法。

Connector 本身通过调用其 connect 方法创建连接对象,但不管理数据库连接池。因此,建议您使用可以创建连接池的库(如 SQLAlchemy)与连接器一起使用。这将允许连接保持打开状态并重用,从而减少连接开销和所需的连接数量。

在以下 Connectorconnect 方法中,将连接字符串作为第一个位置参数输入,并将数据库驱动程序的名称作为第二个位置参数。输入其余的连接关键字参数,如用户、密码和数据库。您还可以设置可选的 timeoutip_type 关键字参数。

要使用SQLAlchemy使用此连接器,请使用 creator 参数为 sqlalchemy.create_engine

from google.cloud.sql.connector import Connector
import sqlalchemy

# initialize Connector object
connector = Connector()

# function to return the database connection
def getconn() -> pymysql.connections.Connection:
    conn: pymysql.connections.Connection = connector.connect(
        "project:region:instance",
        "pymysql",
        user="my-user",
        password="my-password",
        db="my-db-name"
    )
    return conn

# create connection pool
pool = sqlalchemy.create_engine(
    "mysql+pymysql://",
    creator=getconn,
)

然后可以使用返回的连接池引擎查询和修改数据库。

# insert statement
insert_stmt = sqlalchemy.text(
    "INSERT INTO my_table (id, title) VALUES (:id, :title)",
)

with pool.connect() as db_conn:
    # insert into database
    db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})

    # query database
    result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()

    # commit transaction (SQLAlchemy v2.X.X is commit as you go)
    db_conn.commit()

    # Do something with the results
    for row in result:
        print(row)

要关闭 Connector 对象的后台资源,请按以下方式调用其 close() 方法

connector.close()

[!NOTE]

有关使用SQLAlchemy管理连接池与连接器的更多示例,请参阅 Cloud SQL SQLAlchemy示例

配置连接器

如果您需要自定义连接器或为要创建的每个连接指定默认值,可以按以下方式初始化 Connector 对象

from google.cloud.sql.connector import Connector

# Note: all parameters below are optional
connector = Connector(
    ip_type="public",  # can also be "private" or "psc"
    enable_iam_auth=False,
    timeout=30,
    credentials=custom_creds, # google.auth.credentials.Credentials
    refresh_strategy="lazy",  # can be "lazy" or "background"
)

使用连接器作为上下文管理器

Connector 对象也可以用作上下文管理器,以自动关闭和清理资源,从而无需显式调用 connector.close()

连接器作为上下文管理器

from google.cloud.sql.connector import Connector
import pymysql
import sqlalchemy

# helper function to return SQLAlchemy connection pool
def init_connection_pool(connector: Connector) -> sqlalchemy.engine.Engine:
    # function used to generate database connection
    def getconn() -> pymysql.connections.Connection:
        conn = connector.connect(
            "project:region:instance",
            "pymysql",
            user="my-user",
            password="my-password",
            db="my-db-name"
        )
        return conn

    # create connection pool
    pool = sqlalchemy.create_engine(
        "mysql+pymysql://",
        creator=getconn,
    )
    return pool

# initialize Cloud SQL Python Connector as context manager
with Connector() as connector:
    # initialize connection pool
    pool = init_connection_pool(connector)
    # insert statement
    insert_stmt = sqlalchemy.text(
        "INSERT INTO my_table (id, title) VALUES (:id, :title)",
    )

    # interact with Cloud SQL database using connection pool
    with pool.connect() as db_conn:
        # insert into database
        db_conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"})

        # commit transaction (SQLAlchemy v2.X.X is commit as you go)
        db_conn.commit()

        # query database
        result = db_conn.execute(sqlalchemy.text("SELECT * from my_table")).fetchall()

        # Do something with the results
        for row in result:
            print(row)

配置懒加载刷新(Cloud Run、Cloud Functions等)

将连接器的refresh_strategy参数设置为"lazy",可以配置Python连接器以懒加载和按需检索连接信息。否则,将运行后台刷新周期来定期检索连接信息。此设置在CPU可能在请求上下文之外受到限制的环境中很有用,例如Cloud Run、Cloud Functions等。

要设置刷新策略,请在初始化Connector时设置refresh_strategy关键字参数。

connector = Connector(refresh_strategy="lazy")

指定IP地址类型

Cloud SQL Python连接器可以用于使用公共和私有IP地址以及私有服务连接(PSC)连接到Cloud SQL实例。要指定要连接的IP地址类型,请在初始化Connector()或调用connector.connect()时设置ip_type关键字参数。

ip_type的可能值是"public"(默认值)、"private""psc"

示例

conn = connector.connect(
    "project:region:instance",
    "pymysql",
    ip_type="private"  # use private IP
... insert other kwargs ...
)

[!重要]

如果指定私有IP或私有服务连接(PSC),则您的应用程序必须连接到正确的VPC网络,才能连接到您的Cloud SQL实例。对于大多数应用程序,这需要使用VPC连接器

自动IAM数据库身份验证

当使用Postgres或MySQL驱动程序时,支持使用自动IAM数据库身份验证的连接。首先,请确保配置您的Cloud SQL实例以允许IAM身份验证添加IAM数据库用户

现在,您可以使用用户或服务帐户凭据而不是密码进行连接。在连接调用中,将enable_iam_auth关键字参数设置为true,并将user参数设置为格式正确的IAM主体。

Postgres:对于IAM用户帐户,这是用户的电子邮件地址。对于服务帐户,这是服务帐户的电子邮件地址,不带.gserviceaccount.com域名后缀。

MySQL:对于IAM用户帐户,这是不带@或域名用户的电子邮件地址。例如,对于[email protected],将user参数设置为test-user。对于服务帐户,这是不带@project-id.iam.gserviceaccount.com后缀的服务帐户的电子邮件地址。

示例

conn = connector.connect(
     "project:region:instance",
     "pg8000",
     user="[email protected]",
     db="my-db-name",
     enable_iam_auth=True,
 )

SQL Server(MSSQL)

[!重要]

如果您的SQL Server实例设置为强制SSL连接,则需要下载实例的CA证书,并包含cafile={下载证书的路径}validate_host=False。这是已知问题的解决方案。

Active Directory身份验证

目前,SQL Server实例的Active Directory身份验证仅在Windows上受支持。首先,请确保按照这些步骤设置托管AD域并将您的Cloud SQL实例加入该域。有关Cloud SQL Active Directory集成的更多信息,请参阅此处

一旦您遵循了上面链接的步骤,您就可以运行以下代码来返回连接对象

conn = connector.connect(
    "project:region:instance",
    "pytds",
    db="my-db-name",
    active_directory_auth=True,
    server_name="public.[instance].[location].[project].cloudsql.[domain]",
)

或者,如果使用私有IP

conn = connector.connect(
    "project:region:instance",
    "pytds",
    db="my-db-name",
    active_directory_auth=True,
    server_name="private.[instance].[location].[project].cloudsql.[domain]",
    ip_type="private"
)

使用Python连接器与Python Web框架

Python连接器可以与Flask、FastAPI等流行的Python Web框架一起使用,以在Web应用程序中集成Cloud SQL数据库。

[!NOTE]

对于无服务器环境,例如Cloud Functions、Cloud Run等,初始化Connector为懒加载刷新策略可能很有益。即Connector(refresh_strategy="lazy")

请参阅配置懒加载刷新

Flask-SQLAlchemy

Flask-SQLAlchemy 是一个 Flask 扩展,它为您的应用添加了 SQLAlchemy 的支持。该扩展旨在通过提供有用的默认值和额外的辅助工具来简化 SQLAlchemy 与 Flask 的使用,从而更容易完成常见任务。

您可以通过以下方式配置 Flask-SQLAlchemy 以从您的网络应用连接到 Cloud SQL 数据库:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from google.cloud.sql.connector import Connector


# initialize Python Connector object
connector = Connector()

# Python Connector database connection function
def getconn():
    conn = connector.connect(
        "project:region:instance-name", # Cloud SQL Instance Connection Name
        "pg8000",
        user="my-user",
        password="my-password",
        db="my-database",
        ip_type="public"  # "private" for private IP
    )
    return conn


app = Flask(__name__)

# configure Flask-SQLAlchemy to use Python Connector
app.config['SQLALCHEMY_DATABASE_URI'] = "postgresql+pg8000://"
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    "creator": getconn
}

# initialize the app with the extension
db = SQLAlchemy()
db.init_app(app)

有关如何使用 Flask-SQLAlchemy 的更多详细信息,请参阅 Flask-SQLAlchemy 快速入门指南

FastAPI

FastAPI 是一个现代的、快速(高性能)的基于 Python 标准类型提示构建 API 的网络框架。

您可以使用以下方式配置 FastAPI 以通过 SQLAlchemy ORM 从您的网络应用连接到 Cloud SQL 数据库:

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from google.cloud.sql.connector import Connector

# helper function to return SQLAlchemy connection pool
def init_connection_pool(connector: Connector) -> Engine:
    # Python Connector database connection function
    def getconn():
        conn = connector.connect(
            "project:region:instance-name", # Cloud SQL Instance Connection Name
            "pg8000",
            user="my-user",
            password="my-password",
            db="my-database",
            ip_type="public"  # "private" for private IP
        )
        return conn

    SQLALCHEMY_DATABASE_URL = "postgresql+pg8000://"

    engine = create_engine(
        SQLALCHEMY_DATABASE_URL , creator=getconn
    )
    return engine

# initialize Cloud SQL Python Connector
connector = Connector()

# create connection pool engine
engine = init_connection_pool(connector)

# create SQLAlchemy ORM session
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

要了解如何将数据库集成到您的 FastAPI 应用中,请遵循 FastAPI SQL 数据库指南

异步驱动程序使用

Cloud SQL 连接器与 asyncio 兼容,通过并发来提高数据库连接的速度和效率。您可以通过 Connector.connect_async 函数使用所有非异步 I/O 驱动程序,以及以下异步数据库驱动程序:

Cloud SQL 连接器有一个名为 create_async_connector 的辅助函数,该函数推荐用于异步数据库连接。它返回一个使用当前线程的运行事件循环的 Connector 对象。这与默认情况下在后台线程中初始化新事件循环的 Connector() 不同。

create_async_connector 允许使用与 Connector 对象相同的所有输入参数。

一旦 create_async_connector 返回 Connector 对象,您就可以调用其 connect_async 方法,就像调用 connect 方法一样

import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.sql.connector import Connector, create_async_connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
    # initialize Connector object for connections to Cloud SQL
    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect_async(
            "project:region:instance",  # Cloud SQL instance connection name
            "asyncpg",
            user="my-user",
            password="my-password",
            db="my-db-name"
            # ... additional database driver args
        )
        return conn

    # The Cloud SQL Python Connector can be used along with SQLAlchemy using the
    # 'async_creator' argument to 'create_async_engine'
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
    )
    return pool

async def main():
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()

    # initialize connection pool
    pool = await init_connection_pool(connector)

    # example query
    async with pool.connect() as conn:
        await conn.execute(sqlalchemy.text("SELECT NOW()"))

    # close Connector
    await connector.close_async()

    # dispose of connection pool
    await pool.dispose()

有关 asyncpg.Connection 的附加数据库参数的更多详细信息,请访问 官方文档

异步上下文管理器

使用 create_async_connector 函数的替代方案是将 Connector 初始化为异步上下文管理器,从而无需显式调用 connector.close_async() 来清理资源。

[!NOTE]

此替代方案要求将运行的事件循环作为 loop 参数传递给 Connector()

import asyncio
import asyncpg

import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from google.cloud.sql.connector import Connector

async def init_connection_pool(connector: Connector) -> AsyncEngine:
    # initialize Connector object for connections to Cloud SQL
    async def getconn() -> asyncpg.Connection:
            conn: asyncpg.Connection = await connector.connect_async(
                "project:region:instance",  # Cloud SQL instance connection name
                "asyncpg",
                user="my-user",
                password="my-password",
                db="my-db-name"
                # ... additional database driver args
            )
            return conn

    # The Cloud SQL Python Connector can be used along with SQLAlchemy using the
    # 'async_creator' argument to 'create_async_engine'
    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
    )
    return pool

async def main():
    # initialize Connector object for connections to Cloud SQL
    loop = asyncio.get_running_loop()
    async with Connector(loop=loop) as connector:
        # initialize connection pool
        pool = await init_connection_pool(connector)

        # example query
        async with pool.connect() as conn:
            await conn.execute(sqlalchemy.text("SELECT NOW()"))

        # dispose of connection pool
        await pool.dispose()

调试日志记录

Cloud SQL Python 连接器使用标准的 Python 日志模块 来支持调试日志记录。

将以下代码添加到您的应用中,以启用 Cloud SQL Python 连接器的调试日志记录

import logging

logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logger = logging.getLogger(name="google.cloud.sql.connector")
logger.setLevel(logging.DEBUG)

有关配置日志的更多详细信息,请参阅 Python 日志文档

支持策略

主要版本生命周期

本项目使用 语义版本控制,并使用以下生命周期来处理主要版本的支持:

活跃 - 活跃版本将获得所有新功能和安全修复(这些修复不会引入破坏性更改)。新主要版本保证至少活跃 1 年。已弃用 - 已弃用的版本将继续接收安全和关键错误修复,但不会接收新功能。已弃用的版本将公开支持 1 年。不受支持 - 任何已弃用且 >=1 年的主要版本都被视为公开不受支持。

支持的 Python 版本

我们遵循Google Cloud Libraries for Python使用的Python版本支持策略。支持的Python版本的变化将被视为一个小的更改,并在发布说明中列出。

发布节奏

此项目旨在实现最低每月发布节奏。如果没有添加新功能或修复,则发布带有最新依赖项的新补丁版本。

贡献

我们欢迎外部贡献。有关如何最佳贡献的详细信息,请参阅我们的贡献指南

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

cloud_sql_python_connector-1.12.1.tar.gz (32.4 kB 查看哈希值)

上传时间

构建分布

cloud_sql_python_connector-1.12.1-py2.py3-none-any.whl (43.1 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面