跳转到主要内容

用于使用协程调度进程的GUI友好库。

项目描述

异步进程调度器

License: MIT PyPI PyPI - Python Version Build Status

当前状态:测试版。在v1.0.0之前可能会发生破坏性更改。

简介

异步进程调度器是一个小型Python库,它提供了一个简单、GUI友好的方式来高效运行多个进程,同时避免基于回调的数据流。

异步进程调度器与标准库中的multiprocessing兼容,以及类似的实现,如multiprocess

注意: 要在GUI程序中使用异步进程调度器,您需要一个实现兼容事件循环的库。例如,qasync可以与PyQt5一起使用。

安装

要安装异步进程调度器,请使用pip

pip install AsyncProcessScheduler

示例

详细示例在示例文件夹中,包括使用PyQt5的GUI示例。以下是一个代码片段,展示了一般的工作流程

from scheduler.Scheduler import Scheduler

def long_calculation(x: int, y: int) -> Tuple[int, int]:
    """Simulates a long calculation and returns two numbers."""
    time.sleep(5)
    return x, y

async def run() -> None:
    """Runs 16 processes with the scheduler and prints the results."""
    scheduler = Scheduler()

    # Create (x, y) inputs for 16 processes.
    num_processes = 16
    args = [(i, i+1) for i in range(num_processes)]

    # Run all processes and get an ordered list containing the results from each.
    results: List[Tuple] = await scheduler.map(target=long_calculation, args=args)

    # Do something with the results.
    print(results)

# Start the coroutine (blocking to prevent the program from exiting).
loop = asyncio.get_event_loop() 
loop.run_until_complete(run())

注意: GUI程序将使用asyncio.ensure_future(run())来启动协程,而不会阻塞。

快速指南

本指南解释了Async Process Scheduler的基本用法。

导入调度器

要在Python中使用Scheduler,请从scheduler导入。

from scheduler.Scheduler import Scheduler

创建调度器

可以带有或不带进度回调来创建调度器实例。进度回调是一个函数,它接受已完成的任务数和总任务数。

def on_progress(finished: int, total: int) -> None:
    print(f"{finished} of {total} tasks are complete.")

# Without progress callback.
scheduler = Scheduler()

# With progress callback.
scheduler = Scheduler(on_progress)

# With progress callback.
scheduler = Scheduler(progress_callback=on_progress)

进度回调在协程运行的线程上调用,可用于修改GUI。

:warning: 在v1.0.0之前,此功能可能发生变化。

使用调度器进行映射

使用调度器运行代码的最简单方法是将输入的可迭代对象映射到函数。

这允许您提供函数和包含函数输入的元组的列表。输出将在每个输入集的单独进程中计算。

在协程中进行映射

"""
Snippet which demonstrates mapping values over a function.
"""

def my_calculation(x: int, y: float, z: str) -> Tuple[int, float, str]:
    """Simulates a long calculation and returns the function parameters."""
    time.sleep(5)
    return x, y, z

scheduler = Scheduler()
args = [
    (1, 3.14, "test1"),  # Args for first process.
    (0, 2.10, "test2"),  # Args for second process.
    (5, 10.0, "test3"),  # Args for third process.
]

然后可以在协程中计算结果

results: List = await scheduler.map(target=my_calculation, args=args)

映射(阻塞)

results: List = scheduler.map_blocking(target=my_calculation, args=args)

向调度器添加任务

除了使用map()之外,还可以将任务单独添加到调度器中。添加所有任务后,可以使用run()启动调度器。

您可以将普通函数添加到调度器中。如果您从面向进程的代码迁移,您可能会发现将进程和队列添加到调度器更容易。

添加到调度器中的每个任务都将作为一个单独的进程运行。为了最大限度地提高效率,您应该添加比逻辑核心数更多的任务数。当前CPU的最佳进程数由静态方法Scheduler.optimal_process_count()返回。

为了简单起见,以下示例仅向调度器添加一个任务。

添加函数

add()可以用于将函数添加到调度器。与Process的构造函数类似。

"""
Snippet which demonstrates adding functions to a scheduler.
"""

def my_calculation(x: int, y: float, z: str) -> Tuple[int, float, str]:
    """Simulates a long calculation and returns the function parameters."""
    time.sleep(5)
    return x, y, z

scheduler = Scheduler()
args = (1, 3.14, "test",)

# Without named arguments.
scheduler.add(my_calculation, args)

# With named arguments.
scheduler.add(target=my_calculation, args=args)

内部,add()创建一个进程和队列,用于运行您的函数并获取结果。要使用除multiprocessing之外的类型,可以使用process_typequeue_type参数指定它们。

"""
Snippet which demonstrates adding a function to scheduler using Processes and Queues
from `multiprocess` instead of `multiprocessing`.
"""
from multiprocess import Process, Queue

scheduler = Scheduler()
scheduler.add(
    target=my_function,
    args=(1,2,3,),
    process_type=Process,
    queue_type=Queue
)

添加进程

add_process()可以用于向调度器添加进程和队列。

"""
Snippet which demonstrates adding processes and queues to a scheduler.
"""

def my_calculation(queue: Queue, x: int, y: int) -> None:
    """Function which will be run using a process."""
    time.sleep(5)

    # Important: put results in queue instead of returning them.
    queue.put((
        x, y
    ))

scheduler = Scheduler()

queue = Queue()
process = Process(target=my_calculation, args=(queue, 1, 2))

scheduler.add_process(process, queue)

:warning: 添加进程时,请确保传递给函数的队列实例与添加到调度器的队列相同。同时请注意,函数应将输出放入队列而不是返回它。

子任务

上述函数接受可选的subtasks参数。subtasks用于向调度器提示每个进程可能创建自己的进程;这将在调度进程时予以考虑。

例如,如果每个进程创建4个进程,则可以使用scheduler.add(target=my_process, args=my_args, subtasks=4)

运行调度器

当调度器运行时,它将运行所有任务,直到完成,然后返回一个包含每个任务输出的有序列表。

在协程中运行

results: List = await scheduler.run()

运行(阻塞)

results: List = scheduler.run_blocking()

终止调度器

要取消调度器,请使用terminate()

scheduler.terminate()

终止的调度器将始终返回一个空列表。

设计

当调度器启动时,它将同时运行多达Scheduler.optimal_process_count()返回值的进程数。当一个进程完成时,另一个进程将启动以替代它。

如果启用dynamic,调度器将定期检查CPU使用情况,并在CPU使用率低于阈值时增加并发进程数。

此图显示了Scheduler的实现

Image demonstrating the implementation of Scheduler.

开发者说明

项目打包

有关详细信息,请参阅文档

rm -r dist/
python setup.py sdist
python setup.py bdist_wheel
twine upload dist/*

项目详情


下载文件

下载您平台对应的文件。如果您不确定选择哪一个,请了解有关安装包的更多信息。

源分发

AsyncProcessScheduler-0.9.0b1.tar.gz (13.5 kB 查看散列值)

上传时间

构建分发

AsyncProcessScheduler-0.9.0b1-py3-none-any.whl (17.3 kB 查看散列值)

上传时间 Python 3

支持