aiomisc的pytest集成
项目描述
aiomisc pytest插件
本包包含pytest的插件。
基本用法
简单用法示例
async def test_sample(event_loop):
f = event_loop.crete_future()
event_loop.call_soon(f.set_result, True)
assert await f
异步固定作用域示例
import asyncio
import pytest
@pytest.fixture
async def my_fixture(loop):
await asyncio.sleep(0)
# Requires python 3.6+
yield
如果你需要在测试之间保存异步固定作用域的实例,错误的方法只是更改固定作用域。但是为什么它不起作用呢?那是因为,在基本情况下,loop
固定作用域为每个测试创建一个新的事件循环实例,该实例将在测试拆卸后关闭。当你必须使用异步固定作用域时,任何调用asyncio.get_event_loop()
的调用者都将获得当前事件循环实例,该实例将被关闭,下一个测试将在另一个事件循环中运行。所以解决方案是使用所需的范围重新定义loop
固定作用域和具有所需范围的自定义固定作用域。
import asyncio
import pytest
from aiomisc import entrypoint
@pytest.fixture(scope='module')
def loop():
with entrypoint() as loop:
asyncio.set_event_loop(loop)
yield loop
@pytest.fixture(scope='module')
async def sample_fixture(loop):
yield 1
LOOP_ID = None
async def test_using_fixture(sample_fixture):
global LOOP_ID
LOOP_ID = id(asyncio.get_event_loop())
assert sample_fixture == 1
async def test_not_using_fixture(loop):
assert id(loop) == LOOP_ID
pytest标记
包包含一些对pytest有用的标记
catch_loop_exceptions
- 未捕获的事件循环异常将导致测试失败。forbid_get_event_loop
- 禁止在测试用例中调用asyncio.get_event_loop
。
import asyncio
import pytest
@pytest.mark.forbid_get_event_loop
async def test_with_get_loop():
def switch_context():
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_soon(future.set_result, True)
return future
with pytest.raises(Exception):
await switch_context()
# Test will be failed
@pytest.mark.catch_loop_exceptions
async def test_with_errors(loop):
async def fail():
# switch context
await asyncio.sleep(0)
raise Exception()
loop.create_task(fail())
await asyncio.sleep(0.1)
return
传递默认上下文
import pytest
@pytest.fixture
def default_context():
return {
'foo': 'bar',
'bar': 'foo',
}
测试服务
在测试模块中重新定义services
固定作用域
import aiomisc
import pytest
class SimpleServie(aiomisc.Service):
async def start(self) -> None:
pass
@pytest.fixture
def services():
return [SimpleServie()]
事件循环策略覆盖
import asyncio
import pytest
import tokio
import uvloop
policy_ids = ('uvloop', 'asyncio', 'tokio')
policies = (uvloop.EventLoopPolicy(),
asyncio.DefaultEventLoopPolicy(),
tokio.EventLoopPolicy())
@pytest.fixture(params=policies, ids=policy_ids)
def event_loop_policy(request):
return request.param
线程池覆盖
import pytest
from aiomisc.thread_pool import ThreadPoolExecutor
import concurrent.futures
thread_pool_ids = ('aiomisc pool', 'default pool')
thread_pool_implementation = (ThreadPoolExecutor,
concurrent.futures.ThreadPoolExecutor)
@pytest.fixture(params=thread_pool_implementation, ids=thread_pool_ids)
def thread_pool_executor(request):
return request.param
入口点参数
import pytest
@pytest.fixture
def entrypoint_kwargs() -> dict:
return dict(log_config=False)
aiohttp测试客户端
import pytest
from myapp.services.rest import REST
@pytest.fixture
def rest_port(aiomisc_unused_port_factory):
return aiomisc_unused_port_factory()
@pytest.fixture
def rest_service(rest_port):
return REST(port=rest_port)
@pytest.fixture
def services(rest_service):
return [rest_service]
@pytest.fixture
def api_client(api_service):
test_srv = TestServer(
app=rest_service.app,
port=arguments.port,
)
return TestClient(test_srv)
...
TCPProxy
简单TCP代理,用于模拟网络问题。作为固定作用域tcp_proxy
可用
示例
import asyncio
import time
import pytest
import aiomisc
class EchoServer(aiomisc.service.TCPServer):
async def handle_client(
self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter
):
chunk = await reader.read(65534)
while chunk:
writer.write(chunk)
chunk = await reader.read(65534)
writer.close()
await writer.wait_closed()
@pytest.fixture()
def server_port(aiomisc_unused_port_factory) -> int:
return aiomisc_unused_port_factory()
@pytest.fixture()
def services(server_port, localhost):
return [EchoServer(port=server_port, address=localhost)]
@pytest.fixture()
async def proxy(tcp_proxy, localhost, server_port):
async with tcp_proxy(localhost, server_port) as proxy:
yield proxy
async def test_proxy_client_close(proxy):
reader, writer = await proxy.create_client()
payload = b"Hello world"
writer.write(payload)
response = await asyncio.wait_for(reader.read(1024), timeout=1)
assert response == payload
assert not reader.at_eof()
await proxy.disconnect_all()
assert await asyncio.wait_for(reader.read(), timeout=1) == b""
assert reader.at_eof()
async def test_proxy_client_slow(proxy):
read_delay = 0.1
write_delay = 0.2
# Emulation of asymmetric and slow ISP
with proxy.slowdown(read_delay, write_delay):
reader, writer = await proxy.create_client()
payload = b"Hello world"
delta = -time.monotonic()
writer.write(payload)
await asyncio.wait_for(reader.read(1024), timeout=2)
delta += time.monotonic()
assert delta >= read_delay + write_delay
async def test_proxy_client_with_processor(proxy):
processed_request = b"Never say hello"
# Patching protocol functions
proxy.set_content_processors(
# Process data from client to server
lambda _: processed_request,
# Process data from server to client
lambda chunk: chunk[::-1],
)
reader, writer = await proxy.create_client()
writer.write(b'nevermind')
response = await reader.read(16)
assert response == processed_request[::-1]
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
aiomisc_pytest-1.2.1.tar.gz (11.6 kB 查看哈希值)
构建分布
aiomisc_pytest-1.2.1-py3-none-any.whl (10.6 kB 查看哈希值)
关闭
aiomisc_pytest-1.2.1.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e2658fefb4770a85fe5e1a13e816f142dc6920da5d99ea338d852c7a035f3325 |
|
MD5 | 9d0b1e9a6e6a00d0e0be26a5c448d347 |
|
BLAKE2b-256 | d58ee2bdb147941e4df8f3451679a0135968657b016c04025a6bb561aec53838 |
关闭
aiomisc_pytest-1.2.1-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 24a3802f0b794625a6bce0032f840c5e643e6f075c8a448297f1e92789c38736 |
|
MD5 | 9b559d477998f7182ff1c7f786274113 |
|
BLAKE2b-256 | 5fc4929aae141513874973722d80f3fb6017ecae3d361cbae4ed9589adb029a2 |