基于线程的异步调度器,用于从分布式代理收集结果。
项目描述
ate-dispatcher
概述
一个纯Python,基于线程的异步调度器,用于从对给定事件做出反应的分布式代理收集结果。
调度器会对一个带有特定主题的事件做出反应,该主题被传递给一组已注册它们所反应事件类型的Producer
对象。然后每个结果都会传递给一组ResultListener
对象,它们将根据是否注册了参与该主题来处理响应。
提供的API是线程安全、异步和无锁的,这使得它适用于轻量级和快速的任务。
依赖项
这个包是纯Python,因此它不依赖于任何外部库,除了用于导入在较旧Python 3版本中不可用的类型类的typing-extensions
。
安装
您可以使用conda或pip包管理器安装此库,如下所示
# Using conda
conda install ate-dispatcher -c conda-forge
# Using pip
pip install ate-dispatcher
本地开发
为了安装ate-dispatcher
的本地开发版本,可以调用pip
pip install -e .
包使用
ate-dispatcher
公开了两个抽象接口(Producer
和ResultListener
),以及主要的调度器(ATEDispatcher
)。前一个类被设计为可继承的,而最后一个类被设计为可实例化的。
定义生产者
一个Producer
对象负责对来自调度器的标记为特定topic
的输入消息产生响应,该主题是该对象可以处理的。它必须实现produce_dispatcher_output
。
import time
from typing import Any
from ate_dispatcher import Producer
# Defining a producer
class SpecificTopicProducer(Producer):
def __init__(self, _id: int, timeout: int, *topics):
super().__init__()
self.id = _id
self.timeout = timeout / 1000
self.topics = set(topics)
def produce_dispatcher_output(
self, topic: str, *args, **kwargs) -> Any:
time.sleep(timeout)
return {
'id': self.id,
'some_key': topic,
'args': args,
'kwargs': kwargs
}
定义结果监听器
一个 ResultListener
对象将接收对特定主题做出反应的 Producer
对象发出的响应。这也是调度器架构的终点,所有最终消息都将到达这里。ResultListener
子类必须实现 process_dispatcher_result
方法。
from typing import Any
from ate_dispatcher import ResultListener
# Defining a result listener
class ResultListenerExample(ResultListener):
def __init__(self):
super().__init__()
self.responses = {}
def clear(self):
self.responses = {}
def process_dispatcher_result(self, topic: str, response: Any):
topic_responses = self.responses.get(topic, [])
topic_responses.append(response)
self.responses[topic] = topic_responses
创建、使用和销毁调度器
在定义了 Producer
和 ResultListener
子类之后,需要实例化和注册 ATEDispatcher 实例
。由于接口继承自 threading.Thread
,需要通过 start
和 end
方法跟踪它们的生命周期。
import time
# Import the producer and result listener classes
from specific_topic_producer import SpecificTopicProducer
from result_listener_example import ResultListenerExample
# Import the dispatcher
from ate_dispatcher import ATEDispatcher
# Create the dispatcher
dispatcher = ATEDispatcher()
# Start the dispatcher, the lifetime is delegated to the end developer.
dispatcher.start()
# Define the producers and register them against the dispatcher
producer1 = SpecificTopicProducer(0, 200, 'topic1', 'my_topic')
producer1.start()
producer2 = SpecificTopicProducer(1, 500, 'topic1', 'topic2', 'my_topic')
producer2.start()
for topic in producer1.topics:
dispatcher.register_result_producer(producer1, topic)
for topic in producer2.topics:
dispatcher.register_result_producer(producer2, topic)
# Define the result listeners and register them against the dispatcher
listener1 = ResultListenerExample()
listener1.start()
listener2 = ResultListenerExample()
listener2.start()
for topic in ['topic1', 'topic2', 'my_topic']:
# The first listener will receive all responses tagged with all topics
dispatcher.register_result_listener(listener1, topic)
for topic in ['topic1', 'my_topic']:
# This listener will attend to certain topics.
dispatcher.register_result_listener(listener2, topic)
由于调度器架构是完全异步的,触发消息可能指示给定主题上所有已注册生产者发出响应的最大超时时间(以毫秒为单位)。在指定超时后收到的任何响应都将被丢弃。此外,消息将按照到达顺序传递给结果监听器。
# Trigger a dispatcher request with a 4000ms timeout on the topic1
dispatcher.send_request('topic1', 3, 4, 5, ttl=4000, keyword='b')
# Wait for responses to arrive
time.sleep(1.0)
# Both listeners should have received the responses from both producers.
expected_responses = {
'topic1': [
{
'id': 0
'some_key': 'topic1',
'args': (3, 4, 5),
'kwargs': {
'keyword': 'b'
}
},
{
'id': 1
'some_key': 'topic1',
'args': (3, 4, 5),
'kwargs': {
'keyword': 'b'
}
},
]
}
assert listener1.responses == expected_responses
assert listener2.responses == expected_responses
# Clear the listener responses
listener1.clear()
listener2.clear()
# Trigger a dispatcher request with a 300ms timeout limit on the topic my_topic
dispatcher.send_request('my_topic', 3, 4, 5, ttl=300, keyword='b')
# Wait for responses to arrive
time.sleep(0.5)
# Both listeners should have only received the response from producer1.
expected_responses = {
'my_topic': [
{
'id': 0
'some_key': 'my_topic',
'args': (3, 4, 5),
'kwargs': {
'keyword': 'b'
}
}
]
}
assert listener1.responses == expected_responses
assert listener2.responses == expected_responses
# Clear the listener1 responses
listener1.clear()
# Trigger a dispatcher request with a 1000ms timeout limit on the topic topic2
dispatcher.send_request('topic2', 3, 4, 5, ttl=1000, keyword='b')
# Wait for responses to arrive
time.sleep(1.0)
# Only the listener1 should have received the message produced by the producer2
expected_responses = {
'topic2': [
{
'id': 1
'some_key': 'topic2',
'args': (3, 4, 5),
'kwargs': {
'keyword': 'b'
}
}
]
}
assert listener1.responses == expected_responses
最后,可以随时从特定主题注销已注册的每个 Producer
和 ResultListener
实例。然而,在停止任一 Producer
或 ResultListener
实例之前,必须注销每个注册的主题。
# Deregister the listener2 and the producer1 from certain topics
dispatcher.deregister_result_listener(listener2, 'my_topic')
dispatcher.deregister_result_producer(producer1, 'topic1')
# Stopping the producer and result listener instances
for topic in producer1.topics:
dispatcher.deregister_result_producer(producer1, topic)
for topic in producer2.topics:
dispatcher.deregister_result_producer(producer2, topic)
producer1.stop()
producer2.stop()
for topic in ['topic1', 'topic2', 'my_topic']:
dispatcher.deregister_result_listener(listener1, topic)
for topic in ['topic1', 'my_topic']:
dispatcher.deregister_result_listener(listener2, topic)
# Stop the dispatcher
dispatcher.stop()
运行测试
我们使用 pytest 来运行测试,如下所示
pytest -x -v ate_dispatcher/tests
变更日志
访问我们的 变更日志 文件,了解更多关于我们的新功能和改进。
贡献指南
我们遵循 PEP8 和 PEP257。我们为此包中声明的所有函数和类使用 MyPy 类型注解。如果您有任何问题/疑问,请随时发送 PR 或创建问题。
项目详情
ate-dispatcher-0.1.0.tar.gz 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | c8de28e5cb8169078a5f15dee1caf0edeb44710a5ba598a41fbe9972f0fd2b54 |
|
MD5 | 0eda56a56b3077ca49279fac2e272699 |
|
BLAKE2b-256 | fbab4aa2bc7ea3d0383e3dc83fa43acada41cbd2bcb3daf9b0e3e1fd56ca65fe |
ate_dispatcher-0.1.0-py3-none-any.whl 的散列
算法 | 散列摘要 | |
---|---|---|
SHA256 | 1c90000560828f3fb53d08627a95a47bd57ce155aa07810b6f52019b66ef9d8d |
|
MD5 | bd4ccf267e4324fc52ef58b7a7dca3b8 |
|
BLAKE2b-256 | f0b331e0c5ef8cc60974c214c904e729dea6e493d26b1088c4bed210f0b75c3f |