跳转到主要内容

为py3.5+ asyncio提供节流工具。

项目描述

asyncioasync/await 是相当棒的工具!异步操作!在 Python!不疯狂!

嗯,不幸的是,鉴于这种编程风格的本性,有时候并不容易。

嗯,这个项目至少提供了一个工具,以相对简单的方式帮助您。

asyncio_throttler 是为 Python 3.5+ 设计的节流系统,考虑到 aiohttp 节流,但设计得足够灵活,可以处理大多数节流和速率限制需求。

用法

嗯,获取它

$ pyvenv env/
$ . env/bin/activate
$ pip install asyncio_throttler # Pin the damned version in setup.py
                                # you bloody savage.

如果您没有 Python 3.5,请查看 pyenv(不要与 pyvenv 混淆),一个类似于 rbenv 的 Python 版本管理器。

如果您真的不想使用 pyenv, brew install python3 将以非破坏性的方式安装 Python 3.5+。

无论如何,这是我在开发过程中写的糟糕的使用示例。代码有良好的文档,简洁,希望人类容易理解,但应该能让您入门。我会写更好的文档,我保证。我必须睡觉。

警告:这将永远不会完成,节流错误将永远弹跳。故意为之。为了演示事情不会丢失。真的。

"""Dump test module I built while writing this thing. Need to make real tests,
but whatcha gonna do ya got schedules and stuff amirite?

"""
import logging
import asyncio
from asyncio_throttler import Throttler, ThrottleException

# Demonstrates that windowing, throttling, and every other known feature
# works, I think.
if __name__ == '__main__':
    logger = logging.getLogger('testthrottler')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    format_template = '%(asctime)s:%(name)s:%(levelname)s%(message)s'
    handler.setFormatter(logging.Formatter(fmt=format_template,
                                           datefmt='%Y-%m-%d %H:%M:%S'))
    logger.addHandler(handler)

    async def dummy_consumer(item):
        print("Item received:", item)
        await asyncio.sleep(2)

    import random
    async def dummy_task():
        logger.info("Executed")
        return await asyncio.sleep(1, random.randrange(1, 1000))

    async def bad_dummy_task():
        logger.info("Executed and gonna throw a throttle")
        raise ThrottleException(bad_dummy_task())

    loop = asyncio.get_event_loop()

    # roflcoptr
    todo_list = [dummy_task() for _ in range(1, 31)]
    todo_list.append(bad_dummy_task())
    todo_list = todo_list + [dummy_task() for _ in range(1, 31)]

    throttler = Throttler(
        todo_list,
        dummy_consumer,
        time_window=10,
        per_time_window=20,
        concurrency=5,
        log_handler=logging.StreamHandler(),
        log_level=logging.DEBUG,
        loop=loop
    )

    loop.run_until_complete(throttler.run())
    loop.close()

它是如何工作的

使用一系列可等待的、异步函数和许多关键字参数调整以适应您目的的参数实例化一个节流器。

内部有两个 asyncio.Queue 对象和一个 asyncio.LifoQueue

  • exceptions 是我们捕获的非节流异常的队列。

  • processed 是一个用于处理输出的 Queue。这是消费者将从其中消费的内容。

  • todo 是一个 LifoQueue,用于存放未处理的任务列表。它最初从传递给 Throttler 的列表的逆序中获取,这是一个快速迭代器。它采用 LIFO 的方式,以便我们可以将节流的项从前面弹出。

几个内部函数组合在一起,创建了一个异步生产者和消费者循环,其中项目将尽可能快地处理,同时考虑到在 Throttler 实例化时施加的限制。当节流时,它会退避 time_window 时间,一次仅执行 concurrency 个任务,并在触发 per_time_window 个项目的处理后等待 time_window 时间。

这应该涵盖了几个情况……

无论如何,你传递的异步 consumer_fn 将在结果可用时立即执行,用于写入磁盘或其他。

注意

这很痛苦。

项目详情


下载文件

下载适用于您平台的项目文件。如果您不确定要选择哪个,请了解有关 安装包 的更多信息。

源分布

asyncio_throttler-0.1.0.tar.gz (5.9 kB 查看散列)

上传时间

构建分布

asyncio_throttler-0.1.0-py3-none-any.whl (8.9 kB 查看散列)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面