hasql 是一个用于获取与主节点和副本的实际连接的模块
项目描述
hasql
hasql 是一个用于连接高可用 PostgreSQL 集群中主节点和副本的库。
功能
完全异步 API
自动检测集群中的主机角色
对每个主机的健康检查以及自动为不可用主机断开流量
自动检测主机角色变化,例如,如果副本主机被提升为主节点
不同的负载均衡策略
支持 asyncpg、psycopg3、aiopg、sqlalchemy 和 asyncpgsa
用法
一些有用的示例
创建连接池
获取连接时,返回所使用驱动程序的连接对象(对于 aiopg 为 aiopg.connection.Connection,对于 asyncpg 和 asyncpgsa 为 asyncpg.pool.PoolConnectionProxy)
数据库 URL 规范
应通过逗号分隔传递多个主机
多主机示例
postgresql://db1,db2,db3/
分割结果
postgresql://db1:5432/
postgresql://db2:5432/
postgresql://db3:5432/
可以在主机名之后传递每个主机的非默认端口号。例如。
多主机示例
postgresql://db1:1234,db2:5678,db3/
分割结果
postgresql://db1:1234/
postgresql://db2:5678/
postgresql://db3:5432/
所有主机非默认端口的特殊情况
多主机示例
postgresql://db1,db2,db3:6432/
分割结果
postgresql://db1:6432/
postgresql://db2:6432/
postgresql://db3:6432/
对于 aiopg 或 aiopg.sa
aiopg 必须作为依赖项安装。
使用 aiopg 的代码示例
from hasql.aiopg import PoolManager
hosts = ",".join([
"master-host:5432",
"replica-host-1:5432",
"replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"
async def create_pool(dsn) -> PoolManager:
pool = PoolManager(multihost_dsn)
# Waiting for 1 master and 1 replica will be available
await pool.ready(masters_count=1, replicas_count=1)
return pool
使用 aiopg.sa 的代码示例
from hasql.aiopg_sa import PoolManager
hosts = ",".join([
"master-host:5432",
"replica-host-1:5432",
"replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"
async def create_pool(dsn) -> PoolManager:
pool = PoolManager(multihost_dsn)
# Waiting for 1 master and 1 replica will be available
await pool.ready(masters_count=1, replicas_count=1)
return pool
对于 asyncpg
asyncpg 必须作为依赖项安装
from hasql.asyncpg import PoolManager
hosts = ",".join([
"master-host:5432",
"replica-host-1:5432",
"replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"
async def create_pool(dsn) -> PoolManager:
pool = PoolManager(multihost_dsn)
# Waiting for 1 master and 1 replica will be available
await pool.ready(masters_count=1, replicas_count=1)
return pool
对于 sqlalchemy
sqlalchemy[asyncio] & asyncpg 必须作为依赖项安装
from hasql.asyncsqlalchemy import PoolManager
hosts = ",".join([
"master-host:5432",
"replica-host-1:5432",
"replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"
async def create_pool(dsn) -> PoolManager:
pool = PoolManager(
multihost_dsn,
# Use master for acquire_replica, if no replicas available
fallback_master=True,
# You can pass pool-specific options
pool_factory_kwargs=dict(
pool_size=10,
max_overflow=5
)
)
# Waiting for 1 master and 1 replica will be available
await pool.ready(masters_count=1, replicas_count=1)
return pool
对于 asyncpgsa
asyncpgsa 必须作为依赖项安装
from hasql.asyncpgsa import PoolManager
hosts = ",".join([
"master-host:5432",
"replica-host-1:5432",
"replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"
async def create_pool(dsn) -> PoolManager:
pool = PoolManager(multihost_dsn)
# Waiting for 1 master and 1 replica will be available
await asyncio.gather(
pool.wait_masters_ready(1),
pool.wait_replicas_ready(1)
)
return pool
对于 psycopg3
psycopg3 必须作为依赖项安装(包名为 psycopg)
from hasql.psycopg3 import PoolManager
hosts = ",".join([
"master-host:5432",
"replica-host-1:5432",
"replica-host-2:5432",
])
multihost_dsn = f"postgresql://user:password@{hosts}/dbname"
async def create_pool(dsn) -> PoolManager:
pool = PoolManager(multihost_dsn)
# Waiting for 1 master and 1 replica will be available
await pool.ready(masters_count=1, replicas_count=1)
return pool
获取连接
应该使用异步上下文管理器获取连接
获取主连接
async def do_something():
pool = await create_pool(multihost_dsn)
async with pool.acquire(read_only=False) as connection:
...
或
async def do_something():
pool = await create_pool(multihost_dsn)
async with pool.acquire_master() as connection:
...
获取副本连接
async def do_something():
pool = await create_pool(multihost_dsn)
async with pool.acquire(read_only=True) as connection:
...
或
async def do_something():
pool = await create_pool(multihost_dsn)
async with pool.acquire_replica() as connection:
...
不使用上下文管理器(实际上不推荐)
async def do_something():
pool = await create_pool(multihost_dsn)
connection = await pool.acquire(read_only=False)
await pool.release(connection)
或更有用
async def do_something():
pool = await create_pool(multihost_dsn)
try:
connection = await pool.acquire(read_only=False)
finally:
await pool.release(connection)
它是如何工作的?
对于dsn字符串中的每个主机,创建一个连接池。从每个池中保留一个连接,用于检查主机的可用性和其角色。池中连接的最小和最大数量增加1(以保留系统连接)。
为每个池创建一个后台任务,在后台任务中,每 refresh_delay 秒检查一次主机的可用性和其角色(主节点或副本)。
当切换主机角色时,hasql会稍延迟检测到这一点。
对于PostgreSQL,当切换主节点时,所有主机上的所有连接都会断开(PostgreSQL实现的细节)。
如果没有可用主机,方法 acquire()、acquire_master() 和 acquire_replica() 会等待直到具有所需角色的主机启动。
概述
- hasql.base.BasePoolManager
__init__(dsn, acquire_timeout, refresh_delay, refresh_timeout, fallback_master, master_as_replica_weight, balancer_policy, pool_factory_kwargs):
dsn: str - 连接使用的连接字符串。
acquire_timeout: Union[int, float] - 连接操作的默认超时时间(以秒为单位)。默认为1秒。
refresh_delay: Union[int, float] - 主机轮询之间的延迟时间(以秒为单位)。默认为1秒。
refresh_timeout: Union[int, float] - 尝试连接和获取主机角色的超时时间(以秒为单位)。默认为1秒。
fallback_master: bool - 如果副本缺失,则使用主节点的连接。默认为False。
master_as_replica_weight: float - 使用主节点作为副本的概率(从0.到1.;0. - 主节点不作为副本使用;1. - 主节点可以作为副本使用)。
balancer_policy: type - 连接池平衡策略(hasql.balancer_policy.GreedyBalancerPolicy、hasql.balancer_policy.RandomWeightedBalancerPolicy 或 hasql.balancer_policy.RoundRobinBalancerPolicy)。
stopwatch_window_size: int - 计算每个池中响应时间中位数的窗口大小。
pool_factory_kwargs: Optional[dict] - 传递给池工厂的连接池创建参数。
get_pool_freesize(pool) 获取连接池中空闲连接的数量。返回连接池中的空闲连接数量。
pool - 要获取空闲连接数量的池。
coroutine async-with acquire_from_pool(pool, **kwargs) 从池中获取连接。返回数据库连接。
pool - 要获取连接的池。
kwargs - 要传递给池acquire()方法的参数。
协程 release_to_pool(connection, pool, **kwargs) 一个将连接conn还原到池中以便将来回收的协程。
connection - 要释放的连接。
pool - 要将连接返回到的池。
kwargs - 要传递给池release()方法的参数。
is_connection_closed(connection) 如果连接已关闭则返回True。
get_last_response_time(pool) 返回数据库主机最后响应时间(以秒为单位)。
协程 async-with acquire(read_only, fallback_master, timeout, **kwargs) 从空闲池中获取连接。
readonly: bool - 如果需要将连接返回到副本,则为True,否则为False - 主机。默认为False。
fallback_master: Optional[bool] - 如果副本缺失,则使用主机的连接。如果为None,则使用默认值。
master_as_replica_weight: float - 使用主节点作为副本的概率(从0.到1.;0. - 主节点不作为副本使用;1. - 主节点可以作为副本使用)。
timeout: Union[int, float] - 连接操作的超时时间(以秒为单位)。
kwargs - 要传递给池acquire()方法的参数。
协程 async-with acquire_master(timeout, **kwargs) 从空闲主池中获取连接。相当于 acquire(read_only=False)
timeout: Union[int, float] - 连接操作的超时时间(以秒为单位)。
kwargs - 要传递给池acquire()方法的参数。
协程 async-with acquire_replica(fallback_master, timeout, **kwargs) 从空闲副本池中获取连接。相当于 acquire(read_only=True)
fallback_master: Optional[bool] - 如果副本缺失,则使用主机的连接。如果为None,则使用默认值。
master_as_replica_weight: float - 使用主节点作为副本的概率(从0.到1.;0. - 主节点不作为副本使用;1. - 主节点可以作为副本使用)。
timeout: Union[int, float] - 连接操作的超时时间(以秒为单位)。
kwargs - 要传递给池acquire()方法的参数。
协程 release(connection, **kwargs) 一个将连接conn还原到池中以便将来回收的协程。
connection - 要释放的连接。
kwargs - 要传递给池release()方法的参数。
协程 close() 关闭池。在返回池时标记所有池连接为关闭。关闭的池不允许获取新的连接。
协程 terminate() 终止池。关闭池并立即关闭所有已获取的连接。
协程 wait_next_pool_check(timeout) 等待下一步更新主机角色。
协程 ready(masters_count, replicas_count, timeout) 等待连接到数据库主机。如果 masters_count 为 None 且 replicas_count 为 None,则期望连接到所有主机。
masters_count: Optional[int] - 最小主机数量。默认为 None。
replicas_count: Optional[int] - 最小副本主机数量。默认为 None。
timeout: Union[int, float] - 数据库连接的超时时间。默认为10秒。
协程 wait_all_ready() 等待连接到所有数据库主机。
协程 wait_masters_ready(masters_count) 等待连接到指定的数据库主服务器数量。
masters_count: int - 最小主机数量。
协程 wait_replicas_ready(replicas_count) 等待连接到指定的数据库副本服务器数量。
replicas_count: int - 最小副本主机数量。
协程 get_pool(read_only, fallback_master) 返回具有最大空闲连接数的连接池。
readonly: bool - 如果需要返回副本池,则为True,否则为False - 主池。
fallback_master: Optional[bool] - 如果副本缺失,则返回主池。默认为False。
协程 get_master_pools() 返回所有主池的列表。
协程 get_replica_pools(fallback_master) 返回所有副本池的列表。
fallback_master: Optional[bool] - 如果副本缺失,则返回所有主池的列表。默认为False。
pool_is_master(pool) 如果连接是主机的,则返回True。
pool_is_replica(pool) 如果连接是副本的,则返回True。
register_connection(connection, pool) 将连接与从其中取出的连接池匹配。这对于 release() 方法正确工作至关重要。
hasql.aiopg.PoolManager
hasql.aiopg_sa.PoolManager
hasql.asyncpg.PoolManager
hasql.asyncpgsa.PoolManager
hasql.psycopg3.PoolManager
均衡策略
hasql.balancer_policy.GreedyBalancerPolicy 选择拥有最多空闲连接的池。如果有多个这样的池,则随机选择一个。
hasql.balancer_policy.RandomWeightedBalancerPolicy 根据它们的权重随机选择池。权重与相应池数据库的响应时间成反比(响应越快,权重越高)。
hasql.balancer_policy.RoundRobinBalancerPolicy
项目详情
hasql-0.7.1.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | c5a1f8b66a50f0fc75b3b9f42bc7e79b52781a8f2a79ce0279c6be6cff4a3960 |
|
MD5 | 7f42f2e81a102b043880bf7a264d8a50 |
|
BLAKE2b-256 | 9036cd0ac6852966308e399a28838edd57af75f198ed659e709a6fc9c5f86aeb |