跳转到主要内容

事件驱动数据管道

项目描述

Build PyPi Documentation

简介

eventkit的主要用例包括

  • 在松散耦合的组件之间发送事件;

  • 组合各种事件驱动的数据管道。

接口尽可能保持Pythonic,尽可能使用Python及其库中的熟悉名称。用于调度的是asyncio,并且与它的集成是无缝的。

查看示例和简介笔记本,以了解所有可能性的真实感受。

安装

pip3 install eventkit

需要Python 3.6或更高版本。

示例

创建一个事件并连接两个监听器

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f
event += g
event.emit(10, 5)

创建一个简单的管道

import eventkit as ev

event = (
    ev.Sequence('abcde')
    .map(str.upper)
    .enumerate()
)

print(event.run())  # in Jupyter: await event.list()

输出

[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]

创建一个管道以获取运行平均值和标准差

import random
import eventkit as ev

source = ev.Range(1000).map(lambda i: random.gauss(0, 1))

event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()

print(event.last().run())  # in Jupyter: await event.last()

输出

[(0.00790957852672618, 1.0345673260655333)]

组合异步迭代器

import asyncio
import eventkit as ev

async def ait(r):
    for i in r:
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for t in ev.Zip(ait('XYZ'), ait('123')):
        print(t)

asyncio.get_event_loop().run_until_complete(main())  # in Jupyter: await main()

输出

('X', '1')
('Y', '2')
('Z', '3')

实时视频分析管道

self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, persons in lastScene:
    ...

完整源代码

分布式计算

distex库提供了poolmap扩展方法,可以将多个核心或机器用于分布式计算

from distex import Pool
import eventkit as ev
import bz2

pool = Pool()
# await pool  # un-comment in Jupyter
data = [b'A' * 1000000] * 1000

pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()

print(pipe.run())  # in Jupyter: print(await pipe)
pool.shutdown()

灵感来源于

文档

完整的API文档

项目详情


下载文件

下载适用于您平台文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分发

eventkit-1.0.3.tar.gz (28.3 kB 查看哈希值)

上传

构建分发

eventkit-1.0.3-py3-none-any.whl (31.8 kB 查看哈希值)

上传 Python 3

由以下支持