从多个迭代器中产生值,一旦可用
项目描述
interleave 包提供了一个同名的函数,它接受多个迭代器,在单独的线程中运行它们,并一旦每个迭代器产生值就产生这些值。
安装
interleave 需要 Python 3.7 或更高版本。只需使用 Python 3 的 pip 安装 interleave 和其依赖项即可。
python3 -m pip install interleave
示例
>>> from time import sleep, strftime >>> from interleave import interleave >>> >>> def sleeper(idno, delays): ... for i, d in enumerate(delays): ... sleep(d) ... yield (idno, i) ... >>> with interleave( ... [ ... sleeper(0, [0, 1, 2]), ... sleeper(1, [2, 2, 2]), ... sleeper(2, [5, 2, 1]), ... ] ... ) as it: ... for x in it: ... print(strftime("%H:%M:%S"), x) ... 22:08:39 (0, 0) 22:08:40 (0, 1) 22:08:41 (1, 0) 22:08:42 (0, 2) 22:08:43 (1, 1) 22:08:44 (2, 0) 22:08:45 (1, 2) 22:08:46 (2, 1) 22:08:47 (2, 2)
API
interleave.interleave(
iterators: Iterable[Iterator[T]],
*,
max_workers: Optional[int] = None,
thread_name_prefix: str = "",
queue_size: Optional[int] = None,
onerror: interleave.OnError = interleave.STOP,
) -> interleave.Interleaver[T]
interleave() 在单独的线程中运行给定的迭代器,并返回一个迭代器,该迭代器在它们可用时产生它们产生的值。(有关 Interleaver 类的详细信息,请参阅以下内容。)
max_workers 和 thread_name_prefix 参数将被传递到底层的 concurrent.futures.ThreadPoolExecutor(参见)。max_workers 决定了同时运行的最大迭代器数量。
queue_size 参数设置内部使用的队列的最大大小,该队列用于传输迭代器产生的值;当队列满时,任何要产生值的迭代器将阻塞等待调用交错器的 __next__ 以出队下一个值。当 queue_size 为 None(默认值)时,interleave() 使用一个没有最大大小的 queue.SimpleQueue。当 queue_size 不是 None(包括零,表示没有最大大小)时,interleave() 使用一个 queue.Queue,其 get() 方法在 Windows 上不可中断(包括通过 KeyboardInterrupt)。
onerror 参数是一个枚举值,用于确定在其中一个迭代器抛出异常时,interleave() 应该如何行为。可能的值有
- STOP
(默认) 停止遍历所有迭代器,取消任何尚未启动的待处理迭代器,等待所有运行的线程完成,并重新抛出异常。请注意,由于无法在产生之间停止迭代器,"等待" 步骤涉及在每个当前运行的迭代器产生其下一个值之前停止。这可能导致死锁,如果在此期间队列已满。
- DRAIN
与 STOP 相似,但在迭代器完成之前产生的任何剩余值将在抛出异常之前由交错器产生。
- FINISH_ALL
继续正常运行,并在所有迭代器完成时重新抛出异常。
- FINISH_CURRENT
与 FINISH_ALL 相似,但只有当前运行的迭代器将运行到完成;在异常被引发时,尚未启动线程的任何迭代器将取消其作业。
无论 onerror 的值如何,任何在初始异常之后引发的后续异常都将被丢弃。
class Interleaver(Generic[T]):
def __init__(
self,
max_workers: Optional[int] = None,
thread_name_prefix: str = "",
queue_size: Optional[int] = None,
onerror: OnError = STOP,
)
一个迭代器和上下文管理器。作为一个迭代器,它按生成顺序产生传递给相应 interleave() 调用的迭代器生成的值。作为一个上下文管理器,它进入时返回自身,退出时通过调用 shutdown(wait=True) 方法(见下文)清理任何未完成的线程。
交错器可以通过调用 interleave() 或直接调用构造函数来实例化。构造函数接受与 interleave() 相同的参数,但不包括 iterators,并生成一个新的尚未运行任何迭代器的 Interleaver。通过 submit() 方法将迭代器提交给一个新的 Interleaver;一旦提交了所有所需的迭代器,必须调用 finalize() 方法,以便 Interleaver 能够知道何时完成。
交织器会关闭其ThreadPoolExecutor,并在返回最终值(特别是在调用__next__/get()导致抛出StopIteration或其他异常时)后等待线程完成。如果在迭代完成前放弃交织器,相关的资源可能无法正确清理,线程可能会无限期地继续运行。因此,强烈建议您将交织器上的任何迭代都包裹在上下文管理器中,以处理迭代提前结束的情况(包括来自KeyboardInterrupt的)。
除了迭代器和上下文管理器API之外,交织器还有以下公共方法
Interleaver.submit(it: Iterator[T]) -> None
0.2.0版本中新加入
将迭代器添加到交织器中。
如果交织器是从interleave()返回的或者已经对其调用过finalize(),调用submit()将导致ValueError。
Interleave.finalize() -> None
0.2.0版本中新加入
通知交织器所有迭代器都已注册。必须调用此方法,以便交织器能够检测迭代结束;如果未调用此方法并且所有提交的迭代器都已完成并检索了它们的值,则后续对next(it)的调用将无限期地挂起。
Interleaver.get(block: bool = True, timeout: Optional[float] = None) -> T
0.2.0版本中新加入
获取迭代器生成的下一个值。如果所有迭代器都已完成并且所有值都已检索,则抛出interleaver.EndOfInputError。如果block为False并且没有立即可用的值,则抛出queue.Empty。如果block为True,则等待最多timeout秒(如果timeout为None则无限期等待)以获取下一个值或所有迭代器结束;如果在超时之前没有发生任何操作,则抛出queue.Empty。
it.get(block=True, timeout=None)与next(it)等效,但后者将EndOfInputError转换为StopIteration。
注意:当onerror=STOP并设置超时时,如果迭代器抛出异常,超时可能会超过,因为交织器正在等待所有剩余的线程关闭。
Interleaver.shutdown(wait: bool = True) -> None
如果尚未调用,调用finalize(),告诉所有正在运行的迭代器停止迭代,取消任何尚未开始的待处理迭代器,并关闭ThreadPoolExecutor。将wait参数传递给对ThreadPoolExecutor.shutdown()的调用。
在调用shutdown()之后,交织器可以继续迭代,并将在迭代器完全停止之前产生的任何剩余值产出。
项目详细信息
下载文件
下载适用于您的平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源分布
构建分发
interleave-0.2.1.tar.gz 的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 210969c607828ecafc5e425532a6b6c10d09f1343e5976238ca15e8a07fb5118 |
|
MD5 | 45a4a68aeccac12d1d3e9cbc00560c9e |
|
BLAKE2b-256 | 8c801c45cd6dcd72c0669c3e11c74fbb1459cea5aa3d80960ffee47b66d1640d |
interleave-0.2.1-py3-none-any.whl 的哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 3345cedac12b8663e787cb2e0a80a3c1e450387b66b152ed32b951bb7c3d9e7e |
|
MD5 | 5ed254fec93bda034f571d9ab94855de |
|
BLAKE2b-256 | f8ff0d393596b592e911e57f9970faa799c54d70eed3b92c2064f4a19c94445f |