Bulletproof concurrent.futures
Futureproof - Bulletproof concurrent.futures
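concurrent.futures is amazing, but it has some sharp edges that have bitten me many times in the past.
Futureproof is a thin wrapper around it that addresses some of these problems and adds a few usability features.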
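Features
- Monitoring: a summary of recently completed tasks is logged by default.
- Fail fast: errors cause the main thread to raise an exception and stop by default.
- Error policy: the user can decide whether to raise, log, or completely ignore errors on tasks.
- Backpressure control: large collections of tasks are consumed lazily as the executor completes tasks, drastically reducing memory consumption and improving responsiveness in these situations.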
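Current status: Alpha
The API is subject to change; any changes will be documented in the changelog.
Futureproof was designed to wrap ThreadPoolExecutor. Version 0.2+ also includes limited support for ProcessPoolExecutor, but only on Python 3.7+.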
concurrent.futures problems?
Let's have a look at the canonical example for ThreadPoolExecutor:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain-that-definitely-does-not-exist.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
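Again, this is amazing. The fact that the barrier to entry for multithreading is this low is a real testament to the great work done by Brian Quinlan and the core Python developers.
However, I see two problems with this:
- The boilerplate. We need to enter a context manager, call submit manually while keeping track of the futures and their arguments, call as_completed (which actually returns an iterator), call result on each future, and remember to handle the exception.
- It's surprising. Why do we need to get the result in order to raise? What if we don't expect it to raise? We probably want to know as soon as possible.
If you run this code, you get the following output (at the time of writing):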
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.foxnews.com/' page is 248838 bytes
'http://www.bbc.co.uk/' page is 338658 bytes
'http://www.cnn.com/' page is 991167 bytes
'http://europe.wsj.com/' page is 970346 bytes
Which is perfect. How does futureproof compare?
import futureproof

executor = futureproof.FutureProofExecutor(max_workers=5)
with futureproof.TaskManager(executor) as tm:
    for url in URLS:
        tm.submit(load_url, url, 60)
    for task in tm.as_completed():
        print("%r page is %d bytes" % (task.args[0], len(task.result)))
It looks very similar. There is an executor and a task manager, submit and as_completed are methods on the task manager, and there is no try..except. If we run it, we get:
'http://www.foxnews.com/' page is 248838 bytes
Traceback (most recent call last):
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1317, in do_open
    encode_chunked=req.has_header('Transfer-encoding'))
... omitted traceback output ...
socket.gaierror: [Errno 8] nodename nor servname provided, or not known
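Notice that futureproof raised the exception immediately and everything stopped, just as you would expect in normal non-threaded Python. There are no surprises.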
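For comparison, plain concurrent.futures can approximate this fail-fast behaviour using concurrent.futures.wait with return_when=FIRST_EXCEPTION. The sketch below is not how futureproof is implemented. The work function and the run_fail_fast helper are hypothetical names used only for illustration.

```python
import concurrent.futures
import time


def work(n):
    # Hypothetical task: fails immediately for n == 3, otherwise sleeps briefly.
    if n == 3:
        raise ValueError(f"bad input: {n}")
    time.sleep(0.05)
    return n * 2


def run_fail_fast(inputs, max_workers=4):
    """Return results in input order, re-raising the first task exception seen."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(work, n) for n in inputs]
        # Block until every future is done OR any future raises.
        done, not_done = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_EXCEPTION
        )
        for future in done:
            exc = future.exception()
            if exc is not None:
                # Cancel work that has not started yet, then raise in the main thread.
                for pending in not_done:
                    pending.cancel()
                raise exc
        return [future.result() for future in futures]
```

The `with` block still waits for tasks that were already running. Only queued work is cancelled.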
If we prefer, futureproof lets us log or even ignore exceptions using error policies. Say we want to log the exceptions:
import logging

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s %(thread)s] %(message)s",
    datefmt="%H:%M:%S",
)

executor = futureproof.FutureProofExecutor(max_workers=5)
with futureproof.TaskManager(executor, error_policy="log") as tm:
    for url in URLS:
        tm.submit(load_url, url, 60)
    for task in tm.as_completed():
        if not isinstance(task.result, Exception):
            print("%r page is %d bytes" % (task.args[0], len(task.result)))
Note that we've added a check so the result is printed only when it isn't an exception. This outputs:
'http://www.foxnews.com/' page is 251088 bytes
[12:09:15 4350641600] Task Task(fn=<function load_url at 0x1029ef1e0>, args=('http://some-made-up-domain-that-definitely-does-not-exist.com/', 60), kwargs={}, result=URLError(gaierror(8, 'nodename nor servname provided, or not known')), complete=True) raised an exception
Traceback (most recent call last):
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1317, in do_open
    encode_chunked=req.has_header('Transfer-encoding'))
... omitted long traceback ...
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/urllib/request.py", line 1319, in do_open
    raise URLError(err)
urllib.error.URLError: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.bbc.co.uk/' page is 339087 bytes
'http://www.cnn.com/' page is 991167 bytes
[12:09:16 123145404444672] 5 task completed in the last 1.18 second(s)
'http://europe.wsj.com/' page is 970880 bytes
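Note that we only had to configure logging and pass the appropriate error policy. Everything else was taken care of for us. You can also choose to ignore exceptions completely and manage them yourself by accessing result, which is the workflow when using concurrent.futures.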
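The following is a minimal sketch of how the three policies (raise, log, ignore) could be dispatched on top of plain concurrent.futures. It is an assumption for illustration and not futureproof's implementation. The Task dataclass only loosely mirrors the Task(...) repr in the log output above, and run_tasks is a hypothetical helper.

```python
import concurrent.futures
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Iterator, Tuple

logger = logging.getLogger(__name__)


@dataclass
class Task:
    # Hypothetical record, loosely modelled on the Task(...) repr in the log above.
    fn: Callable
    args: Tuple = ()
    kwargs: dict = field(default_factory=dict)
    result: Any = None
    complete: bool = False


def run_tasks(fn: Callable, arg_tuples: Iterable[Tuple],
              error_policy: str = "raise", max_workers: int = 5) -> Iterator[Task]:
    """Yield one Task per call as each call completes.

    "raise":  cancel queued work and re-raise in the consuming thread.
    "log":    log the exception, store it in task.result and keep going.
    "ignore": store the exception in task.result silently.
    """
    if error_policy not in ("raise", "log", "ignore"):
        raise ValueError(f"unknown error_policy: {error_policy!r}")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {}
        for args in arg_tuples:
            task = Task(fn=fn, args=tuple(args))
            future_to_task[executor.submit(fn, *task.args, **task.kwargs)] = task
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            exc = future.exception()
            task.result = future.result() if exc is None else exc
            task.complete = True
            if exc is not None:
                if error_policy == "raise":
                    for other in future_to_task:
                        other.cancel()  # no-op for futures already running or done
                    raise exc
                if error_policy == "log":
                    logger.error("Task %r raised an exception", task, exc_info=exc)
            yield task
```

Failed tasks keep their exception in task.result, so a caller can filter them with the same isinstance(task.result, Exception) check used in the futureproof example.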
as_completed?
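If you think about it, why do we need as_completed?
The answer is monitoring and error handling.
If we had thousands of URLs, we wouldn't want to wait until all of them came back before showing any output, because that could take ages. But really, it just adds complexity to the code. What does the example look like if you don't use as_completed?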
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future, url in future_to_url.items():
        try:
            data = future.result()
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
        else:
            print("%r page is %d bytes" % (url, len(data)))
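This is arguably more readable, but there is a subtle difference: there is no output until all the futures are complete. If you imagine tasks taking longer, you're left wondering whether things are working at all.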
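To keep the simpler submission-order loop and still see progress, Future.add_done_callback can report each completion as it happens. The following is a minimal sketch. It assumes a hypothetical slow_square task standing in for load_url, and a report callable that you supply (print by default).

```python
import concurrent.futures
import threading
import time


def slow_square(x):
    # Hypothetical stand-in for a slow task such as load_url.
    time.sleep(0.01)
    return x * x


def run_with_progress(inputs, max_workers=4, report=print):
    """Report each completion as it happens, then return results in input order."""
    inputs = list(inputs)
    total = len(inputs)
    lock = threading.Lock()
    done_count = 0

    def on_done(_future):
        nonlocal done_count
        # Callbacks usually run in worker threads, so guard the shared counter.
        with lock:
            done_count += 1
            count = done_count
        report(f"{count}/{total} tasks done")

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(slow_square, x) for x in inputs]
        for future in futures:
            future.add_done_callback(on_done)
        # Plain submission-order loop, as in the example above.
        return [future.result() for future in futures]
```

Progress messages may interleave out of order across threads, but each count is reported exactly once.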
Let's compare it to the futureproof version:
executor = futureproof.FutureProofExecutor(max_workers=5)
with futureproof.TaskManager(executor, error_policy="ignore") as tm:
    for url in URLS:
        tm.submit(load_url, url, 60)
    for task in tm.completed_tasks:
        if isinstance(task.result, Exception):
            print("%r generated an exception: %s" % (task.args[0], task.result))
        else:
            print("%r page is %d bytes" % (task.args[0], len(task.result)))
[12:40:28 123145393414144] Starting executor monitor
[12:40:29 123145393414144] 5 task completed in the last 1.01 second(s)
[12:40:29 123145393414144] Shutting down monitor...
'http://www.foxnews.com/' page is 252016 bytes
'http://some-made-up-domain-that-definitely-does-not-exist.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.cnn.com/' page is 992648 bytes
'http://www.bbc.co.uk/' page is 338987 bytes
'http://europe.wsj.com/' page is 969285 bytes
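futureproof logs monitoring information about the tasks by default, so you always know whether things are progressing. Note how the task manager exposes completed_tasks, which gives easy access to the results without having to keep track of futures manually. Finally, as mentioned previously, you are in full control of exception handling, so you don't need to add code for that either.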
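Periodic monitoring like the log lines above can be built from done-callbacks and a background thread. This is a hypothetical sketch, not futureproof's actual monitor. The CompletionMonitor class and its interval parameter are assumptions, and the sketch only reproduces the shape of the "N task completed in the last X second(s)" messages.

```python
import concurrent.futures
import logging
import threading
import time

logger = logging.getLogger(__name__)


class CompletionMonitor:
    """Hypothetical monitor: counts completed futures and periodically logs
    how many finished since the previous report."""

    def __init__(self, interval=1.0):
        self.interval = interval
        self.total_reported = 0
        self._count = 0
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def track(self, future):
        future.add_done_callback(self._on_done)

    def _on_done(self, _future):
        with self._lock:
            self._count += 1

    def _report(self, elapsed):
        with self._lock:
            count, self._count = self._count, 0
        if count:
            self.total_reported += count
            logger.info("%d task completed in the last %.2f second(s)", count, elapsed)

    def _run(self):
        last = time.monotonic()
        while not self._stop.wait(self.interval):
            now = time.monotonic()
            self._report(now - last)
            last = now
        # Final report so completions after the last tick are not lost.
        self._report(time.monotonic() - last)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        self._stop.set()
        self._thread.join()
```

Usage: wrap the executor in `with CompletionMonitor(interval=1.0) as monitor:` and call `monitor.track(future)` for every submitted future.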
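These are fairly minor problems...
Admittedly, these are fairly minor problems that we can work around manually using concurrent.futures. But as the workload increases, a problem creeps in.
Under the hood, concurrent.futures uses queues to store the jobs, including each function and its arguments. It enqueues every job right at the beginning. With a large number of jobs, the queue becomes very large, and the main thread can hang and become unresponsive.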
A simple example:
import concurrent.futures
import time
from functools import partial

def custom_sum(a, b):
    time.sleep(0.1)
    return a + b

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
    fn = partial(custom_sum, b=1)
    ex.map(fn, range(1_000_000_000))
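Running this simple function a billion times will max out the CPU, and memory usage will increase significantly. Again, all of this happens without any logging or output. To make matters worse, a keyboard interrupt will not exit immediately; you'll have to hit it repeatedly, forcing the threads to exit in an unclean state.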
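A common way to get backpressure with plain concurrent.futures is to keep only a bounded window of futures in flight and pull new items from the input iterator as earlier ones finish. The sketch below does this with wait(..., return_when=FIRST_COMPLETED). The names bounded_map and max_in_flight are hypothetical, and this is not necessarily how futureproof's TaskManager.map works.

```python
import concurrent.futures
import itertools


def bounded_map(fn, iterable, max_workers=2, max_in_flight=None):
    """Yield fn(item) results in completion order, keeping at most
    max_in_flight futures queued or running at any time."""
    if max_in_flight is None:
        max_in_flight = max_workers * 2
    items = iter(iterable)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Prime the window; islice pulls only this many items from the source.
        pending = {executor.submit(fn, item)
                   for item in itertools.islice(items, max_in_flight)}
        while pending:
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                yield future.result()  # re-raises the task's exception, if any
            # Top the window back up with as many new items as just finished.
            for item in itertools.islice(items, len(done)):
                pending.add(executor.submit(fn, item))
```

Because the input is consumed lazily, passing range(1_000_000_000) never materialises a billion queued work items. Memory stays proportional to max_in_flight.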
Let's compare with futureproof:
import futureproof
from functools import partial

# same function as before
ex = futureproof.ThreadPoolExecutor(max_workers=2)
with futureproof.TaskManager(
    ex, error_policy=futureproof.ErrorPolicyEnum.RAISE
) as tm:
    fn = partial(custom_sum, b=1)
    tm.map(fn, range(1_000_000_000))
Almost immediately, you will see logs like this:
[15:15:21 4632346048] Starting backpressure test with 1,000,000,000 tasks
[15:15:21 4632346048] You may KeyboardInterrupt at any point and the executor will stop almost immediately
[15:15:21 123145413025792] Starting executor monitor
[15:15:22 123145413025792] 2 task completed in the last 0.20 second(s)
[15:15:22 123145413025792] 1 task completed in the last 0.10 second(s)
[15:15:22 123145413025792] 2 task completed in the last 0.21 second(s)
...
At any point, a single keyboard interrupt will stop the process:
[15:15:24 123145413025792] 2 task completed in the last 0.20 second(s)
^CTraceback (most recent call last):
  File "examples/backpressure.py", line 64, in <module>
    with_futureproof()
  File "examples/backpressure.py", line 40, in with_futureproof
    tm.map(fn, range(1_000_000_000))
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 65, in __exit__
    self.run()
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 93, in run
    for _ in self.as_completed():
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 104, in as_completed
    yield self.wait_for_result()
  File "/Users/yeray/code/personal/futureproof/src/futureproof/task_manager.py", line 146, in wait_for_result
    completed_task = self._results_queue.get(block=True)
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/queue.py", line 170, in get
    self.not_empty.wait()
  File "/Users/yeray/.pyenv/versions/3.7.3/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
[15:15:24 123145413025792] 2 task completed in the last 0.20 second(s)
[15:15:24 123145413025792] 1 task completed in the last 0.10 second(s)
[15:15:24 123145413025792] Shutting down monitor...
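On Python 3.9+, plain concurrent.futures can also drop queued work when Ctrl-C arrives by calling Executor.shutdown(cancel_futures=True). The following is a minimal sketch using a hypothetical run_interruptibly helper. It still submits every item up front, so it addresses the slow shutdown but not the memory growth.

```python
import concurrent.futures
import time


def run_interruptibly(fn, items, max_workers=2):
    """Run fn over items; on KeyboardInterrupt, discard queued work and re-raise.

    Requires Python 3.9+ for shutdown(cancel_futures=True).
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    interrupted = False
    try:
        futures = [executor.submit(fn, item) for item in items]
        return [future.result() for future in futures]
    except KeyboardInterrupt:
        interrupted = True
        raise
    finally:
        # On Ctrl-C, cancel everything still queued; in all cases wait only
        # for the tasks that are already running.
        executor.shutdown(wait=True, cancel_futures=interrupted)
```

A single interrupt then returns after the currently running tasks finish, rather than after the whole queue drains.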
Check out the examples directory for complete examples of futureproof and concurrent.futures in all of these scenarios. Run python examples/file.py, adding futures on the command line to run the example using concurrent.futures.
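Alternatives
I am by no means the first to tackle this problem. Here are a few similar, more stable, and more feature-complete alternatives, albeit with more restrictive licenses: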
- Pebble,LGPL 3.0
- more-executors,GPL 3.0
futureproof is licensed under MIT.
Source distribution
Hashes for futureproof-0.3.1.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 274365159bfd8b7aa39677abfa7b16c8253a771d72d3c3ee9de4a03c8d6d2d9b
MD5 | a3fc4ecf2ded4c457cb86f5c8757ff05
BLAKE2b-256 | 41061ce98cf280d84b598e8b56b33ecef846ee5dec4c6e6bac353d0049a33379
Built distribution
Hashes for futureproof-0.3.1-py2.py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 849d664cc201c39eb4503e1aa35d81b437e89e99cf34c3960ed106d7bbf6f154
MD5 | 212046859411df8348f5c3c53ab27ccb
BLAKE2b-256 | 152c8996c6d29ec09de4815143476ad3df22dd878c1d937b74d7d78e8b1a2935