基于Redis的分布式异步速率限制器
项目描述
自我限制器
一个用于根据 并发性 或 时间 调节流量的库。
它实现了一个 信号量,用于在需要限制对API(或其他资源)的并发请求数量时使用。例如,如果您只能发送5个并发请求。
它还实现了 令牌桶算法,可用于限制在给定时间间隔内发出的请求数量。例如,如果您被限制为每秒10个请求。
这两个限制器都是异步的、先入先出(FIFO)和通过Redis进行分布式。您可能只需在需要分布式队列时使用此功能。
此库旨在实现速率限制,但信号量和令牌桶的实现可用于任何目的。
安装
pip install self-limiters
用法
这两种实现都编写为异步上下文管理器。
信号量
可以使用以下方式使用 Semaphore
from self_limiters import Semaphore
# 5 requests at the time
async with Semaphore(name="", capacity=5, max_sleep=60, redis_url=""):
client.get(...)
底层使用 blpop
来等待信号量被释放,这是一个非阻塞操作。
如果您指定了非零的 max_sleep
,如果 blpop
等待的时间超过指定值,将引发 MaxSleepExceededError
。
令牌桶
与之前一样,使用 TokenBucket
上下文管理器,如下所示
from self_limiters import TokenBucket
# 1 requests per minute
async with TokenBucket(
name="",
capacity=1,
refill_amount=1,
refill_frequency=60,
max_sleep=600,
redis_url=""
):
client.get(...)
限制器首先估计桶中有容量的时间,即当它是这个实例的轮到时,然后异步休眠直到那时。
如果设置了 max_sleep
且估计的休眠时间超过这个值,将立即引发 MaxSleepExceededError
。
作为装饰器
该软件包不包含任何装饰器,但如果您想限制整个函数运行的速率,可以创建自己的装饰器,如下所示
from self_limiters import Semaphore
# Define a decorator function
def limit(name, capacity):
def middle(f):
async def inner(*args, **kwargs):
async with Semaphore(
name=name,
capacity=capacity,
redis_url="redis://127.0.0.1:6389"
):
return await f(*args, **kwargs)
return inner
return middle
# Then pass the relevant limiter arguments like this
@limit(name="foo", capacity=5)
def fetch_foo(id: UUID) -> Foo:
实现和性能分析
这个库是用 Rust(为了好玩)编写的,并依赖于 Lua 脚本和 pipelining 来提高每个实现的性能。
Redis 允许用户直接在服务器上上传和执行 Lua 脚本,这意味着我们可以用 Lua 编写例如整个令牌桶逻辑。这带来了几个优点
-
由于它们是在 redis 实例上执行的,我们可以对 redis 发出 1 个请求,而原本需要发出 3 或 4 个请求。减少请求数量所节省的时间非常显著。
-
Redis 是单线程的,并保证脚本的原子执行,这意味着我们不需要担心数据竞争。在之前的迭代中,当我们需要发出 4 个请求来估计令牌桶实例的唤醒时间时,我们曾需要使用 redlock 算法来确保公平性。使用 Lua 脚本后,我们的实现是先入先出(FIFO)的。
总结来说,它们使我们实现更快,因为我们节省了几个往返服务器的时间和资源,并且我们不再需要锁,分布式锁很昂贵。同时,它们使代码变得更加简单。
这就是每个实现最终的样子
信号量实现
-
运行一个 Lua 脚本 来在 redis 中创建一个列表数据结构,这是信号量的基础。
此脚本具有幂等性,如果已创建则跳过。
-
运行
BLPOP
以非阻塞方式等待信号量有容量,并在有容量时弹出列表。 -
然后运行一个 pipelined 命令 通过添加容量来释放信号量。
所以总共我们向 redis 发出了 3 次调用,而没有脚本我们将发出 6 次,这些都是非阻塞的。
令牌桶实现
令牌桶实现更简单。步骤如下
- 运行一个 Lua 脚本 来估计并返回一个唤醒时间。
- 休眠直到给定的时间戳。
我们发出 1 次调用而不是 3 次,然后休眠。这两者都是非阻塞的。
换句话说,大部分时间都是在非阻塞方式下等待,这意味着限制器对应用程序事件循环的影响几乎可以忽略不计。
基准测试
我们在 CI 中使用 Github actions 运行基准测试。对于正常的 ubuntu-latest
运行器,我们看到两个限制器的运行时间
同时创建每个实现的 100 个实例并调用它们时,平均运行时间是
- 信号量实现:每个实例 ~0.6ms
- 令牌桶实现:每个实例 ~0.03ms
如果您想运行自己的测试,请查看 基准测试脚本!
实现参考
信号量实现
信号量实现非常有用,当你需要限制一个进程执行 n
个并发操作时。例如,如果你有多个Web服务器,并且你正在与一个API交互,该API只能容忍一定数量的并发请求,超过这个数量就会将你锁定。
流程可以分解如下
最初的 Lua 脚本 首先检查我们将要构建信号量的 Redis 列表是否存在。它通过在队列键上调用 SETNX
来完成这项工作,并添加一个后缀(如果类实例化中指定的 name
是 "my-queue",则队列名称将是 __self-limiters:my-queue
,并且会为 __self-limiters:my-queue-exists
调用 setnx)。如果返回的值是 1,这意味着我们用作信号量的队列尚不存在,需要创建。
可能你会觉得有点奇怪,仅仅为了指示一个列表是否存在,就要维护一个单独的值,当我们可以直接检查列表本身。如果我们能直接在列表上使用 EXISTS
,那会很好,但不幸的是,当所有元素都被弹出(即,当信号量完全获取时),列表被认为是存在的,所以我看不到其他的方法来做这件事。如果你有其他方法,欢迎贡献!
然后如果需要创建队列,我们调用 RPUSH
,其参数个数等于初始化信号量实例时使用的 capacity
值。对于一个容量为 5 的信号量,我们调用 RPUSH 1 1 1 1 1
,其中值是任意指定的。
一旦创建了列表/队列,我们就通过 BLPOP
来阻塞,直到轮到我们。默认情况下,BLPOP
是先入先出(FIFO)。我们还确保根据初始化的信号量实例设置指定 max_sleep
。如果没有传递任何内容,我们将允许无限期地睡眠。
在 __aexit__
中,我们通过管道查询运行三个命令。我们将 1
放回队列中,以“释放”信号量,并设置队列和我们在其中调用 SETNX
的字符串值的过期时间。
这些过期时间是处理已丢弃容量的折衷方案。如果一个持有信号量的节点死亡,容量可能永远无法返回。然而,如果没有人在过期时间内使用信号量,所有值都将被清除,并在下次使用时以完整容量重新创建信号量。截至写作时,过期时间为 30 秒,但可以使其可配置。
令牌桶实现
令牌桶实现适用于当你需要通过时间间隔限制进程时。例如,每分钟 1 个请求,或者每 10 秒 50 个请求。
实现具有前瞻性。它计算出给定客户端在桶中可能存在容量的时间,并返回该时间。从那里,我们可以异步睡眠,直到执行速率限制操作的时间。
流程可以分解如下
调用 schedule Lua 脚本,该脚本首先 GET
桶的 状态。
桶状态包含最后一个预定的时间槽和该时间槽剩余的令牌数。对于一个容量为 1 的桶,拥有一个 tokens_left_for_slot
变量没有意义,但如果容量为 2 或更多,我们可能需要将多个客户端预定到相同的时间槽。
然后脚本计算出是否要减少 tokens_left_for_slot
值,或者根据频率变量增加时间槽值。
最后,我们再次使用SETEX
([Redis命令详情](https://redis.ac.cn/commands/setex/))来存储桶的状态。这允许我们同时存储状态和设置过期时间。默认过期时间为30秒,但可以设置为可配置的。
需要注意的是,如果不是因为Redis是单线程的,这个功能将无法正常工作。因此,Redis上的Lua脚本采用先进先出的顺序。如果没有这个特性,我们需要使用锁和更多的逻辑。
然后我们只是简单地休眠!
贡献
请随时贡献!对实现、问题和PR的反馈都受欢迎。更多详情请参阅CONTRIBUTING.md
([链接](https://github.com/sondrelg/self-limiters/blob/main/CONTRIBUTING.md))。
请考虑为仓库点赞以增加其可见性。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分发
构建分发
self_limiters-0.4.0-cp38-abi3-win_amd64.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 03087dcb2194e12d36a6ed880f0af61af5a8aeb20684797c1bf8105bb0a5701d |
|
MD5 | bff1845d40e84f457fa6becb71f66f5c |
|
BLAKE2b-256 | 1dc878dd50b1e30ce0b96197dfbfd0adde469bb27afbd9f35fa83ea1a840319f |
self_limiters-0.4.0-cp38-abi3-win32.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 14e3f7bdf1e4c5a1365f8355a19d44f430c3f1e8f6ca16ee76f1062cdff42ec4 |
|
MD5 | bb7632fe9075b902a0907a14b0a75fa2 |
|
BLAKE2b-256 | 2dde096c7b31a4f19fc6bc2caf2772332a0111020e0fd7534b29ddf132d7888b |
self_limiters-0.4.0-cp38-abi3-musllinux_1_1_x86_64.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f2a10118f6385b1229b03a3adb11092ed219847b0d1eca8610b302c143d07856 |
|
MD5 | 074fb79cda0811be2b486ef519c13e30 |
|
BLAKE2b-256 | 6bd31c10679830447231718b1f69be243246393731c9e20f0736c97ed71c354f |
self_limiters-0.4.0-cp38-abi3-musllinux_1_1_aarch64.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 886109104835794a09d23a52acf5ff09d4b1203f2fad489b41f4cc51b90966f9 |
|
MD5 | 026e15d11133768847258fbc811c0202 |
|
BLAKE2b-256 | b25cb7f86446ced409443026c9857eb2a8b262af14e91c1bba0ca40a1c6e6441 |
self_limiters-0.4.0-cp38-abi3-manylinux_2_24_armv7l.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | cef557a0e645cec993d512b60053df7de646ae63d7b93457f3576daa9cda1d8a |
|
MD5 | a6dfaa8a44cb5780a6b494357763b544 |
|
BLAKE2b-256 | 2eaaf238b2d94c568e40c6ff09cff20eb612dcbbf2f17890424e5297b716e7e0 |
self_limiters-0.4.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a7d556fc3eb3ae0419e4dbd09b5ca083011e9fd6e7fb236466cb35bee90f186c |
|
MD5 | 87799b59f1b3fdbce50c77705de19f89 |
|
BLAKE2b-256 | 62d8db448b00143b42020d106e12080fb9343a5fe939b13a5ff1ef9fa394ce4f |
self_limiters-0.4.0-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 18a4bdfc5f6f173cce724c495e892c536e5d21d060409ba7fefbcc7f91512f88 |
|
MD5 | 2706d9349590f36a63c9c78b1ec9e15d |
|
BLAKE2b-256 | 12525c8817d7169481986478ddeddfbe77cefd7f8f8012043c213dd3ea755142 |
哈希值,适用于 self_limiters-0.4.0-cp38-abi3-manylinux_2_12_i686.manylinux2010_i686.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | db0d109cd5fe6181a6f4df4212102039ef712b54d824684d4b6f0b18cb152731 |
|
MD5 | f13a09aff20be5c3878bac829bdd84e7 |
|
BLAKE2b-256 | 5fdd176a05618c625378ab20074a3fc127b42d114f5b957efa18abd3d90f8b4d |
哈希值,适用于 self_limiters-0.4.0-cp38-abi3-macosx_11_0_arm64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 0a3d024f08a9a2a4c4e7b6cafe41f6e6f1beeeedecba2f2b4b72a38b5a09d6e8 |
|
MD5 | 5f04c78ceba9402d343076295ab682ef |
|
BLAKE2b-256 | 760d311030246413c2b1ef1312b644168028f575bc08e71ca3b6d5edd9702c52 |
哈希值,适用于 self_limiters-0.4.0-cp38-abi3-macosx_10_7_x86_64.whl
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 6a5dfe25550847ca8c610709ede213e3b636004fd11092e201b9b12f15fc11e0 |
|
MD5 | e831e43b21c6cc4fbc860ca3ecf731b2 |
|
BLAKE2b-256 | cdcb531a3b8b194e06b0a9dda5e0f84cf03b2d33cbf17c30c9b45f1c9a48439b |