aio框架的Pubsub系统
项目描述
aio.signals
为aio asyncio框架的Pubsub系统
构建状态
安装
需要python >= 3.4
使用以下命令安装
pip install aio.signals
快速入门
listen函数是同步调用的,但如果它不是一个协程,则回调监听器将被作为一个协程调用
回调监听器接收一个信号对象,它包含信号名称和发出信号的对象
emit函数是一个协程
将以下代码添加到文件my_signals.py中
import asyncio
from aio.signals import Signals
def listener(signal):
yield from asyncio.sleep(1)
print(signal.data)
signals = Signals()
signals.listen("my-signal", listener)
loop = asyncio.get_event_loop()
loop.run_until_complete(
signals.emit("my-signal", 'BOOM!'))
运行方式
python my_signals.py
aio.signals使用方法
使用
>>> import asyncio >>> import aio.testing >>> from aio.signals import Signals
处理程序接收一个信号对象
signal.name是信号的名称
signal.data包含发出的对象
>>> @asyncio.coroutine ... def callback(signal): ... print("%s received with %s" % (signal.name, signal.data))
>>> @aio.testing.run_until_complete ... def run_test(_signals, name, message): ... yield from _signals.emit(name, message)
>>> signals = Signals() >>> signals.listen("test-signal", callback)
>>> run_test(signals, "test-signal", 'BOOM!') test-signal received with BOOM!
如果处理程序不是一个协程,则将其包装在一个协程中
>>> def callback(signal): ... yield from asyncio.sleep(1) ... print("%s received with %s" % (signal.name, signal.data))
>>> signals = Signals() >>> signals.listen("test-signal-2", callback)
>>> run_test(signals, "test-signal-2", 'BOOM AGAIN!') test-signal-2 received with BOOM AGAIN!