使用async def __ainit__编写类
项目描述
async-class
添加了编写具有可等待初始化函数的类的功能。
使用示例
import asyncio
from async_class import AsyncClass, AsyncObject, task, link
class MyAsyncClass(AsyncClass):
async def __ainit__(self):
# Do async staff here
pass
class MainClass(AsyncObject):
async def __ainit__(self):
# Do async staff here
pass
async def __adel__(self):
""" This method will be called when object will be closed """
pass
class RelatedClass(AsyncObject):
async def __ainit__(self, parent: MainClass):
link(self, parent)
async def main():
instance = await MyAsyncClass()
print(instance)
main_instance = await MainClass()
related_instance = await RelatedClass(main_instance)
assert not main_instance.is_closed
assert not related_instance.is_closed
await main_instance.close()
assert main_instance.is_closed
# will be closed because linked to closed main_instance
assert related_instance.is_closed
asyncio.run(main())
文档
如果没有运行任何事件循环,可能会创建异步对象。 self.loop 属性是延迟评估的。
模块为编写异步代码提供了有用的抽象。
从 AsyncClass 继承的对象可能有自己的 __init__ 方法,但不建议这样做。
类 AsyncClass
这是一个没有 TaskStore 实例和如 self.create_task 和 self.create_future 等额外方法的基类包装器。
这个类仅仅解决了初始化问题
import asyncio
from async_class import AsyncClass
class MyAsyncClass(AsyncClass):
async def __ainit__(self):
future = self.loop.create_future()
self.loop.call_soon(future.set_result, True)
await future
async def main():
instance = await MyAsyncClass()
print(instance)
asyncio.run(main())
类 AsyncObject
这是一个带有任务存储实例和简单任务管理辅助器的基类。
import asyncio
from async_class import AsyncObject
class MyClass(AsyncObject):
async def __ainit__(self):
self.task = self.create_task(asyncio.sleep(3600))
async def main():
obj = await MyClass()
assert not obj.task.done()
await obj.close()
assert obj.task.done()
asyncio.run(main())
类 TaskStore
TaskStore 是一个任务管理辅助器。一个实例具有 create_task() 和 create_future() 方法,并且所有创建的实体都将在通过 TaskStore 的 close() 方法关闭时被销毁。
此外,任务存储可能会创建一个与self的链接副本,当父实例关闭时将关闭。
import asyncio
from async_class import TaskStore
async def main():
store = TaskStore(asyncio.get_event_loop())
task1 = store.create_task(asyncio.sleep(3600))
child_store = store.get_child()
task2 = child_store.create_task(asyncio.sleep(3600))
await store.close()
assert task1.done() and task2.done()
asyncio.run(main())
项目详情
下载文件
下载您平台对应的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。
源分布
async-class-0.5.0.tar.gz (5.4 kB 查看哈希值)
构建分布
async_class-0.5.0-py2-none-any.whl (5.3 kB 查看哈希值)
关闭
async-class-0.5.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e2c02a67ea40b606bc281b8cac016c0b194aa7f48274c43bf5cbe385bcdaf39c |
|
MD5 | d6f8e17118fed35aa362e20a46251aef |
|
BLAKE2b-256 | 8843cdae41799d82b0d766f784c2b14d08d53f9739aecc481aeaea5394c3232b |
关闭
async_class-0.5.0-py2-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | b11390fcf465fd07c2247668f1698e2e4699be2c4d18991413705efef21fadc7 |
|
MD5 | 6a452f622890b9a3414bc73765ceed59 |
|
BLAKE2b-256 | d57f5882dbcc97deb53de472551dcdf930438fe1ae78961a99704c53cc565c2d |