跳转到主要内容

基于线程的异步调度器,用于从分布式代理收集结果。

项目描述

ate-dispatcher

Project License - MIT pypi version conda version download count Downloads PyPI status Unit tests codecov

概述

一个纯Python,基于线程的异步调度器,用于从对给定事件做出反应的分布式代理收集结果。

调度器会对一个带有特定主题的事件做出反应,该主题被传递给一组已注册它们所反应事件类型的Producer对象。然后每个结果都会传递给一组ResultListener对象,它们将根据是否注册了参与该主题来处理响应。

提供的API是线程安全、异步和无锁的,这使得它适用于轻量级和快速的任务。

依赖项

这个包是纯Python,因此它不依赖于任何外部库,除了用于导入在较旧Python 3版本中不可用的类型类的typing-extensions

安装

您可以使用conda或pip包管理器安装此库,如下所示

# Using conda
conda install ate-dispatcher -c conda-forge

# Using pip
pip install ate-dispatcher

本地开发

为了安装ate-dispatcher的本地开发版本,可以调用pip

pip install -e .

包使用

ate-dispatcher公开了两个抽象接口(ProducerResultListener),以及主要的调度器(ATEDispatcher)。前一个类被设计为可继承的,而最后一个类被设计为可实例化的。

定义生产者

一个Producer对象负责对来自调度器的标记为特定topic的输入消息产生响应,该主题是该对象可以处理的。它必须实现produce_dispatcher_output

import time
from typing import Any
from ate_dispatcher import Producer

# Defining a producer
class SpecificTopicProducer(Producer):
    def __init__(self, _id: int, timeout: int, *topics):
        super().__init__()
        self.id = _id
        self.timeout = timeout / 1000
        self.topics = set(topics)

    def produce_dispatcher_output(
            self, topic: str, *args, **kwargs) -> Any:
        time.sleep(timeout)
        return {
            'id': self.id,
            'some_key': topic,
            'args': args,
            'kwargs': kwargs
        }

定义结果监听器

一个 ResultListener 对象将接收对特定主题做出反应的 Producer 对象发出的响应。这也是调度器架构的终点,所有最终消息都将到达这里。ResultListener 子类必须实现 process_dispatcher_result 方法。

from typing import Any
from ate_dispatcher import ResultListener

# Defining a result listener
class ResultListenerExample(ResultListener):
    def __init__(self):
        super().__init__()
        self.responses = {}

    def clear(self):
        self.responses = {}

    def process_dispatcher_result(self, topic: str, response: Any):
        topic_responses = self.responses.get(topic, [])
        topic_responses.append(response)
        self.responses[topic] = topic_responses

创建、使用和销毁调度器

在定义了 ProducerResultListener 子类之后,需要实例化和注册 ATEDispatcher 实例。由于接口继承自 threading.Thread,需要通过 startend 方法跟踪它们的生命周期。

import time

# Import the producer and result listener classes
from specific_topic_producer import SpecificTopicProducer
from result_listener_example import ResultListenerExample

# Import the dispatcher
from ate_dispatcher import ATEDispatcher

# Create the dispatcher
dispatcher = ATEDispatcher()

# Start the dispatcher, the lifetime is delegated to the end developer.
dispatcher.start()

# Define the producers and register them against the dispatcher
producer1 = SpecificTopicProducer(0, 200, 'topic1', 'my_topic')
producer1.start()

producer2 = SpecificTopicProducer(1, 500, 'topic1', 'topic2', 'my_topic')
producer2.start()

for topic in producer1.topics:
    dispatcher.register_result_producer(producer1, topic)

for topic in producer2.topics:
    dispatcher.register_result_producer(producer2, topic)

# Define the result listeners and register them against the dispatcher
listener1 = ResultListenerExample()
listener1.start()

listener2 = ResultListenerExample()
listener2.start()

for topic in ['topic1', 'topic2', 'my_topic']:
    # The first listener will receive all responses tagged with all topics
    dispatcher.register_result_listener(listener1, topic)

for topic in ['topic1', 'my_topic']:
    # This listener will attend to certain topics.
    dispatcher.register_result_listener(listener2, topic)

由于调度器架构是完全异步的,触发消息可能指示给定主题上所有已注册生产者发出响应的最大超时时间(以毫秒为单位)。在指定超时后收到的任何响应都将被丢弃。此外,消息将按照到达顺序传递给结果监听器。

# Trigger a dispatcher request with a 4000ms timeout on the topic1
dispatcher.send_request('topic1', 3, 4, 5, ttl=4000, keyword='b')

# Wait for responses to arrive
time.sleep(1.0)

# Both listeners should have received the responses from both producers.
expected_responses = {
    'topic1': [
        {
            'id': 0
            'some_key': 'topic1',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        },
        {
            'id': 1
            'some_key': 'topic1',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        },
    ]
}

assert listener1.responses == expected_responses
assert listener2.responses == expected_responses

# Clear the listener responses
listener1.clear()
listener2.clear()

# Trigger a dispatcher request with a 300ms timeout limit on the topic my_topic
dispatcher.send_request('my_topic', 3, 4, 5, ttl=300, keyword='b')

# Wait for responses to arrive
time.sleep(0.5)

# Both listeners should have only received the response from producer1.
expected_responses = {
    'my_topic': [
        {
            'id': 0
            'some_key': 'my_topic',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        }
    ]
}

assert listener1.responses == expected_responses
assert listener2.responses == expected_responses


# Clear the listener1 responses
listener1.clear()

# Trigger a dispatcher request with a 1000ms timeout limit on the topic topic2
dispatcher.send_request('topic2', 3, 4, 5, ttl=1000, keyword='b')

# Wait for responses to arrive
time.sleep(1.0)

# Only the listener1 should have received the message produced by the producer2
expected_responses = {
    'topic2': [
        {
            'id': 1
            'some_key': 'topic2',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        }
    ]
}

assert listener1.responses == expected_responses

最后,可以随时从特定主题注销已注册的每个 ProducerResultListener 实例。然而,在停止任一 ProducerResultListener 实例之前,必须注销每个注册的主题。

# Deregister the listener2 and the producer1 from certain topics
dispatcher.deregister_result_listener(listener2, 'my_topic')
dispatcher.deregister_result_producer(producer1, 'topic1')

# Stopping the producer and result listener instances
for topic in producer1.topics:
    dispatcher.deregister_result_producer(producer1, topic)

for topic in producer2.topics:
    dispatcher.deregister_result_producer(producer2, topic)

producer1.stop()
producer2.stop()

for topic in ['topic1', 'topic2', 'my_topic']:
    dispatcher.deregister_result_listener(listener1, topic)

for topic in ['topic1', 'my_topic']:
    dispatcher.deregister_result_listener(listener2, topic)

# Stop the dispatcher
dispatcher.stop()

运行测试

我们使用 pytest 来运行测试,如下所示

pytest -x -v ate_dispatcher/tests

变更日志

访问我们的 变更日志 文件,了解更多关于我们的新功能和改进。

贡献指南

我们遵循 PEP8 和 PEP257。我们为此包中声明的所有函数和类使用 MyPy 类型注解。如果您有任何问题/疑问,请随时发送 PR 或创建问题。

项目详情


下载文件

下载适用于您的平台的文件。如果您不确定选择哪一个,请了解更多关于 安装包 的信息。

源分布

ate-dispatcher-0.1.0.tar.gz (10.9 kB 查看散列)

上传时间

构建分布

ate_dispatcher-0.1.0-py3-none-any.whl (8.9 kB 查看散列)

上传时间 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面