aio asyncio 框架的 HTTP 服务器
项目描述
aio.http.server
aio asyncio 框架的 HTTP 服务器
构建状态
安装
需要 python >= 3.4
安装方式
pip install aio.http.server
快速入门 - Hello world HTTP 服务器
创建一个输出“你好”的 Web 服务器
将以下内容保存到文件“hello.conf”中
[server/my_server]
factory = aio.http.server.factory
port = 8080
protocol = my_example.protocol
将以下内容保存到文件名为 my_example.py 的文件中
import asyncio
import aiohttp
import aio.app
@aio.app.server.protocol
def protocol(name):
loop = asyncio.get_event_loop()
webapp = aiohttp.web.Application(loop=loop)
@asyncio.coroutine
def handle_hello_world(webapp):
return aiohttp.web.Response(body=b"Hello, world")
webapp.router.add_route("GET", "/", handle_hello_world)
return webapp.make_handler()
使用 aio run 命令运行
aio run -c hello.conf
aio.http.server 使用方法
配置
使用 aio.http.server.factory 工厂创建服务器配置,并抑制正常输出
>>> config = """ ... [aio] ... log_level = ERROR ... ... [server/test] ... factory: aio.http.server.factory ... port: 7070 ... """
运行 HTTP 服务器
默认情况下,如果没有设置路由,HTTP 服务器将响应 404
>>> import asyncio >>> import aiohttp >>> from aio.app.runner import runner >>> import aio.testing
>>> @aio.testing.run_forever(sleep=1) ... def run_http_server(): ... runner(['run'], config_string=config) ... ... def call_http_server(): ... result = yield from ( ... yield from aiohttp.request( ... "GET", "https://127.0.0.1:7070")).read() ... print(result) ... ... return call_http_server
>>> run_http_server() b'404: Not Found'
服务器对象可通过 aio.app.servers[{name}] 变量访问
>>> import aio.app
>>> aio.app.servers['test'] <Server sockets=[<socket.socket...laddr=('0.0.0.0', 7070)...]>
清除应用
>>> aio.app.clear()
使用自定义协议运行服务器
如果在“server/”配置中指定了协议,HTTP 服务器将使用该函数作为协议工厂
函数应该是一个协程,并使用服务器名称调用
>>> config_with_protocol = """ ... [aio] ... log_level = ERROR ... ... [server/test] ... factory = aio.http.server.factory ... protocol = aio.http.server.tests._example_http_protocol ... port = 7070 ... """
我们需要用 aio.app.server.protocol 装饰器装饰协议
>>> @aio.app.server.protocol ... def http_protocol(name): ... loop = asyncio.get_event_loop() ... http_app = aiohttp.web.Application(loop=loop) ... http_app['name'] = name ... ... @asyncio.coroutine ... def handle_hello_world(http_app): ... return aiohttp.web.Response(body=b"Hello, world") ... ... http_app.router.add_route("GET", "/", handle_hello_world) ... return http_app.make_handler()
>>> aio.http.server.tests._example_http_protocol = http_protocol
>>> @aio.testing.run_forever(sleep=1) ... def run_http_server(): ... runner(['run'], config_string=config_with_protocol) ... ... def call_http_server(): ... result = yield from ( ... yield from aiohttp.request( ... "GET", "https://127.0.0.1:7070")).read() ... ... print(result) ... ... return call_http_server
>>> run_http_server() b'Hello, world'