跳转到主要内容

不同步 asyncio

项目描述

不同步

使用环境事件循环或执行在单独的线程或进程中,不同步 asyncio

快速概述

标记为 @different 装饰器的函数将按照以下方式之一运行

  • async 函数将在从 different.thread 执行的 different.loop 事件循环中运行
  • 常规函数将在 different.thread_executor 中执行,这是一个 ThreadPoolExecutor
    • 适用于不支持 asyncio 的 I/O 限制工作
  • 标记为 @different(cpu_bound=True) 的常规函数将在 different.process_executor 中执行,这是一个 ProcessPoolExecutor
    • 适用于 CPU 限制工作

所有 @different 函数都将返回一个 Unfuture 对象。这种新的未来类型结合了 asyncio.Futureconcurrent.Future 的行为,并进行了以下更改

  • Unfuture.set_resultasyncio.Future 不同,是线程安全的
  • Unfuture 实例可以被等待,即使它们是从 concurrent.Future 制作的
  • Unfuture.result() 是一个阻塞操作 除了different.loop/different.thread 中,它表现得像 asyncio.Future.result,如果未来尚未完成,将抛出异常

示例

简单的睡眠

使用 asyncio 的简单睡眠示例

async def sync_async():
    await asyncio.sleep(1)
    return 'I hate event loops'


async def main():
    future1 = asyncio.create_task(sync_async())
    future2 = asyncio.create_task(sync_async())

    await future1, future2

    print(future1.result() + future2.result())

asyncio.run(main())
# Takes 1 second to run

使用 different 的相同示例

@unsync
async def unsync_async():
    await asyncio.sleep(1)
    return 'I like decorators'

unfuture1 = unsync_async()
unfuture2 = unsync_async()
print(unfuture1.result() + unfuture2.result())
# Takes 1 second to run

多线程 I/O 限制函数

可以通过在 concurrent.ThreadPoolExecutor 中执行它们来使同步函数异步运行。这可以通过标记常规函数 @different 来轻松实现。

@unsync
def non_async_function(seconds):
    time.sleep(seconds)
    return 'Run concurrently!'

start = time.time()
tasks = [non_async_function(0.1) for _ in range(10)]
print([task.result() for task in tasks])
print('Executed in {} seconds'.format(time.time() - start))

这将打印

['Run concurrently!', 'Run concurrently!', ...]
Executed in 0.10807514190673828 seconds

延续

使用 Unfuture.then 连接异步调用,并返回一个包装了源和延续的 Unfuture。延续使用源 Unfuture 作为第一个参数调用。延续可以是常规函数(将同步执行),或 @different 函数。

@unsync
async def initiate(request):
    await asyncio.sleep(0.1)
    return request + 1

@unsync
async def process(task):
    await asyncio.sleep(0.1)
    return task.result() * 2

start = time.time()
print(initiate(3).then(process).result())
print('Executed in {} seconds'.format(time.time() - start))

这将打印

8
Executed in 0.20314741134643555 seconds

混合方法

我们将首先将常规同步函数转换为线程 Unfuture,这将开始我们的请求。

@unsync
def non_async_function(num):
    time.sleep(0.1)
    return num, num + 1

我们可能想在另一个函数中改进结果,因此我们定义以下延续。

@unsync
async def result_continuation(task):
    await asyncio.sleep(0.1)
    num, res = task.result()
    return num, res * 2

然后,我们使用异步函数将所有结果聚合到一个单独的字典中。

@unsync
async def result_processor(tasks):
    output = {}
    for task in tasks:
        num, res = await task
        output[num] = res
    return output

执行完整的链 non_async_functionresult_continuationresult_processor 将如下所示

start = time.time()
print(result_processor([non_async_function(i).then(result_continuation) for i in range(10)]).result())
print('Executed in {} seconds'.format(time.time() - start))

这将打印

{0: 2, 1: 4, 2: 6, 3: 8, 4: 10, 5: 12, 6: 14, 7: 16, 8: 18, 9: 20}
Executed in 0.22115683555603027 seconds

保留类型

据我们所知,无法通过装饰器更改方法或函数的返回类型。因此,我们需要一个变通方法来正确使用IntelliSense。通常有三个选项

  1. 忽略类型警告。

  2. 在达到类型警告的地方使用抑制语句。

    A. 当通过将返回类型更改为 Unfuture 来定义未同步的方法时。

    B. 当使用未同步的方法时。

  3. 不使用装饰器包装函数。示例

    def function_name(x: str) -> Unfuture[str]:
        async_method = unsync(__function_name_synced)
        return async_method(x)
    
    def __function_name_synced(x: str) -> str:
        return x + 'a'
    
    future_result = function_name('b')
    self.assertEqual('ba', future_result.result())
    

自定义事件循环

为了使用自定义事件循环,在调用任何 @unsync 方法之前,请确保已设置事件循环策略。例如,要使用 uvloop,只需

import unsync
import uvloop

@unsync
async def main():
    # Main entry-point.
    ...

uvloop.install() # Equivalent to asyncio.set_event_loop_policy(EventLoopPolicy())
main()

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

unsync-1.4.0.tar.gz (8.8 kB 查看散列)

上传时间

由支持