不同步 asyncio
项目描述
不同步
使用环境事件循环或执行在单独的线程或进程中,不同步 asyncio
。
快速概述
标记为 @different
装饰器的函数将按照以下方式之一运行
async
函数将在从different.thread
执行的different.loop
事件循环中运行- 常规函数将在
different.thread_executor
中执行,这是一个ThreadPoolExecutor
- 适用于不支持
asyncio
的 I/O 限制工作
- 适用于不支持
- 标记为
@different(cpu_bound=True)
的常规函数将在different.process_executor
中执行,这是一个ProcessPoolExecutor
- 适用于 CPU 限制工作
所有 @different
函数都将返回一个 Unfuture
对象。这种新的未来类型结合了 asyncio.Future
和 concurrent.Future
的行为,并进行了以下更改
Unfuture.set_result
与asyncio.Future
不同,是线程安全的Unfuture
实例可以被等待,即使它们是从concurrent.Future
制作的Unfuture.result()
是一个阻塞操作 除了 在different.loop
/different.thread
中,它表现得像asyncio.Future.result
,如果未来尚未完成,将抛出异常
示例
简单的睡眠
使用 asyncio
的简单睡眠示例
async def sync_async():
await asyncio.sleep(1)
return 'I hate event loops'
async def main():
future1 = asyncio.create_task(sync_async())
future2 = asyncio.create_task(sync_async())
await future1, future2
print(future1.result() + future2.result())
asyncio.run(main())
# Takes 1 second to run
使用 different
的相同示例
@unsync
async def unsync_async():
await asyncio.sleep(1)
return 'I like decorators'
unfuture1 = unsync_async()
unfuture2 = unsync_async()
print(unfuture1.result() + unfuture2.result())
# Takes 1 second to run
多线程 I/O 限制函数
可以通过在 concurrent.ThreadPoolExecutor
中执行它们来使同步函数异步运行。这可以通过标记常规函数 @different
来轻松实现。
@unsync
def non_async_function(seconds):
time.sleep(seconds)
return 'Run concurrently!'
start = time.time()
tasks = [non_async_function(0.1) for _ in range(10)]
print([task.result() for task in tasks])
print('Executed in {} seconds'.format(time.time() - start))
这将打印
['Run concurrently!', 'Run concurrently!', ...]
Executed in 0.10807514190673828 seconds
延续
使用 Unfuture.then
连接异步调用,并返回一个包装了源和延续的 Unfuture
。延续使用源 Unfuture
作为第一个参数调用。延续可以是常规函数(将同步执行),或 @different
函数。
@unsync
async def initiate(request):
await asyncio.sleep(0.1)
return request + 1
@unsync
async def process(task):
await asyncio.sleep(0.1)
return task.result() * 2
start = time.time()
print(initiate(3).then(process).result())
print('Executed in {} seconds'.format(time.time() - start))
这将打印
8
Executed in 0.20314741134643555 seconds
混合方法
我们将首先将常规同步函数转换为线程 Unfuture
,这将开始我们的请求。
@unsync
def non_async_function(num):
time.sleep(0.1)
return num, num + 1
我们可能想在另一个函数中改进结果,因此我们定义以下延续。
@unsync
async def result_continuation(task):
await asyncio.sleep(0.1)
num, res = task.result()
return num, res * 2
然后,我们使用异步函数将所有结果聚合到一个单独的字典中。
@unsync
async def result_processor(tasks):
output = {}
for task in tasks:
num, res = await task
output[num] = res
return output
执行完整的链 non_async_function
→result_continuation
→result_processor
将如下所示
start = time.time()
print(result_processor([non_async_function(i).then(result_continuation) for i in range(10)]).result())
print('Executed in {} seconds'.format(time.time() - start))
这将打印
{0: 2, 1: 4, 2: 6, 3: 8, 4: 10, 5: 12, 6: 14, 7: 16, 8: 18, 9: 20}
Executed in 0.22115683555603027 seconds
保留类型
据我们所知,无法通过装饰器更改方法或函数的返回类型。因此,我们需要一个变通方法来正确使用IntelliSense。通常有三个选项
-
忽略类型警告。
-
在达到类型警告的地方使用抑制语句。
A. 当通过将返回类型更改为
Unfuture
来定义未同步的方法时。B. 当使用未同步的方法时。
-
不使用装饰器包装函数。示例
def function_name(x: str) -> Unfuture[str]: async_method = unsync(__function_name_synced) return async_method(x) def __function_name_synced(x: str) -> str: return x + 'a' future_result = function_name('b') self.assertEqual('ba', future_result.result())
自定义事件循环
为了使用自定义事件循环,在调用任何 @unsync
方法之前,请确保已设置事件循环策略。例如,要使用 uvloop
,只需
import unsync
import uvloop
@unsync
async def main():
# Main entry-point.
...
uvloop.install() # Equivalent to asyncio.set_event_loop_policy(EventLoopPolicy())
main()
项目详情
unsync-1.4.0.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | a29e0f8952ffb0b3a0453ce436819a5a1ba2febbb5caa707c319f6f98d35f3c5 |
|
MD5 | 6b2119ba4a7e2795dbb544e0cd00705b |
|
BLAKE2b-256 | ea6b8d9699d37fe8c43c76fda4a7f93ae6aef129576f75c3cacfc9a78a7cc984 |