跳转到主要内容

RedisGears同步食谱

项目描述

license CircleCI GitHub Actions PyPI version Known Vulnerabilities Language grade: Python

RGSync

Forum Discord

RedisGears的写入后写入通过食谱

演示

WriteBehind demo

示例

以下是一个RedisGears食谱示例,展示了如何使用写入后模式将Redis哈希表中的数据映射到MySQL表中。该食谱将所有前缀为person:<id>的Redis哈希表映射到persons表,其中<id>是主键并映射到person_id列。类似地,它将所有前缀为car:<id>的哈希表映射到cars表。

from rgsync import RGWriteBehind, RGWriteThrough
from rgsync.Connectors import MySqlConnector, MySqlConnection

'''
Create MySQL connection object
'''
connection = MySqlConnection('demouser', 'Password123!', 'localhost:3306/test')

'''
Create MySQL persons connector
'''
personsConnector = MySqlConnector(connection, 'persons', 'person_id')

personsMappings = {
	'first_name':'first',
	'last_name':'last',
	'age':'age'
}

RGWriteBehind(GB,  keysPrefix='person', mappings=personsMappings, connector=personsConnector, name='PersonsWriteBehind',  version='99.99.99')

'''
Create MySQL cars connector
'''
carsConnector = MySqlConnector(connection, 'cars', 'car_id')

carsMappings = {
	'id':'id',
	'color':'color'
}

RGWriteBehind(GB, keysPrefix='car', mappings=carsMappings, connector=carsConnector, name='CarsWriteBehind', version='99.99.99')

运行食谱

您可以使用此实用程序发送RedisGears食谱以执行。例如,运行此存储库中的example.py食谱,并使用以下命令安装其依赖项

gears-cli --host <host> --port <post> --password <password> run example.py REQUIREMENTS rgsync PyMySQL cryptography

食谱操作概述

RGWriteBehind() 实现了 写后 策略,主要由两个 RedisGears 函数组成,其操作如下

  1. 向 Redis 哈希键写入操作会触发 RedisGears 函数的执行。
  2. 该 RedisGears 函数从哈希中读取数据并将其写入 Redis 流。
  3. 另一个 RedisGears 函数在后台异步执行,并将更改写入目标数据库。

使用 Redis 流的动机

写后 策略实现中使用 Redis 流是为了确保捕获的更改的持久性,同时减轻将它们传输到目标数据库的性能损失。

策略的第一个 RedisGears 函数注册为同步运行,这意味着函数在执行命令的同一个主 Redis 线程中运行。这种执行模式是必要的,以便按顺序记录更改事件,并消除在发生故障时丢失事件的可能性。

将更改应用于目标数据库通常要慢得多,实际上排除了在主线程中执行该操作的可能性。第二个 RedisGears 函数以批量和间隔异步执行以实现这一点。

Redis 流是策略两部分通信的通道,其中更改按顺序同步持久化,并在后台异步处理。

控制要复制的操作

有时您想在不复制到目标的情况下修改 Redis 中的数据。为此,可以通过添加特殊字段 # 到哈希的字段并将它设置为以下值来定制该策略

  • + - 添加数据但不复制到目标
  • = - 添加数据并复制(默认行为)
  • - - 删除数据但不复制
  • ~ - 从 Redis 和目标中删除数据(使用 del 命令时的默认行为)

当哈希的值包含 # 字段时,策略将根据其值执行,并在之后从哈希中删除 # 字段。例如,以下显示了如何删除哈希而不复制删除操作

redis> HSET person:1 # -

或者,要添加哈希而不复制它

redis> HSET person:007 first_name James last_name Bond age 42 # +

至少一次和精确一次语义

默认情况下,写后 策略为写入提供了 至少一次 属性,这意味着数据将写入目标一次,但在发生故障的情况下可能会写入多次。

可以通过使用流的消息 ID 作为操作的递增 ID 来使策略提供 精确一次 交付语义。写入 RedisGears 函数可以使用该 ID 并将其记录在目标中的另一个表中,以确保任何给定的 ID 只写入一次。

策略的所有 SQL 连接器都支持此功能。要使用它,您需要向连接器提供“精确一次”表的名字。该表应包含 2 列,id 代表写入者的某些唯一 ID(例如用于区分分片)和 val 是写入到目标的最后一个流 ID。可以通过可选的 exactlyOnceTableName 变量在构造函数中将“精确一次”表的名字指定给连接器。

获取写入确认

可以使用该策略并从目标获取成功写入的确认。按照以下步骤操作

  1. 为每个更改数据操作生成一个 uuid
  2. 将操作的 uuid 立即添加到特殊 # 字段之后,即在 +/=/-/~ 字符之后。启用写入确认需要使用特殊的 #
  3. 执行操作后,执行 XREAD BLOCK <timeout> STREAMS {<hash key>}<uuid> 0-0。一旦配方写入目标,它将在该 ({<hash key>}<uuid>) 流中创建一条消息,该消息只有一个名为 'status' 的字段,其值为 'done'。
  4. 出于维护目的,建议在收到确认后删除该流。但这并非必须,因为这些流是使用一小时TTL创建的。

确认示例

127.0.0.1:6379> hset person:007 first_name James last_name Bond age 42 # =6ce0c902-30c2-4ac9-8342-2f04fb359a94
(integer) 1
127.0.0.1:6379> XREAD BLOCK 2000 STREAMS {person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94 0-0
1) 1) "{person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94"
   2) 1) 1) "1581927201056-0"
         2) 1) "status"
            2) "done"

写入通过

写入通过 是通过使用临时键完成的。配方将注册该键的变化并将其写入目标。写入目标是在服务器的主线程中以同步模式执行的,这意味着服务器将在此期间被阻塞,客户端将无法在完成前收到回复。

将更改写入目标可能会成功或失败。如果成功,配方将重命名临时键为其预期的最终名称。失败将阻止重命名。在两种情况下,都将删除临时键。

确认流的语义与 写入后 几乎相同。唯一的不同在于消息的结构。失败的写入将在该 ({<hash key>}<uuid>) 流中创建一个消息,该消息包含

  • 一个值为 'failed' 的 'status' 字段
  • 一个包含错误描述的 'error' 字段

请注意,当使用 写入通过 时,必须提供 uuid 并读取确认流。这是唯一能够判断写入是否成功的方法。

写入通过 使用 RGWriteThrough 类进行注册

RGWriteThrough(GB, keysPrefix, mappings, connector, name, version)

keysPrefix 参数是要触发写入的键的前缀。临时键的名称将采用以下格式

<keysPrefix>{<realKeyName>}

成功后,键将被重命名为 <realKeyName>

写入目标时的任何失败都将导致配方中止。在这种情况下,临时键不会被重命名并被删除。

请注意,在某些情况下,例如连接失败,可能无法判断目标上的操作是成功还是失败。配方将这些情况视为失败,尽管实际上写入可能已成功。

示例

这些示例假设将 keysPrefix 设置为 "__"。第一个显示成功写入

127.0.0.1:6379> HSET __{person:1} first_name foo last_name bar age 20 # =6ce0c902-30c2-4ac9-8342-2f04fb359a94
(integer) 4
127.0.0.1:6379> XREAD BLOCK 2000 STREAMS {person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94 0-0
1) 1) "{person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94"
   2) 1) 1) "1583321726502-0"
         2) 1) "status"
            2) "done"
127.0.0.1:6379> HGETALL person:1
1) "age"
2) "20"
3) "last_name"
4) "bar"
5) "first_name"
6) "foo"

一个失败的 写入通过 示例

127.0.0.1:6379> HSET __{person:1} first_name foo last_name bar age 20 # =6ce0c902-30c2-4ac9-8342-2f04fb359a94
(integer) 4
127.0.0.1:6379> XREAD BLOCK 2000 STREAMS {person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94 0-0
1) 1) "{person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94"
   2) 1) 1) "1583322141455-0"
         2) 1) "status"
            2) "failed"
            3) "error"
            4) "Failed connecting to SQL database, error=\"(pymysql.err.OperationalError) (2003, \"Can't connect to MySQL server on 'localhost' ([Errno 111] Connection refused)\")\n(Background on this error at: http://sqlalche.me/e/e3q8)\""

速度提升

为了提高写入通过更新的速度,用户应考虑为写入通过数据库添加索引。此索引将基于包含要复制的 redis 键 ID 的列创建。使用上面的示例,将创建一个 person_id 列,无论选择哪种后端数据库进行写入通过。因此,根据您的数据量和架构,在 person_id 列上创建索引可能是明智的。

数据持久性和可用性

为了避免 Redis 中的数据丢失以及与目标数据库出现的不一致性,建议仅在与高可用 Redis 环境一起使用此配方。在这样的环境中,主节点失败时,替换它的副本将从停止的点继续执行配方。

此外,应使用 Redis 的 AOF 与复制一起使用,以防止系统级故障期间的数据丢失。

监控 RedisGears 函数注册

使用 此工具 监控 RedisGear 的函数注册。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

rgsync-1.2.0.tar.gz (22.3 kB 查看哈希值)

上传时间 源代码

构建分发

rgsync-1.2.0-py3-none-any.whl (21.6 kB 查看哈希值)

上传时间 Python 3

支持