跳转到主要内容

hasql 是一个用于获取与主节点和副本的实际连接的模块

项目描述

https://raw.githubusercontent.com/aiokitchen/hasql/master/resources/logo.svg

hasql

hasql 是一个用于连接高可用 PostgreSQL 集群中主节点和副本的库。

https://raw.githubusercontent.com/aiokitchen/hasql/master/resources/diagram.svg

功能

  • 完全异步 API

  • 自动检测集群中的主机角色

  • 对每个主机的健康检查以及自动为不可用主机断开流量

  • 自动检测主机角色变化,例如,如果副本主机被提升为主节点

  • 不同的负载均衡策略

  • 支持 asyncpgpsycopg3aiopgsqlalchemyasyncpgsa

用法

一些有用的示例

创建连接池

获取连接时,返回所使用驱动程序的连接对象(对于 aiopgaiopg.connection.Connection,对于 asyncpgasyncpgsaasyncpg.pool.PoolConnectionProxy

数据库 URL 规范

  • 应通过逗号分隔传递多个主机

    • 多主机示例

      • postgresql://db1,db2,db3/

    • 分割结果

      • postgresql://db1:5432/

      • postgresql://db2:5432/

      • postgresql://db3:5432/

  • 可以在主机名之后传递每个主机的非默认端口号。例如。

    • 多主机示例

      • postgresql://db1:1234,db2:5678,db3/

    • 分割结果

      • postgresql://db1:1234/

      • postgresql://db2:5678/

      • postgresql://db3:5432/

  • 所有主机非默认端口的特殊情况

    • 多主机示例

      • postgresql://db1,db2,db3:6432/

    • 分割结果

      • postgresql://db1:6432/

      • postgresql://db2:6432/

      • postgresql://db3:6432/

对于 aiopgaiopg.sa

aiopg 必须作为依赖项安装。

使用 aiopg 的代码示例

from hasql.aiopg import PoolManager

hosts = ",".join([
    "master-host:5432",
    "replica-host-1:5432",
    "replica-host-2:5432",
])

multihost_dsn = f"postgresql://user:password@{hosts}/dbname"

async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(multihost_dsn)

    # Waiting for 1 master and 1 replica will be available
    await pool.ready(masters_count=1, replicas_count=1)
    return pool

使用 aiopg.sa 的代码示例

from hasql.aiopg_sa import PoolManager

hosts = ",".join([
    "master-host:5432",
    "replica-host-1:5432",
    "replica-host-2:5432",
])

multihost_dsn = f"postgresql://user:password@{hosts}/dbname"

async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(multihost_dsn)

    # Waiting for 1 master and 1 replica will be available
    await pool.ready(masters_count=1, replicas_count=1)
    return pool

对于 asyncpg

asyncpg 必须作为依赖项安装

from hasql.asyncpg import PoolManager

hosts = ",".join([
    "master-host:5432",
    "replica-host-1:5432",
    "replica-host-2:5432",
])

multihost_dsn = f"postgresql://user:password@{hosts}/dbname"

async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(multihost_dsn)

    # Waiting for 1 master and 1 replica will be available
    await pool.ready(masters_count=1, replicas_count=1)
    return pool

对于 sqlalchemy

sqlalchemy[asyncio] & asyncpg 必须作为依赖项安装

from hasql.asyncsqlalchemy import PoolManager

hosts = ",".join([
    "master-host:5432",
    "replica-host-1:5432",
    "replica-host-2:5432",
])

multihost_dsn = f"postgresql://user:password@{hosts}/dbname"


async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(
        multihost_dsn,

        # Use master for acquire_replica, if no replicas available
        fallback_master=True,

        # You can pass pool-specific options
        pool_factory_kwargs=dict(
            pool_size=10,
            max_overflow=5
        )
    )

    # Waiting for 1 master and 1 replica will be available
    await pool.ready(masters_count=1, replicas_count=1)
    return pool

对于 asyncpgsa

asyncpgsa 必须作为依赖项安装

from hasql.asyncpgsa import PoolManager

hosts = ",".join([
    "master-host:5432",
    "replica-host-1:5432",
    "replica-host-2:5432",
])

multihost_dsn = f"postgresql://user:password@{hosts}/dbname"

async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(multihost_dsn)

    # Waiting for 1 master and 1 replica will be available
    await asyncio.gather(
        pool.wait_masters_ready(1),
        pool.wait_replicas_ready(1)
    )
    return pool

对于 psycopg3

psycopg3 必须作为依赖项安装(包名为 psycopg

from hasql.psycopg3 import PoolManager


hosts = ",".join([
    "master-host:5432",
    "replica-host-1:5432",
    "replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"

async def create_pool(dsn) -> PoolManager:
    pool = PoolManager(multihost_dsn)

    # Waiting for 1 master and 1 replica will be available
    await pool.ready(masters_count=1, replicas_count=1)
    return pool

获取连接

应该使用异步上下文管理器获取连接

获取主连接

async def do_something():
    pool = await create_pool(multihost_dsn)
    async with pool.acquire(read_only=False) as connection:
        ...

async def do_something():
    pool = await create_pool(multihost_dsn)
    async with pool.acquire_master() as connection:
        ...

获取副本连接

async def do_something():
    pool = await create_pool(multihost_dsn)
    async with pool.acquire(read_only=True) as connection:
        ...

async def do_something():
    pool = await create_pool(multihost_dsn)
    async with pool.acquire_replica() as connection:
        ...

它是如何工作的?

对于dsn字符串中的每个主机,创建一个连接池。从每个池中保留一个连接,用于检查主机的可用性和其角色。池中连接的最小和最大数量增加1(以保留系统连接)。

为每个池创建一个后台任务,在后台任务中,每 refresh_delay 秒检查一次主机的可用性和其角色(主节点或副本)。

当切换主机角色时,hasql会稍延迟检测到这一点。

对于PostgreSQL,当切换主节点时,所有主机上的所有连接都会断开(PostgreSQL实现的细节)。

如果没有可用主机,方法 acquire()、acquire_master() 和 acquire_replica() 会等待直到具有所需角色的主机启动。

概述

  • hasql.base.BasePoolManager
    • __init__(dsn, acquire_timeout, refresh_delay, refresh_timeout, fallback_master, master_as_replica_weight, balancer_policy, pool_factory_kwargs):

      • dsn: str - 连接使用的连接字符串。

      • acquire_timeout: Union[int, float] - 连接操作的默认超时时间(以秒为单位)。默认为1秒。

      • refresh_delay: Union[int, float] - 主机轮询之间的延迟时间(以秒为单位)。默认为1秒。

      • refresh_timeout: Union[int, float] - 尝试连接和获取主机角色的超时时间(以秒为单位)。默认为1秒。

      • fallback_master: bool - 如果副本缺失,则使用主节点的连接。默认为False。

      • master_as_replica_weight: float - 使用主节点作为副本的概率(从0.到1.;0. - 主节点不作为副本使用;1. - 主节点可以作为副本使用)。

      • balancer_policy: type - 连接池平衡策略(hasql.balancer_policy.GreedyBalancerPolicyhasql.balancer_policy.RandomWeightedBalancerPolicyhasql.balancer_policy.RoundRobinBalancerPolicy)。

      • stopwatch_window_size: int - 计算每个池中响应时间中位数的窗口大小。

      • pool_factory_kwargs: Optional[dict] - 传递给池工厂的连接池创建参数。

    • get_pool_freesize(pool) 获取连接池中空闲连接的数量。返回连接池中的空闲连接数量。

      • pool - 要获取空闲连接数量的池。

    • coroutine async-with acquire_from_pool(pool, **kwargs) 从池中获取连接。返回数据库连接。

      • pool - 要获取连接的池。

      • kwargs - 要传递给池acquire()方法的参数。

    • 协程 release_to_pool(connection, pool, **kwargs) 一个将连接conn还原到池中以便将来回收的协程。

      • connection - 要释放的连接。

      • pool - 要将连接返回到的池。

      • kwargs - 要传递给池release()方法的参数。

    • is_connection_closed(connection) 如果连接已关闭则返回True。

    • get_last_response_time(pool) 返回数据库主机最后响应时间(以秒为单位)。

    • 协程 async-with acquire(read_only, fallback_master, timeout, **kwargs) 从空闲池中获取连接。

      • readonly: bool - 如果需要将连接返回到副本,则为True,否则为False - 主机。默认为False。

      • fallback_master: Optional[bool] - 如果副本缺失,则使用主机的连接。如果为None,则使用默认值。

      • master_as_replica_weight: float - 使用主节点作为副本的概率(从0.到1.;0. - 主节点不作为副本使用;1. - 主节点可以作为副本使用)。

      • timeout: Union[int, float] - 连接操作的超时时间(以秒为单位)。

      • kwargs - 要传递给池acquire()方法的参数。

    • 协程 async-with acquire_master(timeout, **kwargs) 从空闲主池中获取连接。相当于 acquire(read_only=False)

      • timeout: Union[int, float] - 连接操作的超时时间(以秒为单位)。

      • kwargs - 要传递给池acquire()方法的参数。

    • 协程 async-with acquire_replica(fallback_master, timeout, **kwargs) 从空闲副本池中获取连接。相当于 acquire(read_only=True)

      • fallback_master: Optional[bool] - 如果副本缺失,则使用主机的连接。如果为None,则使用默认值。

      • master_as_replica_weight: float - 使用主节点作为副本的概率(从0.到1.;0. - 主节点不作为副本使用;1. - 主节点可以作为副本使用)。

      • timeout: Union[int, float] - 连接操作的超时时间(以秒为单位)。

      • kwargs - 要传递给池acquire()方法的参数。

    • 协程 release(connection, **kwargs) 一个将连接conn还原到池中以便将来回收的协程。

      • connection - 要释放的连接。

      • kwargs - 要传递给池release()方法的参数。

    • 协程 close() 关闭池。在返回池时标记所有池连接为关闭。关闭的池不允许获取新的连接。

    • 协程 terminate() 终止池。关闭池并立即关闭所有已获取的连接。

    • 协程 wait_next_pool_check(timeout) 等待下一步更新主机角色。

    • 协程 ready(masters_count, replicas_count, timeout) 等待连接到数据库主机。如果 masters_countNonereplicas_count 为 None,则期望连接到所有主机。

      • masters_count: Optional[int] - 最小主机数量。默认为 None

      • replicas_count: Optional[int] - 最小副本主机数量。默认为 None

      • timeout: Union[int, float] - 数据库连接的超时时间。默认为10秒。

    • 协程 wait_all_ready() 等待连接到所有数据库主机。

    • 协程 wait_masters_ready(masters_count) 等待连接到指定的数据库主服务器数量。

      • masters_count: int - 最小主机数量。

    • 协程 wait_replicas_ready(replicas_count) 等待连接到指定的数据库副本服务器数量。

      • replicas_count: int - 最小副本主机数量。

    • 协程 get_pool(read_only, fallback_master) 返回具有最大空闲连接数的连接池。

      • readonly: bool - 如果需要返回副本池,则为True,否则为False - 主池。

      • fallback_master: Optional[bool] - 如果副本缺失,则返回主池。默认为False。

    • 协程 get_master_pools() 返回所有主池的列表。

    • 协程 get_replica_pools(fallback_master) 返回所有副本池的列表。

      • fallback_master: Optional[bool] - 如果副本缺失,则返回所有主池的列表。默认为False。

    • pool_is_master(pool) 如果连接是主机的,则返回True。

    • pool_is_replica(pool) 如果连接是副本的,则返回True。

    • register_connection(connection, pool) 将连接与从其中取出的连接池匹配。这对于 release() 方法正确工作至关重要。

  • hasql.aiopg.PoolManager

  • hasql.aiopg_sa.PoolManager

  • hasql.asyncpg.PoolManager

  • hasql.asyncpgsa.PoolManager

  • hasql.psycopg3.PoolManager

均衡策略

  • hasql.balancer_policy.GreedyBalancerPolicy 选择拥有最多空闲连接的池。如果有多个这样的池,则随机选择一个。

  • hasql.balancer_policy.RandomWeightedBalancerPolicy 根据它们的权重随机选择池。权重与相应池数据库的响应时间成反比(响应越快,权重越高)。

  • hasql.balancer_policy.RoundRobinBalancerPolicy

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

hasql-0.7.1.tar.gz (23.9 kB 查看散列)

上传时间 源代码

支持者