基于AsyncIO的服务编程
项目描述
AsyncIO 服务分支
文档: https://faust-streaming.github.io/mode/
源代码: https://github.com/faust-streaming/mode
为什么进行分支
我们决定对原始的 Mode 项目进行分支,因为存在一个关键的新版本发布过程,这导致社区中存在不确定性。所有人均可贡献于这个 分支,你也可以被添加为维护者。
我们希望
- 确保持续发布
- 代码质量
- 支持最新的 Python 版本
- 更新文档
等等...
什么是 Mode?
Mode 是一个基于 AsyncIO 构建的非常简洁的 Python 库,使得使用变得更加容易。
在 Mode 中,你的程序是由你可以启动、停止、重启和监督的服务构成的。
服务只是一个类
class PageViewCache(Service):
redis: Redis = None
async def on_start(self) -> None:
self.redis = connect_to_redis()
async def update(self, url: str, n: int = 1) -> int:
return await self.redis.incr(url, n)
async def get(self, url: str) -> int:
return await self.redis.get(url)
服务可以启动、停止和重启,并为这些动作提供回调。
它可以启动另一个服务
class App(Service):
page_view_cache: PageViewCache = None
async def on_start(self) -> None:
await self.add_runtime_dependency(self.page_view_cache)
@cached_property
def page_view_cache(self) -> PageViewCache:
return PageViewCache()
它可以包含后台任务
class PageViewCache(Service):
@Service.timer(1.0)
async def _update_cache(self) -> None:
self.data = await cache.get('key')
依赖于其他服务的服务实际上形成了一个你可以可视化的图。
工作进程
Mode 可选提供了一个工作进程,你可以使用它来启动程序,它支持日志记录、阻塞检测、远程调试等。
要启动工作进程,将以下代码添加到你的程序中
if __name__ == '__main__':
from mode import Worker
Worker(Service(), loglevel="info").execute_from_commandline()
然后执行你的程序以启动工作进程
$ python examples/tutorial.py
[2018-03-27 15:47:12,159: INFO]: [^Worker]: Starting...
[2018-03-27 15:47:12,160: INFO]: [^-AppService]: Starting...
[2018-03-27 15:47:12,160: INFO]: [^--Websockets]: Starting...
STARTING WEBSOCKET SERVER
[2018-03-27 15:47:12,161: INFO]: [^--UserCache]: Starting...
[2018-03-27 15:47:12,161: INFO]: [^--Webserver]: Starting...
[2018-03-27 15:47:12,164: INFO]: [^--Webserver]: Serving on port 8000
REMOVING EXPIRED USERS
REMOVING EXPIRED USERS
要停止它,按 Control-c
[2018-03-27 15:55:08,084: INFO]: [^Worker]: Stopping on signal received...
[2018-03-27 15:55:08,084: INFO]: [^Worker]: Stopping...
[2018-03-27 15:55:08,084: INFO]: [^-AppService]: Stopping...
[2018-03-27 15:55:08,084: INFO]: [^--UserCache]: Stopping...
REMOVING EXPIRED USERS
[2018-03-27 15:55:08,085: INFO]: [^Worker]: Gathering service tasks...
[2018-03-27 15:55:08,085: INFO]: [^--UserCache]: -Stopped!
[2018-03-27 15:55:08,085: INFO]: [^--Webserver]: Stopping...
[2018-03-27 15:55:08,085: INFO]: [^Worker]: Gathering all futures...
[2018-03-27 15:55:08,085: INFO]: [^--Webserver]: Closing server
[2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Waiting for server to close handle
[2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Shutting down web application
[2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Waiting for handler to shut down
[2018-03-27 15:55:08,086: INFO]: [^--Webserver]: Cleanup
[2018-03-27 15:55:08,086: INFO]: [^--Webserver]: -Stopped!
[2018-03-27 15:55:08,086: INFO]: [^--Websockets]: Stopping...
[2018-03-27 15:55:08,086: INFO]: [^--Websockets]: -Stopped!
[2018-03-27 15:55:08,087: INFO]: [^-AppService]: -Stopped!
[2018-03-27 15:55:08,087: INFO]: [^Worker]: -Stopped!
信标
我们传递给服务的 beacon
对象跟踪图中的服务。
它们不是严格必需的,但可以用来可视化正在运行的系统,例如,我们可以将其渲染为漂亮的图。
这需要你安装 pydot
库和 GraphViz
$ pip install pydot
让我们改变应用程序服务类,在启动时将图输出为图像
class AppService(Service):
async def on_start(self) -> None:
print('APP STARTING')
import pydot
import io
o = io.StringIO()
beacon = self.app.beacon.root or self.app.beacon
beacon.as_graph().to_dot(o)
graph, = pydot.graph_from_dot_data(o.getvalue())
print('WRITING GRAPH TO image.png')
with open('image.png', 'wb') as fh:
fh.write(graph.create_png())
创建服务
要定义一个服务,只需派生并填写在服务启动/停止等时执行操作的方法。
class MyService(Service):
async def on_start(self) -> None:
print('Im starting now')
async def on_started(self) -> None:
print('Im ready')
async def on_stop(self) -> None:
print('Im stopping now')
要启动服务,调用 await service.start()
await service.start()
或者,你可以使用 mode.Worker
(或其子类)从控制台启动基于服务的 asyncio 程序
if __name__ == '__main__':
import mode
worker = mode.Worker(
MyService(),
loglevel='INFO',
logfile=None,
daemon=False,
)
worker.execute_from_commandline()
这是一个图!
服务可以启动其他服务、协程和后台任务。
- 使用
add_dependency
启动其他服务
class MyService(Service):
def __post_init__(self) -> None:
self.add_dependency(OtherService(loop=self.loop))
- 使用
on_init_dependencies
启动一系列服务
class MyService(Service):
def on_init_dependencies(self) -> None:
return [
ServiceA(loop=self.loop),
ServiceB(loop=self.loop),
ServiceC(loop=self.loop),
]
- 启动一个 future/coroutine(在停止时将等待其完成)
class MyService(Service):
async def on_start(self) -> None:
self.add_future(self.my_coro())
async def my_coro(self) -> None:
print('Executing coroutine')
- 启动一个后台任务
class MyService(Service):
@Service.task
async def _my_coro(self) -> None:
print('Executing coroutine')
- 启动一个持续运行的后台任务
class MyService(Service):
@Service.task
async def _my_coro(self) -> None:
while not self.should_stop:
# NOTE: self.sleep will wait for one second, or
# until service stopped/crashed.
await self.sleep(1.0)
print('Background thread waking up')
安装
你可以通过 Python 包索引(PyPI)或从源代码安装 Mode。
使用 pip
安装
$ pip install -U mode-streaming
从源代码下载和安装: https://pypi.ac.cn/project/mode-streaming
你可以通过以下步骤安装它
$ tar xvfz mode-streaming-0.2.1.tar.gz
$ cd mode-0.2.1
$ python -m build .
# python install
如果你当前没有使用虚拟环境,则必须以特权用户身份执行最后一个命令。
使用开发版本
使用 pip
你可以使用以下 pip 命令安装 Mode 的最新快照
$ pip install mode-streaming
开发
指南和相关信息存储在 CONTRIBUTING.md
常见问题解答
我可以使用 Mode 与 Django/Flask 等?
是的!使用 gevent/eventlet 作为与 asyncio 集成的桥梁。
使用 gevent
这适用于任何可以与 gevent 一起工作的阻塞 Python 库。
使用 gevent 需要你安装 aiogevent
模块,你可以将 Mode 和此模块一起安装
$ pip install -U mode-streaming[gevent]
实际上要使用 gevent 作为事件循环,您必须在入口模块(通常是启动工作进程的地方)中执行以下操作,在其他任何第三方库导入之前:
#!/usr/bin/env python3
import mode.loop
mode.loop.use('gevent')
# execute program
请注意:这必须位于模块的顶部,以便在导入其他库之前执行。
使用 eventlet
它可以与任何可以使用 eventlet 工作的阻塞 Python 库一起使用。
使用 eventlet 需要您安装 aioeventlet
模块,并且您可以使用 Mode 将其作为捆绑包安装
$ pip install -U mode-streaming[eventlet]
实际上要使用 eventlet 作为事件循环,您必须在入口模块(通常是启动工作进程的地方)中执行以下操作,在其他任何第三方库导入之前:
#!/usr/bin/env python3
import mode.loop
mode.loop.use('eventlet')
# execute program
请注意:这非常重要,它必须位于模块的顶部,并且在导入库之前执行。
我可以与 Tornado 一起使用 Mode 吗?
是的!使用 tornado.platform.asyncio
桥接:[https://tornado.org.cn/en/stable/asyncio.html](https://tornado.org.cn/en/stable/asyncio.html)
我可以与 Twisted 一起使用 Mode 吗?
是的!使用 asyncio 反应器实现:[https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html](https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html)
您会支持 Python 3.5 或更早的版本吗?
目前没有计划支持 Python 3.5,但您欢迎为项目做出贡献。
以下是一些实现此目标所需的步骤
- 源代码转换,例如,将变量注释重写为代码
class Point:
x: int = 0
y: int = 0
必须重写为
class Point:
x = 0 # type: int
y = 0 # type: int
- 源代码转换,重写异步函数,例如,代码
async def foo():
await asyncio.sleep(1.0)
必须重写为
@coroutine
def foo():
yield from asyncio.sleep(1.0)
您会支持 Python 2 吗?
目前没有计划支持 Python 2,但您欢迎为项目做出贡献(上述问题中的详细信息也适用于 Python 2)。
在关闭时我收到很多警告,这是怎么回事?
如果在关闭时收到此类警告
Task was destroyed but it is pending!
task: <Task pending coro=<Service._execute_task() running at /opt/devel/mode/mode/services.py:643> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1100a7468>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<Service._execute_task() running at /opt/devel/mode/mode/services.py:643> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1100a72e8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<Service._execute_task() running at /opt/devel/mode/mode/services.py:643> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1100a7678>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<Event.wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/locks.py:269> cb=[_release_waiter(<Future pendi...1100a7468>()]>)() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:316]>
Task was destroyed but it is pending!
task: <Task pending coro=<Event.wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/locks.py:269> cb=[_release_waiter(<Future pendi...1100a7678>()]>)() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:316]>
通常意味着在进程退出之前忘记停止服务。
行为准则
所有与项目代码库、问题跟踪器、聊天室和邮件列表互动的人都有望遵守 Mode 行为准则。
作为这些项目的贡献者和维护者,为了培养一个开放和欢迎的社区,我们承诺尊重所有通过报告问题、发布功能请求、更新文档、提交拉取请求或补丁等方式做出贡献的人。
我们致力于让每个人都能在项目中参与,无论经验水平如何、性别、性别认同和表达、性取向、残疾、个人外貌、体型、种族、民族、年龄、宗教或国籍。
参与者不可接受的行为示例包括
- 使用性化语言或图像
- 个人攻击
- 捣乱或侮辱性评论
- 公开或私人骚扰
- 未经明确许可发布他人的个人信息,例如物理或电子地址
- 其他不道德或不专业的行为。
项目维护者有权也有责任删除、编辑或拒绝与该行为准则不一致的评论、提交、代码、维基编辑、问题和其他贡献。通过采用此行为准则,项目维护者承诺公平和一致地应用这些原则到管理此项目的各个方面。不遵守或执行行为准则的项目维护者可能会被永久从项目团队中移除。
此行为准则适用于项目空间以及个人代表项目或其社区时的公共空间。
可以通过打开问题或联系一个或多个项目维护者来报告滥用、骚扰或不可接受的行为。
此行为准则改编自贡献者承诺,版本 1.2.0,可在 [http://contributor-covenant.org/version/1/2/0/](http://contributor-covenant.org/version/1/2/0/) 获取。