跳转到主要内容

aiomisc的pytest集成

项目描述

aiomisc pytest插件

本包包含pytest的插件。

基本用法

简单用法示例

async def test_sample(event_loop):
    f = event_loop.crete_future()
    event_loop.call_soon(f.set_result, True)
    assert await f

异步固定作用域示例

import asyncio
import pytest


@pytest.fixture
async def my_fixture(loop):
    await asyncio.sleep(0)

    # Requires python 3.6+
    yield

如果你需要在测试之间保存异步固定作用域的实例,错误的方法只是更改固定作用域。但是为什么它不起作用呢?那是因为,在基本情况下,loop固定作用域为每个测试创建一个新的事件循环实例,该实例将在测试拆卸后关闭。当你必须使用异步固定作用域时,任何调用asyncio.get_event_loop()的调用者都将获得当前事件循环实例,该实例将被关闭,下一个测试将在另一个事件循环中运行。所以解决方案是使用所需的范围重新定义loop固定作用域和具有所需范围的自定义固定作用域。

import asyncio
import pytest
from aiomisc import entrypoint


@pytest.fixture(scope='module')
def loop():
    with entrypoint() as loop:
        asyncio.set_event_loop(loop)
        yield loop


@pytest.fixture(scope='module')
async def sample_fixture(loop):
    yield 1


LOOP_ID = None


async def test_using_fixture(sample_fixture):
    global LOOP_ID
    LOOP_ID = id(asyncio.get_event_loop())
    assert sample_fixture == 1


async def test_not_using_fixture(loop):
    assert id(loop) == LOOP_ID

pytest标记

包包含一些对pytest有用的标记

  • catch_loop_exceptions - 未捕获的事件循环异常将导致测试失败。
  • forbid_get_event_loop - 禁止在测试用例中调用asyncio.get_event_loop
import asyncio
import pytest


@pytest.mark.forbid_get_event_loop
async def test_with_get_loop():
    def switch_context():
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        loop.call_soon(future.set_result, True)
        return future

    with pytest.raises(Exception):
        await switch_context()


# Test will be failed
@pytest.mark.catch_loop_exceptions
async def test_with_errors(loop):
    async def fail():
        # switch context
        await asyncio.sleep(0)
        raise Exception()

    loop.create_task(fail())
    await asyncio.sleep(0.1)
    return

传递默认上下文

import pytest

@pytest.fixture
def default_context():
    return {
        'foo': 'bar',
        'bar': 'foo',
    }

测试服务

在测试模块中重新定义services固定作用域

import aiomisc
import pytest


class SimpleServie(aiomisc.Service):
    async def start(self) -> None:
        pass

    
@pytest.fixture
def services():
    return [SimpleServie()]

事件循环策略覆盖

import asyncio
import pytest
import tokio
import uvloop

policy_ids = ('uvloop', 'asyncio', 'tokio')
policies = (uvloop.EventLoopPolicy(),
            asyncio.DefaultEventLoopPolicy(),
            tokio.EventLoopPolicy())

@pytest.fixture(params=policies, ids=policy_ids)
def event_loop_policy(request):
    return request.param

线程池覆盖

import pytest
from aiomisc.thread_pool import ThreadPoolExecutor
import concurrent.futures

thread_pool_ids = ('aiomisc pool', 'default pool')
thread_pool_implementation = (ThreadPoolExecutor,
                              concurrent.futures.ThreadPoolExecutor)


@pytest.fixture(params=thread_pool_implementation, ids=thread_pool_ids)
def thread_pool_executor(request):
    return request.param

入口点参数

import pytest

@pytest.fixture
def entrypoint_kwargs() -> dict:
    return dict(log_config=False)

aiohttp测试客户端

import pytest
from myapp.services.rest import REST


@pytest.fixture
def rest_port(aiomisc_unused_port_factory):
    return aiomisc_unused_port_factory()


@pytest.fixture
def rest_service(rest_port):
    return REST(port=rest_port)


@pytest.fixture
def services(rest_service):
    return [rest_service]


@pytest.fixture
def api_client(api_service):
    test_srv = TestServer(
        app=rest_service.app,
        port=arguments.port,
    )

    return TestClient(test_srv)

...

TCPProxy

简单TCP代理,用于模拟网络问题。作为固定作用域tcp_proxy可用

示例

import asyncio
import time

import pytest

import aiomisc


class EchoServer(aiomisc.service.TCPServer):
    async def handle_client(
            self, reader: asyncio.StreamReader,
            writer: asyncio.StreamWriter
    ):
        chunk = await reader.read(65534)
        while chunk:
            writer.write(chunk)
            chunk = await reader.read(65534)

        writer.close()
        await writer.wait_closed()


@pytest.fixture()
def server_port(aiomisc_unused_port_factory) -> int:
    return aiomisc_unused_port_factory()


@pytest.fixture()
def services(server_port, localhost):
    return [EchoServer(port=server_port, address=localhost)]


@pytest.fixture()
async def proxy(tcp_proxy, localhost, server_port):
    async with tcp_proxy(localhost, server_port) as proxy:
        yield proxy


async def test_proxy_client_close(proxy):
    reader, writer = await proxy.create_client()
    payload = b"Hello world"

    writer.write(payload)
    response = await asyncio.wait_for(reader.read(1024), timeout=1)

    assert response == payload

    assert not reader.at_eof()
    await proxy.disconnect_all()

    assert await asyncio.wait_for(reader.read(), timeout=1) == b""
    assert reader.at_eof()


async def test_proxy_client_slow(proxy):
    read_delay = 0.1
    write_delay = 0.2

    # Emulation of asymmetric and slow ISP
    with proxy.slowdown(read_delay, write_delay):
        reader, writer = await proxy.create_client()
        payload = b"Hello world"

        delta = -time.monotonic()

        writer.write(payload)
        await asyncio.wait_for(reader.read(1024), timeout=2)

        delta += time.monotonic()

        assert delta >= read_delay + write_delay


async def test_proxy_client_with_processor(proxy):
    processed_request = b"Never say hello"

    # Patching protocol functions
    proxy.set_content_processors(
        # Process data from client to server
        lambda _: processed_request,

        # Process data from server to client
        lambda chunk: chunk[::-1],
    )

    reader, writer = await proxy.create_client()
    writer.write(b'nevermind')

    response = await reader.read(16)

    assert response == processed_request[::-1]

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

aiomisc_pytest-1.2.1.tar.gz (11.6 kB 查看哈希值)

上传时间

构建分布

aiomisc_pytest-1.2.1-py3-none-any.whl (10.6 kB 查看哈希值)

上传时间 Python 3

支持