跳转到主要内容

防护并发的concurrent.futures

项目描述

Futureproof - 防护并发的concurrent.futures

Documentation Status Build Status Supported Python Versions PyPI codecov Read the Docs shoutouts.dev

concurrent.futures非常棒,但它有一些锋利的边缘,过去我多次被它咬到。

Futureproof是对其的一个薄包装,解决了这些问题中的一些,并增加了一些可用性功能。

特性

  • 监控:默认记录最近完成的任务的摘要。
  • 快速失败:错误会导致主线程抛出异常并默认停止。
  • 错误策略:用户可以决定是否在任务上抛出、记录或完全忽略错误。
  • 背压控制:随着执行器完成任务,大量任务被懒加载消费,大大减少了内存消耗并提高了这些情况下的响应性。

当前状态:Alpha

API可能会更改,任何更改都将记录在变更日志中。

Futureproof被设计用来包装ThreadPoolExecutor,然而版本0.2+包括对ProcessPoolExecutor的有限支持,但仅限于Python3.7+。

concurrent.futures问题?

让我们看看ThreadPoolExecutor的典范示例。

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain-that-definitely-does-not-exist.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

再次强调,这真是太棒了,多线程的入门门槛这么低,确实是Brian Quinlan和核心Python开发者们出色工作的证明。

然而,我看到了这个方法中的两个问题

  1. 模板代码。我们需要进入上下文管理器,手动调用submit并跟踪future及其参数,调用as_completed(它实际上返回一个迭代器),在future上调用result并记得处理异常。
  2. 这很令人惊讶。为什么我们需要获取结果才能抛出异常?如果我们不期望它抛出异常呢?我们可能想尽快知道。

如果你运行这段代码,你将得到以下输出(在撰写本文时)

'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.foxnews.com/' page is 248838 bytes
'http://www.bbc.co.uk/' page is 338658 bytes
'http://www.cnn.com/' page is 991167 bytes
'http://europe.wsj.com/' page is 970346 bytes

这是完美的。futureproof又是如何比较的呢?

executor = futureproof.FutureProofExecutor(max_workers=5)
with futureproof.TaskManager(executor) as tm:
    for url in URLS:
        tm.submit(load_url, url, 60)
    for task in tm.as_completed():
        print("%r page is %d bytes" % (task.args[0], len(task.result)))

看起来非常相似,有一个执行器和一个任务管理器submitas_completed是它的方法,并且没有try..except。如果我们运行它,我们会得到

'http://www.foxnews.com/' page is 248838 bytes
Traceback (most recent call last):
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1317, in do_open
    encode_chunked=req.has_header('Transfer-encoding'))
  ... omitted traceback output ...
socket.gaierror: [Errno 8] nodename nor servname provided, or not known

注意futureproof立即抛出了异常,并且一切停止了,正如你在普通的非线程Python中预期的那样,没有惊喜。

如果我们更喜欢futureproof,它提供了使用错误策略记录或忽略异常的选项。比如说我们想要记录异常

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s %(thread)s] %(message)s",
    datefmt="%H:%M:%S",
)

executor = futureproof.FutureProofExecutor(max_workers=5)
with futureproof.TaskManager(executor, error_policy="log") as tm:
    for url in URLS:
        tm.submit(load_url, url, 60)

    for task in tm.as_completed():
        if not isinstance(task.result, Exception):
            print("%r page is %d bytes" % (task.args[0], len(task.result)))

注意我们添加了一个检查,只有在它不是异常的情况下才打印结果,这输出了

'http://www.foxnews.com/' page is 251088 bytes
[12:09:15 4350641600] Task Task(fn=<function load_url at 0x1029ef1e0>, args=('http://some-made-up-domain-that-definitely-does-not-exist.com/', 60), kwargs={}, result=URLError(gaierror(8, 'nodename nor servname provided, or not known')),
 complete=True) raised an exception
Traceback (most recent call last):
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1317, in do_open
    encode_chunked=req.has_header('Transfer-encoding'))
    ... omitted long traceback ...
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1319, in do_open
    raise URLError(err)
urllib.error.URLError: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.bbc.co.uk/' page is 339087 bytes
'http://www.cnn.com/' page is 991167 bytes
[12:09:16 123145404444672] 5 task completed in the last 1.18 second(s)
'http://europe.wsj.com/' page is 970880 bytes

注意我们只需要配置日志和传递适当的错误策略,其他的一切都由我们代劳。你也可以选择完全忽略异常并自己管理它们,通过访问result,这是使用concurrent.futures时的工作流程。

as_completed?

如果你想过,为什么我们需要as_completed

答案是用于监控和错误处理。

如果我们有成千上万的URL,你不想等待所有URL返回后才显示输出,这可能会花费很长时间。但这实际上只是增加了代码的复杂性。如果你不使用as_completed,示例会是什么样子呢?

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}

for future, url in future_to_url.items():
    try:
        data = future.result()
    except Exception as exc:
        print("%r generated an exception: %s" % (url, exc))
    else:
        print("%r page is %d bytes" % (url, len(data)))

这可以说是更易读的,然而,有一个细微的区别:没有输出,直到所有的future都完成。如果你想象任务会花费更长的时间,你可能会怀疑事情是否真的在进行中。

让我们比较一下futureproof版本

executor = futureproof.FutureProofExecutor(max_workers=5)
with futureproof.TaskManager(executor, error_policy="ignore") as tm:
    for url in URLS:
        tm.submit(load_url, url, 60)

for task in tm.completed_tasks:
    if isinstance(task.result, Exception):
        print("%r generated an exception: %s" % (task.args[0], task.result))
    else:
        print("%r page is %d bytes" % (task.args[0], len(task.result)))
[12:40:28 123145393414144] Starting executor monitor
[12:40:29 123145393414144] 5 task completed in the last 1.01 second(s)
[12:40:29 123145393414144] Shutting down monitor...
'http://www.foxnews.com/' page is 252016 bytes
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.cnn.com/' page is 992648 bytes
'http://www.bbc.co.uk/' page is 338987 bytes
'http://europe.wsj.com/' page is 969285 bytes

futureproof默认将任务的监控信息记录在日志中,这样你总能知道事情是否在进行中。注意任务管理器暴露了completed_tasks,这使得可以轻松访问结果,而无需手动跟踪future。最后,正如之前提到的,你还可以完全控制异常处理,因此你也不需要添加相关代码。

这并不是什么大问题...

确实,这些问题相当小,我们可以通过使用concurrent.futures手动解决。但随着工作量的增加,一个问题会悄悄出现。

在底层,concurrent.futures使用队列来存储工作,包括函数及其参数。它一开始就会这样做,对于所有的工作,这意味着在大量工作的情况下,队列会变得非常大,主线程可能会挂起并变得无响应。

一个简单的例子

def custom_sum(a, b):
    time.sleep(0.1)
    return a + b


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    fn = partial(custom_sum, b=1)
    ex.map(fn, range(1_000_000_000))

运行这个简单的函数一亿次会导致CPU达到最大值,内存使用量将大幅增加。再次强调,这一切都没有任何日志或输出,而且,更糟糕的是,键盘中断不会立即退出,你将不得不反复按它,迫使线程在不干净的状态下退出。

让我们比较一下futureproof

# same function as before
ex = futureproof.ThreadPoolExecutor(max_workers=2)
with futureproof.TaskManager(
    ex, error_policy=futureproof.ErrorPolicyEnum.RAISE
) as tm:
    fn = partial(custom_sum, b=1)
    tm.map(fn, range(1_000_000_000))

几乎立即,你会看到这样的日志:

[15:15:21 4632346048] Starting backpressure test with 1,000,000,000 tasks
[15:15:21 4632346048] You may KeyboardInterrupt at any point and the executor will stop almost immediately
[15:15:21 123145413025792] Starting executor monitor
[15:15:22 123145413025792] 2 task completed in the last 0.20 second(s)
[15:15:22 123145413025792] 1 task completed in the last 0.10 second(s)
[15:15:22 123145413025792] 2 task completed in the last 0.21 second(s)
...

在任何时候,单个键盘中断都会停止进程。

[15:15:24 123145413025792] 2 task completed in the last 0.20 second(s)
^CTraceback (most recent call last):
  File "examples/backpressure.py", line 64, in <module>
    with_futureproof()
  File "examples/backpressure.py", line 40, in with_futureproof
    tm.map(fn, range(1_000_000_000))
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 65, in __exit__
    self.run()
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 93, in run
    for _ in self.as_completed():
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 104, in as_completed
    yield self.wait_for_result()
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 146, in wait_for_result
    completed_task = self._results_queue.get(block=True)
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/queue.py", line 170, in get
    self.not_empty.wait()
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
[15:15:24 123145413025792] 2 task completed in the last 0.20 second(s)
[15:15:24 123145413025792] 1 task completed in the last 0.10 second(s)
[15:15:24 123145413025792] Shutting down monitor...

查看示例目录,其中包含所有这些场景中 futureproofconcurrent.futures 的完整示例。只需运行 python examples/file.py 并在命令行中添加 futures 即可使用 concurrent.futures 运行示例。

替代方案

我绝不是第一个解决这个问题的人。以下是一些类似、更稳定且功能更全的替代方案,尽管它们的许可相对有限。

futureproof 使用 MIT 许可。

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定该选择哪个,请了解更多关于 安装包 的信息。

源分布

futureproof-0.3.1.tar.gz (17.6 kB 查看哈希值)

上传时间

构建分布

futureproof-0.3.1-py2.py3-none-any.whl (11.3 kB 查看哈希值)

上传时间 Python 2 Python 3

支持者

AWSAWS云计算和安全赞助商DatadogDatadog监控FastlyFastlyCDNGoogleGoogle下载分析MicrosoftMicrosoftPSF赞助商PingdomPingdom监控SentrySentry错误日志StatusPageStatusPage状态页面