跳转到主要内容

Python的事件驱动中间件库

项目描述

.. 图像:: https://travis-ci.org/CloudVE/pyeventsystem.svg?branch=master :target: https://travis-ci.org/CloudVE/pyeventsystem :alt: 构建状态

.. 图像:: https://codecov.io/gh/CloudVE/pyeventsystem/branch/master/graph/badge.svg :target: https://codecov.io/gh/CloudVE/pyeventsystem :alt: 覆盖状态

pyeventsystem

pyeventsystem 是一个用于 Python 的事件驱动中间件库。除了提供订阅和监听事件的机制外,它还提供拦截函数的机制,这使得它非常适合编写中间件。通过拦截函数,中间件可以在原始函数之前、之后甚至替换原始函数。它还提供了将相关事件处理器分组到中间件类中的功能,这使得管理可安装的中间件变得更加容易。

简单示例

.. code-block:: python

from pyeventsystem.middleware import SimpleMiddlewareManager
from pyeventsystem.middleware import dispatch
from pyeventsystem.middleware import observe


class MyMiddleWare(object):

    def __init__(self, event_dispatcher):
        self.events = event_dispatcher

    @dispatch("a.series.of.unfortunate.events", priority=2500)
    def perform_villainy(self, name):
        return "Drop ACME Anvil on " + name

    @observe("a.series.of.unfortunate.events", priority=2400)
    def pre_log_villainy(self, event_args, name):
        print("Prepping for villainy: " + name)

    @observe("*.unfortunate.events", priority=2600)
    def post_log_villainy(self, event_args, name):
        print("Result of villainy: {0}".format(event_args['result']))


manager = SimpleMiddlewareManager()
myobj = MyMiddleWare(manager.events)
manager.add(myobj)
myobj.perform_villainy("RoadRunner")

在这个例子中,我们拦截了 perform_villainy 函数,并在执行前后观察了该函数。

输出如下

.. code-block:: console

Prepping for villainy: RoadRunner
Result of villainy: Drop ACME Anvil on RoadRunner

项目详情


下载文件

下载您平台对应的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

pyeventsystem-0.1.0.tar.gz (13.5 kB 查看哈希值)

上传时间

构建分布

pyeventsystem-0.1.0-py2.py3-none-any.whl (15.1 kB 查看哈希值)

上传时间 Python 2 Python 3

由支持