跳转到主要内容

使用async def __ainit__编写类

项目描述

Coveralls License Build status Wheel Latest version

async-class

添加了编写具有可等待初始化函数的类的功能。

使用示例

import asyncio
from async_class import AsyncClass, AsyncObject, task, link


class MyAsyncClass(AsyncClass):
     async def __ainit__(self):
          # Do async staff here
          pass


class MainClass(AsyncObject):
     async def __ainit__(self):
          # Do async staff here
          pass

     async def __adel__(self):
          """ This method will be called when object will be closed """
          pass


class RelatedClass(AsyncObject):
     async def __ainit__(self, parent: MainClass):
          link(self, parent)


async def main():
     instance = await MyAsyncClass()
     print(instance)

     main_instance = await MainClass()
     related_instance = await RelatedClass(main_instance)

     assert not main_instance.is_closed
     assert not related_instance.is_closed

     await main_instance.close()
     assert main_instance.is_closed

     # will be closed because linked to closed main_instance
     assert related_instance.is_closed

asyncio.run(main())

文档

如果没有运行任何事件循环,可能会创建异步对象。 self.loop 属性是延迟评估的。

模块为编写异步代码提供了有用的抽象。

AsyncClass 继承的对象可能有自己的 __init__ 方法,但不建议这样做。

AsyncClass

这是一个没有 TaskStore 实例和如 self.create_taskself.create_future 等额外方法的基类包装器。

这个类仅仅解决了初始化问题

import asyncio
from async_class import AsyncClass


class MyAsyncClass(AsyncClass):
   async def __ainit__(self):
       future = self.loop.create_future()
       self.loop.call_soon(future.set_result, True)
       await future


async def main():
   instance = await MyAsyncClass()
   print(instance)

asyncio.run(main())

AsyncObject

这是一个带有任务存储实例和简单任务管理辅助器的基类。

import asyncio
from async_class import AsyncObject


class MyClass(AsyncObject):
   async def __ainit__(self):
       self.task = self.create_task(asyncio.sleep(3600))


async def main():
   obj = await MyClass()

   assert not obj.task.done()

   await obj.close()

   assert obj.task.done()


asyncio.run(main())

TaskStore

TaskStore 是一个任务管理辅助器。一个实例具有 create_task()create_future() 方法,并且所有创建的实体都将在通过 TaskStoreclose() 方法关闭时被销毁。

此外,任务存储可能会创建一个与self的链接副本,当父实例关闭时将关闭。

import asyncio
from async_class import TaskStore


async def main():
   store = TaskStore(asyncio.get_event_loop())

   task1 = store.create_task(asyncio.sleep(3600))

   child_store = store.get_child()
   task2 = child_store.create_task(asyncio.sleep(3600))

   await store.close()

   assert task1.done() and task2.done()


asyncio.run(main())

项目详情


下载文件

下载您平台对应的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

async-class-0.5.0.tar.gz (5.4 kB 查看哈希值)

上传时间

构建分布

async_class-0.5.0-py2-none-any.whl (5.3 kB 查看哈希值)

上传时间 Python 2

由...