跳至主要内容

由sentinel pool支持的读写redis客户端

项目描述

TwiceRedis

带有断开sentinel客户端和redis客户端的读写sentinel连接池支持redis客户端

当使用由SentinelConnectionPool支持的redis连接时,该池初始化为连接到主服务器或从服务器。如果您想从从服务器读取并偶尔向主服务器写入,这可能会很烦人。TwiceRedis通过拥有两个客户端来解决这个问题,一个用于主服务器,另一个用于从服务器

  • tr.master也称为tr.write

  • tr.slave也称为tr.read

每个客户端都由一个单独的SentinelConnectionPool支持,初始化为分别连接到主服务器或从服务器

TwiceRedis还使用一个DisconnectingSentinel类来大大减少与redis sentinel服务(s)的活跃连接数。该类在选择主服务器或从服务器后断开与所选sentinel的连接

DisconnectionSentinel类在过滤从服务器方面比基本Sentinel类更为智能。除了确保从服务器不是sdownodown之外,它还确保从服务器的master-link-statusok

TwiceRedis 会随机化哨兵列表,以便每个 TwiceRedis 对象连接到列表中的一个随机哨兵,而不是所有对象都连接到第一个哨兵(只要可行)。这种随机化可能与 DisconnectingSentinel 结合使用略显多余,但最坏的情况是会减少哨兵列表中第一个哨兵的负载。

TwiceRedis 还使用了名为 DisconnectRedisStrictRedis 子类,它为所有客户端添加了一个 disconnect() 函数,使管理对 Redis 服务的单个连接更加容易。

使用方法

from twiceredis import TwiceRedis
sentinels = [('10.10.10.10', 26379),
             ('10.10.10.11', 26379),
             ('10.10.10.12', 26379)]
tr = TwiceRedis('master01', sentinels, 'tötes_passowrd')
x = tr.slave.get('superkey')
tr.master.set('je mange', 'huehue')
x = tr.read.get('nous mangeons')
tr.write.del('superkey')

管道线也运行良好,您只需决定是否需要在一个管道中写入,如果需要写入,使用 tr.master,否则使用 tr.slave

with tr.master.pipeline() as wpipe:
    wpipe.set('turle', 'power')
    wpipe.set('tr3buchet', 'tötes')
    wpipe.execute()

为了连接、获取密钥然后断开连接以减少活动连接

x = tr.slave.get('some key')
tr.slave.disconnect()

之后,根据需要无缝重新连接,您可能会连接到不同的从节点。

x = tr.slave.get('some other key')

如果您想同时断开 tr.mastertr.slave 的连接,可以使用 tr.disconnect()。它会在从节点和主节点客户端上调用 disconnect()

x = tr.slave.get('some key')
tr.master.publish('topic', x)
tr.disconnect()

# ... and later on reconnect seamlessly
tr.master.set('some key', 'le totes!')
x = tr.slave.get('some_key')

关于调整,请查看 TwiceRedis 的定义源代码中的文档字符串(https://github.com/tr3buchet/twiceredis/blob/master/twiceredis/client.py)。pool_kwargssentinel_kwargs 的默认参数定义得易于调整以满足您的需求。这主要是超时和 TCP 保持活动连接的东西,但每个环境都不同,所以我在我的环境中有效的默认值可能在您的环境中无效。以下是一个调整示例。

from twiceredis import TwiceRedis
sentinels = [('10.10.10.10', 26379),
             ('10.10.10.11', 26379),
             ('10.10.10.12', 26379)]
pool_kwargs = TwiceRedis.DEFAULT_POOL_KWARGS
pool_kwargs['tcp_keepalive'] = False
sentinel_kwargs = TwiceRedis.DEFAULT_SENTINEL_KWARGS
sentinel_kwargs['min_other_sentinels'] = 2
tr = TwiceRedis('master01', sentinels, 'tötes_passowrd',
                pool_kwargs=pool_kwargs, sentinel_kwargs=sentinel_kwargs)

监听器

TwiceRedis 基于的强大耐用消息监听器,具有持久消息和正在传输的消息存储。

创建此监听器是因为我试图使用 Redis pubsub,但被防火墙断开连接并丢失消息。从发布端处理是否已订阅消息真的很痛苦且充满失败。 Listener 允许您可靠地监听任何列表中推送的消息。当接收到消息时,并且只在一次事务中,该消息将被移动到处理列表。只有处理完消息后,它才会从处理列表中删除。 Listener 可以无限期地监听并处理可能发生的任何连接失败或主节点故障转移,不管防火墙被动掉线。

使用方法

# on the listener side of things
from twiceredis import TwiceRedis
from twiceredis import Listener
sentinels = [('10.10.10.10', 26379),
             ('10.10.10.11', 26379),
             ('10.10.10.12', 26379)]
tr = TwiceRedis('master01', sentinels, 'tötes_passowrd')
l = Listener(tr, 'message_list')
l.listen()      # <--- blocks and logs all messages that comes through

# on the publisher side of things
redis_client.lpush('message_list', 'incredibly important message')

就是这样。它很容易使用。如果您需要更多自定义,请尝试使用自己的处理程序。我建议始终从处理程序返回 message,这样它与 get_message() 一起工作得很好,后者返回处理程序的结果,无论它是您自定义的处理程序还是内置默认处理程序,后者记录 message 然后返回它。

# again on the listener side of things
def f(msg):
    do_thing(msg)
    print msg
    return msg

l = Listener(tr, 'message_list', handler=f)
l.listen()      # <--- blocks and calls f(msg) for each msg that comes through

如果您不喜欢阻塞,那也没关系,请查看 get_message()。此示例使用默认处理程序记录 message 然后将其返回供您使用。像上一个示例一样,您可以定义自己的处理程序并忽略或对 get_message() 的结果进行任何操作。

# always with the listener side of things
l = Listener(tr, 'message_list')
while some_loop_construct_is_true:
    msg = l.get_message()     # <--- does not block, returns None immediately if there is no message
    # do whatever with msg

您还可以使用 get_message() 手动处理消息。默认处理程序仍然会被调用,但它只记录并返回消息,因此您可以按自己的意愿处理它。

# always with the listener side of things
l = Listener(tr, 'message_list')
while some_loop_construct_is_true:
    some_handler(l.get_message())
    # do other things in your loop

read_time 表示在未接收到消息时,listen() 函数每次迭代将阻塞多长时间。只要这个值低于您在实例化时传递给 ListenerTwiceRedis 对象所配置的 socket_timeout,这个值是多少都无关紧要。如果它大于 socket_timeout,则每次迭代都会抛出异常,虽然会捕获这个异常,但效率低下。我决定实现一个 read_time 伪超时,这样标准循环就不需要抛出异常,并且可以防止在套接字因任何原因挂起时陷入无限循环。注意!!read_time 与消息处理速率无关。只要它正在接收消息,循环就会尽可能快地迭代。

processing_suffix 会被添加到事件列表名称中,以构建用于存储待处理消息的列表键,直到处理完成,可以将其更改为任何您喜欢的字符串。

至于异常或 Redis 连接处理,如果您启动了一个 listen(),您可以杀死 Redis 或做任何事,它可以在一周内关闭,但只要它恢复,Listener 就会从上次离开的地方继续,就像什么都没发生一样。每次迭代都会重用或尝试创建一个连接,这依赖于 TwiceRedis 的哨兵支持性质,以重新连接到正确的主节点(即使它因故障转移或维护等而更改)。它被构建得非常耐用。

安装

pip install twiceredis 或克隆存储库并 python setup.py install

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

twiceredis-2.0.0.tar.gz (11.8 kB 查看散列)

上传时间

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面