跳转到主要内容

多线程迭代工作流程的库。

项目描述

Build Status

Quenouille

Python的多线程迭代工作流程库。

它通常用于迭代延迟流,同时不溢出内存,同时尊重分组并行约束和节流,例如,在从网络上并发下载大量URL时。

它主要用于minet Python库和CLI工具,为其下载器和爬虫提供动力。

安装

您可以使用以下命令使用pip安装quenouille

pip install quenouille

用法

imap, imap_unordered

函数懒加载可迭代对象,并以多线程方式应用所需的函数,对生成的项目进行操作。

此函数还能够遵守有关并行性和节流的分组约束:例如,如果您需要以多线程方式下载URL并确保您不会同时击中同一域名超过两次,则可以通过为该函数提供适当的设置来确保所需的行为。

同样,如果需要使用此函数的节流确保在两次击中同一域名之间等待一定时间,这也是一样的。

最后请注意,此函数有两种形式:imap_unordered,它将在项目完成后立即输出处理过的项目,而imap则将在输入相同的顺序中输出项目,但性能略差,内存占用增加,这取决于在生成下一个项目之前已处理的项目的数量。

import csv
from quenouille import imap, imap_unordered

# Reading urls lazily from a CSV file using a generator:
def urls():
  with open('urls.csv') as f:
    reader = csv.DictReader(f)

    for line in reader:
      yield line['url']

# Defining some functions
def fetch(url):
  # ... e.g. use urllib3 to download the url
  # remember this function must be threadsafe
  return html

def get_domain_name(url):
  # ... e.g. use ural to extract domain name from url
  return domain_name

# Performing 10 requests at a time:
for html in imap(urls(), fetch, 10):
  print(html)

# Ouputting results as soon as possible (in arbitrary order)
for html in imap_unordered(urls(), fetch, 10):
  print(html)

# Ensuring we don't hit the same domain more that twice at a time
for html in imap(urls(), fetch, 10, key=get_domain_name, parallelism=2):
  print(html)

# Waiting 5 seconds between each request on a same domain
for html in imap(urls(), fetch, 10, key=get_domain_name, throttle=5):
  print(html)

# Throttle time depending on domain
def throttle(group, item, result):
  if group == 'lemonde.fr':
    return 10

  return 2

for html in imap(urls(), fetch, 10, key=get_domain_name, throttle=throttle):
  print(html)

# Only load 10 urls into memory when attempting to find next suitable job
for html in imap(urls(), fetch, 10, key=get_domain_name, throttle=5, buffer_size=10):
  print(html)

参数

  • iterable (iterable):任何Python可迭代对象。
  • func (可调用对象):执行所需任务的功能。该函数接受给定可迭代对象产生的单个项目作为参数。请注意,由于此函数将在工作线程中分发,因此您应确保它是线程安全的。
  • threads (整数,可选):要使用的最大线程数。默认为 min(32, os.cpu_count() + 1)。请注意,它可以设置为 0,在这种情况下,将不会使用线程,所有操作都将同步执行(这可能在调试或避免代码重复时很有用)。
  • key (可调用对象,可选):返回给定项目应属于哪个“组”的功能。这将用于确保最大并行性得到尊重。
  • parallelism (整数或可调用对象,可选) [1]:一次允许在同一组上工作的线程数。也可以是一个函数,它接受一个组并返回其并行性。
  • buffer_size (整数,可选) [1024]:在尝试找到可以立即传递给工作线程的项目时,函数将缓冲到内存中的最大项目数,同时尊重节流和组并行性。
  • throttle (整数或浮点数或可调用对象,可选):可选的节流时间,以秒为单位,在处理给定组的下一个项目之前等待。也可以是一个函数,它接受最后一个组、项目和结果,并返回此组的下一个节流时间。
  • block (布尔值,可选) [False]:在使用给定队列的 get 方法时是否阻塞。
  • panic (可调用对象,可选):当进程出现错误时将调用的函数。在特定情况下,这有助于取消阻塞一些函数并避免死锁。请注意,当同步时(即不使用额外线程时),此函数不会被调用。
  • initializer (可调用对象,可选):在每个线程工作器开始运行时运行的函数。可用于设置 线程局部数据,例如。请记住,此函数必须是线程安全的,并且不应阻塞,因为线程池将在每个线程正确启动后才能继续进行。如果函数调用之一失败,线程池将引发 quenouille.exceptions.BrokenThreadPool 错误并立即终止。
  • initargs (可迭代对象,可选):传递给 initializer 函数的参数。
  • wait (布尔值,可选) [True]:在关闭执行器时是否加入工作线程,即等待它们结束。如果您需要在清理执行器资源时快速继续,则将此设置为 False。请注意,如果您在此之后快速连续启动其他线程密集型任务或其他执行器,您可能会一次启动太多线程。
  • daemonic (布尔值,可选) [False]:是否生成守护进程工作器。如果您的工作者是守护进程,则解释器在退出时不会等待它们结束。这可以与 wait=False 结合使用,例如,如果您希望程序在按下 Ctrl+C 时立即退出(您可能想避免这种情况,因为线程将在退出时被突然关闭,并且可能需要清理)。

使用队列而不是可迭代对象

如果您需要根据执行任务的结果添加新的项目以进行处理(例如,在为网络爬虫设计时,每个下载的页面将产生新的页面以进一步探索),请注意,imapimap_unordered 函数也接受队列作为输入。

from queue import Queue
from quenouille import imap

job_queue = Queue()
job_queue.put(1)

# Enqueuing new jobs in the worker
def worker(i):
  if i < 3:
    job_queue.put(i + 1)

  return i * 2

list(imap(job_queue, worker, 2))
>>> [2, 4, 6]

# Enqueuing new jobs while iterating over results
job_queue = Queue()
job_queue.put(1)

results = []

for i in imap(job_queue, worker, 2):
  if i < 5:
    job_queue.put((i / 2) + 1)

  results.append(i)

results
>>> [2, 4, 6]

请注意,imap 只会运行到队列完全排空为止。因此,如果您之后决定添加更多项目,这不再是该函数的关注点。

ThreadPoolExecutor

如果您需要在保持相同的线程池运行的同时,连续多次运行 imapimap_unordered,您也可以直接使用 quenouille 的 ThreadPoolExecutor,如下所示:

from quenouille import ThreadPoolExecutor

# Using it as a context manager:
with ThreadPoolExecutor(max_workers=4) as executor:
  for i in executor.imap(range(10), worker):
    print(i)

  for j in executor.imap_unordered(range(10), worker):
    print(j)

# Or if you prefer shutting down the executor explicitly:
executor = ThreadPoolExecutor()
executor.imap(range(10), worker)
executor.shutdown(wait=False)

请注意,您的节流状态在多个 imapimap_unordered 调用之间保持不变,这样您就不会过早地执行某些任务。但请注意,此状态与您提供的 key 函数相关联,以确保一致性;如果您更改使用的 key,节流状态将重置。

参数

  • max_workers (int, 可选):要使用的最大线程数。默认为 min(32, os.cpu_count() + 1)。请注意,它可以是 0,在这种情况下,不会使用任何线程,所有操作将以同步方式执行(这可以用于调试或避免有时重复代码,这可能很有用)。
  • initializer (可调用对象,可选):在每个线程工作器开始运行时运行的函数。可用于设置 线程局部数据,例如。请记住,此函数必须是线程安全的,并且不应阻塞,因为线程池将在每个线程正确启动后才能继续进行。如果函数调用之一失败,线程池将引发 quenouille.exceptions.BrokenThreadPool 错误并立即终止。
  • initargs (可迭代对象,可选):传递给 initializer 函数的参数。
  • wait (bool, 可选) [True]:当关闭执行器时,是否加入工作线程,即等待它们结束。如果您需要在清理执行器的资源时快速继续,而不等待工作线程结束,请将此设置为 False。请注意,如果之后快速连续启动其他线程密集型任务或其他执行器,您可能会一次性启动太多线程。
  • daemonic (布尔值,可选) [False]:是否生成守护进程工作器。如果您的工作者是守护进程,则解释器在退出时不会等待它们结束。这可以与 wait=False 结合使用,例如,如果您希望程序在按下 Ctrl+C 时立即退出(您可能想避免这种情况,因为线程将在退出时被突然关闭,并且可能需要清理)。

#.imap, #.imap_unordered

基本上与此处描述的相同,具有以下参数:

  • iterable (iterable):任何Python可迭代对象。
  • func (可调用对象):执行所需任务的功能。该函数接受给定可迭代对象产生的单个项目作为参数。请注意,由于此函数将在工作线程中分发,因此您应确保它是线程安全的。
  • key (可调用对象,可选):返回给定项目应属于哪个“组”的功能。这将用于确保最大并行性得到尊重。
  • parallelism (整数或可调用对象,可选) [1]:一次允许在同一组上工作的线程数。也可以是一个函数,它接受一个组并返回其并行性。
  • buffer_size (整数,可选) [1024]:在尝试找到可以立即传递给工作线程的项目时,函数将缓冲到内存中的最大项目数,同时尊重节流和组并行性。
  • throttle (整数或浮点数或可调用对象,可选):可选的节流时间,以秒为单位,在处理给定组的下一个项目之前等待。也可以是一个函数,它接受最后一个组、项目和结果,并返回此组的下一个节流时间。
  • block (布尔值,可选) [False]:在使用给定队列的 get 方法时是否阻塞。
  • panic (可调用对象,可选):当进程出现错误时将调用的函数。在特定情况下,这有助于取消阻塞一些函数并避免死锁。请注意,当同步时(即不使用额外线程时),此函数不会被调用。

#.shutdown

用于显式关闭执行器的方法。

参数

  • wait (布尔值,可选) [True]:在关闭执行器时是否加入工作线程,即等待它们结束。如果您需要在清理执行器资源时快速继续,则将此设置为 False。请注意,如果您在此之后快速连续启动其他线程密集型任务或其他执行器,您可能会一次启动太多线程。

NamedLocks

一个弱引用字典的锁,对于基于键的任务线程安全非常有用,例如,如果您需要确保两个线程不会同时向同一文件写入。

from quenouille import NamedLocks

locks = NamedLocks()

def worker(filename):
  with locks[filename]:
    with open(filename, 'a+') as f:
      f.write('hello\n')

杂项注释

None组

对于 imap 函数,将 None 组(如果您的 key 函数返回 None,则可能会发生这种情况)视为特殊,并且将始终认为附加的项目可以立即处理,而不受并行性约束或节流。

如果没有 key,则认为所有项目都属于 None 组。

并行性 > 工作者

如果您将 parallelism 设置为 10,并且只为执行任务分配了 5 个线程,那么应该很明显,实际并行性永远不会达到 10

quenouille 不会提醒您这一点,因为这可能方便您不必过多考虑,并且影响不大,但请记住这一点。

可调用并行性保证

如果您决定传递一个函数来为每个组声明自定义并行性,而不是全局固定并行性,请了解该函数将在 quenouille 必须就下一个要入队的项目做出决策时被调用,这样我们就不需要使用任何内存来记录信息。

这意味着您应该确保给定的函数是幂等的,并且对于给定的组始终返回相同的并行性,这样您就不会冒死锁的风险。

并行性 vs. 节流

如果某个组正在节流,很明显 quenouille 不会同时为该组执行多个任务,因此其 parallelism 实际上为 1,尽管有其他设置。

这并不意味着某些组的 parallelism 和其他组的 throttle 是不可能的。这可以通过可调用项实现。

向节流添加熵

您可以将可调用参数 throttle 理解为“从该组启动下一个作业之前应等待的最小时间”。这意味着,如果您需要向节流时间添加熵,您确实可以像这样使此函数与随机性一起工作。

from random import random

# Waiting 5 + (between 0 and 2) seconds
def throttle(group, item, result):
  return 5 + (2 * random())

有关异常抛出的注意事项

延迟生成器使用异常死锁

如果您在创建它的地方以外的其他地方消费由 imap/imap_unordered 返回的生成器,您可能会在抛出异常时遇到死锁。

当使用 daemonic=True 时,这并不重要,但您可能会因为超出我控制之外的 Python 原因而在退出时遇到段错误。

# Safe
for item in imap(...):
  raise RuntimeError

# Not safe
it = imap(...)

for item in it:
  raise RuntimeError

如果您真的想这样做,因为您不想使用 ThreadPoolExecutor 上下文管理器,您可以尝试使用实验性的 excepthook 参数。

# Probably safe
it = imap(..., excepthook=True)

for item in it:
  raise RuntimeError

使用imap与队列的注意事项

典型死锁

即使 imap 可以处理输入队列,你也应该避免使自己陷入在不希望发生死锁的情况下添加到队列可能会阻塞执行的情况。如果你的队列有 maxsize,这可能会很容易出错。

from queue import Queue
from quenouille import imap

job_queue = Queue(maxsize=2)
job_queue.put(1)

for i in imap(job_queue, worker):
  if i < 2:
    job_queue.put(2)
    job_queue.put(3)

    # This will put you in a deadlock because this will block
    # because of the queue `maxsize` set to 2
    job_queue.put(4)

  print(i)

设计选择

为了使你能够在循环体中添加项到队列,并且能够安全地检测队列是否已清空而不会出现竞态条件,quenouille 只在循环体中的执行完成后才确认任务已完成。

这意味着有时只从工作函数而不是从循环体中添加项到队列可能会更高效。

queue.task_done

目前,quenouille 不会为你调用 queue.task_done,所以如果你想在以后调用 queue.join,这仍然是你的责任。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

quenouille-1.9.1.tar.gz (22.0 kB 查看哈希值)

上传时间

构建分布

quenouille-1.9.1-py3-none-any.whl (19.2 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面