RedisGears同步食谱
项目描述
RGSync
RedisGears的写入后和写入通过食谱
演示
示例
以下是一个RedisGears食谱示例,展示了如何使用写入后模式将Redis哈希表中的数据映射到MySQL表中。该食谱将所有前缀为person:<id>
的Redis哈希表映射到persons
表,其中<id>
是主键并映射到person_id
列。类似地,它将所有前缀为car:<id>
的哈希表映射到cars
表。
from rgsync import RGWriteBehind, RGWriteThrough
from rgsync.Connectors import MySqlConnector, MySqlConnection
'''
Create MySQL connection object
'''
connection = MySqlConnection('demouser', 'Password123!', 'localhost:3306/test')
'''
Create MySQL persons connector
'''
personsConnector = MySqlConnector(connection, 'persons', 'person_id')
personsMappings = {
'first_name':'first',
'last_name':'last',
'age':'age'
}
RGWriteBehind(GB, keysPrefix='person', mappings=personsMappings, connector=personsConnector, name='PersonsWriteBehind', version='99.99.99')
'''
Create MySQL cars connector
'''
carsConnector = MySqlConnector(connection, 'cars', 'car_id')
carsMappings = {
'id':'id',
'color':'color'
}
RGWriteBehind(GB, keysPrefix='car', mappings=carsMappings, connector=carsConnector, name='CarsWriteBehind', version='99.99.99')
运行食谱
您可以使用此实用程序发送RedisGears食谱以执行。例如,运行此存储库中的example.py食谱,并使用以下命令安装其依赖项
gears-cli --host <host> --port <post> --password <password> run example.py REQUIREMENTS rgsync PyMySQL cryptography
食谱操作概述
类 RGWriteBehind()
实现了 写后 策略,主要由两个 RedisGears 函数组成,其操作如下
- 向 Redis 哈希键写入操作会触发 RedisGears 函数的执行。
- 该 RedisGears 函数从哈希中读取数据并将其写入 Redis 流。
- 另一个 RedisGears 函数在后台异步执行,并将更改写入目标数据库。
使用 Redis 流的动机
在 写后 策略实现中使用 Redis 流是为了确保捕获的更改的持久性,同时减轻将它们传输到目标数据库的性能损失。
策略的第一个 RedisGears 函数注册为同步运行,这意味着函数在执行命令的同一个主 Redis 线程中运行。这种执行模式是必要的,以便按顺序记录更改事件,并消除在发生故障时丢失事件的可能性。
将更改应用于目标数据库通常要慢得多,实际上排除了在主线程中执行该操作的可能性。第二个 RedisGears 函数以批量和间隔异步执行以实现这一点。
Redis 流是策略两部分通信的通道,其中更改按顺序同步持久化,并在后台异步处理。
控制要复制的操作
有时您想在不复制到目标的情况下修改 Redis 中的数据。为此,可以通过添加特殊字段 #
到哈希的字段并将它设置为以下值来定制该策略
+
- 添加数据但不复制到目标=
- 添加数据并复制(默认行为)-
- 删除数据但不复制~
- 从 Redis 和目标中删除数据(使用del
命令时的默认行为)
当哈希的值包含 #
字段时,策略将根据其值执行,并在之后从哈希中删除 #
字段。例如,以下显示了如何删除哈希而不复制删除操作
redis> HSET person:1 # -
或者,要添加哈希而不复制它
redis> HSET person:007 first_name James last_name Bond age 42 # +
至少一次和精确一次语义
默认情况下,写后 策略为写入提供了 至少一次 属性,这意味着数据将写入目标一次,但在发生故障的情况下可能会写入多次。
可以通过使用流的消息 ID 作为操作的递增 ID 来使策略提供 精确一次 交付语义。写入 RedisGears 函数可以使用该 ID 并将其记录在目标中的另一个表中,以确保任何给定的 ID 只写入一次。
策略的所有 SQL 连接器都支持此功能。要使用它,您需要向连接器提供“精确一次”表的名字。该表应包含 2 列,id
代表写入者的某些唯一 ID(例如用于区分分片)和 val
是写入到目标的最后一个流 ID。可以通过可选的 exactlyOnceTableName
变量在构造函数中将“精确一次”表的名字指定给连接器。
获取写入确认
可以使用该策略并从目标获取成功写入的确认。按照以下步骤操作
- 为每个更改数据操作生成一个
uuid
。 - 将操作的
uuid
立即添加到特殊#
字段之后,即在+
/=
/-
/~
字符之后。启用写入确认需要使用特殊的#
。 - 执行操作后,执行
XREAD BLOCK <timeout> STREAMS {<hash key>}<uuid> 0-0
。一旦配方写入目标,它将在该 ({<hash key>}<uuid>
) 流中创建一条消息,该消息只有一个名为 'status' 的字段,其值为 'done'。 - 出于维护目的,建议在收到确认后删除该流。但这并非必须,因为这些流是使用一小时TTL创建的。
确认示例
127.0.0.1:6379> hset person:007 first_name James last_name Bond age 42 # =6ce0c902-30c2-4ac9-8342-2f04fb359a94
(integer) 1
127.0.0.1:6379> XREAD BLOCK 2000 STREAMS {person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94 0-0
1) 1) "{person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94"
2) 1) 1) "1581927201056-0"
2) 1) "status"
2) "done"
写入通过
写入通过 是通过使用临时键完成的。配方将注册该键的变化并将其写入目标。写入目标是在服务器的主线程中以同步模式执行的,这意味着服务器将在此期间被阻塞,客户端将无法在完成前收到回复。
将更改写入目标可能会成功或失败。如果成功,配方将重命名临时键为其预期的最终名称。失败将阻止重命名。在两种情况下,都将删除临时键。
确认流的语义与 写入后 几乎相同。唯一的不同在于消息的结构。失败的写入将在该 ({<hash key>}<uuid>
) 流中创建一个消息,该消息包含
- 一个值为 'failed' 的 'status' 字段
- 一个包含错误描述的 'error' 字段
请注意,当使用 写入通过 时,必须提供 uuid
并读取确认流。这是唯一能够判断写入是否成功的方法。
写入通过 使用 RGWriteThrough
类进行注册
RGWriteThrough(GB, keysPrefix, mappings, connector, name, version)
keysPrefix
参数是要触发写入的键的前缀。临时键的名称将采用以下格式
<keysPrefix>{<realKeyName>}
成功后,键将被重命名为 <realKeyName>
。
写入目标时的任何失败都将导致配方中止。在这种情况下,临时键不会被重命名并被删除。
请注意,在某些情况下,例如连接失败,可能无法判断目标上的操作是成功还是失败。配方将这些情况视为失败,尽管实际上写入可能已成功。
示例
这些示例假设将 keysPrefix
设置为 "__"。第一个显示成功写入
127.0.0.1:6379> HSET __{person:1} first_name foo last_name bar age 20 # =6ce0c902-30c2-4ac9-8342-2f04fb359a94
(integer) 4
127.0.0.1:6379> XREAD BLOCK 2000 STREAMS {person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94 0-0
1) 1) "{person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94"
2) 1) 1) "1583321726502-0"
2) 1) "status"
2) "done"
127.0.0.1:6379> HGETALL person:1
1) "age"
2) "20"
3) "last_name"
4) "bar"
5) "first_name"
6) "foo"
一个失败的 写入通过 示例
127.0.0.1:6379> HSET __{person:1} first_name foo last_name bar age 20 # =6ce0c902-30c2-4ac9-8342-2f04fb359a94
(integer) 4
127.0.0.1:6379> XREAD BLOCK 2000 STREAMS {person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94 0-0
1) 1) "{person:1}6ce0c902-30c2-4ac9-8342-2f04fb359a94"
2) 1) 1) "1583322141455-0"
2) 1) "status"
2) "failed"
3) "error"
4) "Failed connecting to SQL database, error=\"(pymysql.err.OperationalError) (2003, \"Can't connect to MySQL server on 'localhost' ([Errno 111] Connection refused)\")\n(Background on this error at: http://sqlalche.me/e/e3q8)\""
速度提升
为了提高写入通过更新的速度,用户应考虑为写入通过数据库添加索引。此索引将基于包含要复制的 redis 键 ID 的列创建。使用上面的示例,将创建一个 person_id 列,无论选择哪种后端数据库进行写入通过。因此,根据您的数据量和架构,在 person_id 列上创建索引可能是明智的。
数据持久性和可用性
为了避免 Redis 中的数据丢失以及与目标数据库出现的不一致性,建议仅在与高可用 Redis 环境一起使用此配方。在这样的环境中,主节点失败时,替换它的副本将从停止的点继续执行配方。
此外,应使用 Redis 的 AOF 与复制一起使用,以防止系统级故障期间的数据丢失。
监控 RedisGears 函数注册
使用 此工具 监控 RedisGear 的函数注册。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。