跳转到主要内容

aiomisc-dependency - aiomisc中的依赖注入

项目描述

aiomisc构建的依赖注入插件,使用aiodine库,并支持pytest fixture样式依赖注入。

安装

从pypi安装

pip3 install aiomisc aiomisc-dependency

如何使用

注册依赖

要注册依赖,可以使用aiomisc_dependency.dependency装饰器。

from aiomisc_dependency import dependency

@dependency
async def pg_engine():
    pg_engine = await create_engine(dsn=pg_url)
    yield pg_engine
    pg_engine.close()
    await pg_engine.wait_closed()

如您所见,依赖可以是异步生成器函数。yield后面的代码将在teardown时执行,以正确关闭依赖。

协程函数、非异步函数和生成器也受支持。

使用依赖

要使用依赖,需要将其名称添加到每个依赖服务的__dependencies__属性中。指定的依赖将在entrypoint启动时注入为服务的属性。如果需要将依赖映射到不同的名称,则使用__dependencies_map__

from contextlib import suppress
from types import MappingProxyType

import aiohttp
from aiomisc.service.aiohttp import AIOHTTPService

class HealthcheckService(AIOHTTPService):

    __dependencies__ = ('pg_engine',)

    async def create_application(self):
        app = aiohttp.web.Application()
        app.add_routes([aiohttp.web.get('/ping', self.healthcheck_handler)])
        return app

    async def healthcheck_handler(self, request):
        pg_status = False
        with suppress(Exception):
           async with self.pg_engine.acquire() as conn:
               await conn.execute('SELECT 1')
               pg_status = True

        return aiohttp.web.json_response(
            {'db': pg_status},
            status=(200 if pg_status else 500),
        )


class RESTService(AIOHTTPService):

    __dependencies__ = ('pg_engine',)

    ...

class AnotherRESTService(AIOHTTPService):

    __dependencies_map__ = MappingProxyType({'pg_engine': 'engine'})

    ...

如果在entrypoint启动时找不到任何必需的依赖,将引发RuntimeError

您可以在创建服务时通过将其添加到 kw 参数中来手动设置依赖项。这在测试中可能很有用。

from unittest import Mock

def test_rest_service():
    pg_engine_mock = Mock()
    service = RESTService(pg_engine=pg_engine_mock)
    ...

依赖的依赖

您可以将依赖项用作其他依赖项的参数。参数将自动注入。

@dependency
async def pg_connection(pg_engine):
    async with pg_engine.acquire() as conn:
        yield conn

loop内置依赖

如果您的依赖项需要事件循环实例,则可以使用内置的 循环 依赖项。

import aioredis

@dependency
async def redis_pool(loop):
    pool = aioredis.create_pool(redis_url, loop=loop)
    yield pool
    pool.close()
    await pool.wait_closed()

LICENSE

MIT

项目详情


下载文件

下载适用于您平台文件的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

aiomisc_dependency-0.1.20.tar.gz (7.5 kB 查看哈希值)

上传时间

构建分布

aiomisc_dependency-0.1.20-py3-none-any.whl (5.8 kB 查看哈希值)

上传时间 Python 3

由以下支持