跳转到主要内容

基于Redis的分布式异步速率限制器

项目描述

PyPI test status coverage python version

自我限制器

一个用于根据 并发性时间 调节流量的库。

它实现了一个 信号量,用于在需要限制对API(或其他资源)的并发请求数量时使用。例如,如果您只能发送5个并发请求。

它还实现了 令牌桶算法,可用于限制在给定时间间隔内发出的请求数量。例如,如果您被限制为每秒10个请求。

这两个限制器都是异步的、先入先出(FIFO)和通过Redis进行分布式。您可能只需在需要分布式队列时使用此功能。

此库旨在实现速率限制,但信号量和令牌桶的实现可用于任何目的。

安装

pip install self-limiters

用法

这两种实现都编写为异步上下文管理器。

信号量

可以使用以下方式使用 Semaphore

from self_limiters import Semaphore


# 5 requests at the time
async with Semaphore(name="", capacity=5, max_sleep=60, redis_url=""):
      client.get(...)

底层使用 blpop 来等待信号量被释放,这是一个非阻塞操作。

如果您指定了非零的 max_sleep,如果 blpop 等待的时间超过指定值,将引发 MaxSleepExceededError

令牌桶

与之前一样,使用 TokenBucket 上下文管理器,如下所示

from self_limiters import TokenBucket


# 1 requests per minute
async with TokenBucket(
        name="",
        capacity=1,
        refill_amount=1,
        refill_frequency=60,
        max_sleep=600,
        redis_url=""
):
    client.get(...)

限制器首先估计桶中有容量的时间,即当它是这个实例的轮到时,然后异步休眠直到那时。

如果设置了 max_sleep 且估计的休眠时间超过这个值,将立即引发 MaxSleepExceededError

作为装饰器

该软件包不包含任何装饰器,但如果您想限制整个函数运行的速率,可以创建自己的装饰器,如下所示

from self_limiters import Semaphore


# Define a decorator function
def limit(name, capacity):
  def middle(f):
    async def inner(*args, **kwargs):
      async with Semaphore(
              name=name,
              capacity=capacity,
              redis_url="redis://127.0.0.1:6389"
      ):
        return await f(*args, **kwargs)
    return inner
  return middle


# Then pass the relevant limiter arguments like this
@limit(name="foo", capacity=5)
def fetch_foo(id: UUID) -> Foo:

实现和性能分析

这个库是用 Rust(为了好玩)编写的,并依赖于 Lua 脚本和 pipelining 来提高每个实现的性能。

Redis 允许用户直接在服务器上上传和执行 Lua 脚本,这意味着我们可以用 Lua 编写例如整个令牌桶逻辑。这带来了几个优点

  • 由于它们是在 redis 实例上执行的,我们可以对 redis 发出 1 个请求,而原本需要发出 3 或 4 个请求。减少请求数量所节省的时间非常显著。

  • Redis 是单线程的,并保证脚本的原子执行,这意味着我们不需要担心数据竞争。在之前的迭代中,当我们需要发出 4 个请求来估计令牌桶实例的唤醒时间时,我们曾需要使用 redlock 算法来确保公平性。使用 Lua 脚本后,我们的实现是先入先出(FIFO)的。

总结来说,它们使我们实现更快,因为我们节省了几个往返服务器的时间和资源,并且我们不再需要锁,分布式锁很昂贵。同时,它们使代码变得更加简单。

这就是每个实现最终的样子

信号量实现

  1. 运行一个 Lua 脚本 来在 redis 中创建一个列表数据结构,这是信号量的基础。

    此脚本具有幂等性,如果已创建则跳过。

  2. 运行 BLPOP 以非阻塞方式等待信号量有容量,并在有容量时弹出列表。

  3. 然后运行一个 pipelined 命令 通过添加容量来释放信号量。

所以总共我们向 redis 发出了 3 次调用,而没有脚本我们将发出 6 次,这些都是非阻塞的。

令牌桶实现

令牌桶实现更简单。步骤如下

  1. 运行一个 Lua 脚本 来估计并返回一个唤醒时间。
  2. 休眠直到给定的时间戳。

我们发出 1 次调用而不是 3 次,然后休眠。这两者都是非阻塞的。

换句话说,大部分时间都是在非阻塞方式下等待,这意味着限制器对应用程序事件循环的影响几乎可以忽略不计。

基准测试

我们在 CI 中使用 Github actions 运行基准测试。对于正常的 ubuntu-latest 运行器,我们看到两个限制器的运行时间

同时创建每个实现的 100 个实例并调用它们时,平均运行时间是

  • 信号量实现:每个实例 ~0.6ms
  • 令牌桶实现:每个实例 ~0.03ms

如果您想运行自己的测试,请查看 基准测试脚本

实现参考

信号量实现

信号量实现非常有用,当你需要限制一个进程执行 n 个并发操作时。例如,如果你有多个Web服务器,并且你正在与一个API交互,该API只能容忍一定数量的并发请求,超过这个数量就会将你锁定。

流程可以分解如下

最初的 Lua 脚本 首先检查我们将要构建信号量的 Redis 列表是否存在。它通过在队列键上调用 SETNX 来完成这项工作,并添加一个后缀(如果类实例化中指定的 name 是 "my-queue",则队列名称将是 __self-limiters:my-queue,并且会为 __self-limiters:my-queue-exists 调用 setnx)。如果返回的值是 1,这意味着我们用作信号量的队列尚不存在,需要创建。

可能你会觉得有点奇怪,仅仅为了指示一个列表是否存在,就要维护一个单独的值,当我们可以直接检查列表本身。如果我们能直接在列表上使用 EXISTS,那会很好,但不幸的是,当所有元素都被弹出(即,当信号量完全获取时),列表被认为是存在的,所以我看不到其他的方法来做这件事。如果你有其他方法,欢迎贡献!

然后如果需要创建队列,我们调用 RPUSH,其参数个数等于初始化信号量实例时使用的 capacity 值。对于一个容量为 5 的信号量,我们调用 RPUSH 1 1 1 1 1,其中值是任意指定的。

一旦创建了列表/队列,我们就通过 BLPOP 来阻塞,直到轮到我们。默认情况下,BLPOP 是先入先出(FIFO)。我们还确保根据初始化的信号量实例设置指定 max_sleep。如果没有传递任何内容,我们将允许无限期地睡眠。

__aexit__ 中,我们通过管道查询运行三个命令。我们将 1 放回队列中,以“释放”信号量,并设置队列和我们在其中调用 SETNX 的字符串值的过期时间。

这些过期时间是处理已丢弃容量的折衷方案。如果一个持有信号量的节点死亡,容量可能永远无法返回。然而,如果没有人在过期时间内使用信号量,所有值都将被清除,并在下次使用时以完整容量重新创建信号量。截至写作时,过期时间为 30 秒,但可以使其可配置。

令牌桶实现

令牌桶实现适用于当你需要通过时间间隔限制进程时。例如,每分钟 1 个请求,或者每 10 秒 50 个请求。

实现具有前瞻性。它计算出给定客户端在桶中可能存在容量的时间,并返回该时间。从那里,我们可以异步睡眠,直到执行速率限制操作的时间。

流程可以分解如下

调用 schedule Lua 脚本,该脚本首先 GET 桶的 状态

桶状态包含最后一个预定的时间槽和该时间槽剩余的令牌数。对于一个容量为 1 的桶,拥有一个 tokens_left_for_slot 变量没有意义,但如果容量为 2 或更多,我们可能需要将多个客户端预定到相同的时间槽。

然后脚本计算出是否要减少 tokens_left_for_slot 值,或者根据频率变量增加时间槽值。

最后,我们再次使用SETEX([Redis命令详情](https://redis.ac.cn/commands/setex/))来存储桶的状态。这允许我们同时存储状态和设置过期时间。默认过期时间为30秒,但可以设置为可配置的。

需要注意的是,如果不是因为Redis是单线程的,这个功能将无法正常工作。因此,Redis上的Lua脚本采用先进先出的顺序。如果没有这个特性,我们需要使用锁和更多的逻辑。

然后我们只是简单地休眠!

贡献

请随时贡献!对实现、问题和PR的反馈都受欢迎。更多详情请参阅CONTRIBUTING.md([链接](https://github.com/sondrelg/self-limiters/blob/main/CONTRIBUTING.md))。

请考虑为仓库点赞以增加其可见性。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

此版本没有提供源分发文件。请参阅生成分发存档的教程

构建分发

self_limiters-0.4.0-cp38-abi3-win_amd64.whl (613.3 kB 查看哈希值)

上传时间 CPython 3.8+ Windows x86-64

self_limiters-0.4.0-cp38-abi3-win32.whl (584.5 kB 查看哈希值)

上传时间 CPython 3.8+ Windows x86

self_limiters-0.4.0-cp38-abi3-musllinux_1_1_x86_64.whl (914.0 kB 查看哈希值)

上传时间 CPython 3.8+ musllinux: musl 1.1+ x86-64

self_limiters-0.4.0-cp38-abi3-musllinux_1_1_aarch64.whl (874.9 kB 查看哈希值)

上传时间 CPython 3.8+ musllinux: musl 1.1+ ARM64

self_limiters-0.4.0-cp38-abi3-manylinux_2_24_armv7l.whl (664.7 kB 查看哈希值)

上传时间 CPython 3.8+ manylinux: glibc 2.24+ ARMv7l

self_limiters-0.4.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (742.4 kB 查看哈希值)

上传时间 CPython 3.8+ manylinux: glibc 2.17+ x86-64

self_limiters-0.4.0-cp38-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (698.4 kB 查看哈希值)

上传时间: CPython 3.8+ manylinux: glibc 2.17+ ARM64

self_limiters-0.4.0-cp38-abi3-manylinux_2_12_i686.manylinux2010_i686.whl (803.5 kB 查看哈希值)

上传时间: CPython 3.8+ manylinux: glibc 2.12+ i686

self_limiters-0.4.0-cp38-abi3-macosx_11_0_arm64.whl (677.3 kB 查看哈希值)

上传时间: CPython 3.8+ macOS 11.0+ ARM64

self_limiters-0.4.0-cp38-abi3-macosx_10_7_x86_64.whl (655.9 kB 查看哈希值)

上传时间: CPython 3.8+ macOS 10.7+ x86-64

支持