跳转到主要内容

单主群组成员框架

项目描述

PyPI package CI Status Test Coverage

一个简单的基于Twisted的单主群组成员框架,有助于在多个节点之间划分工作负载或无状态数据。它包括2个组件

  1. 作为Twisted插件提供的独立TCP服务器,用于跟踪群组。目前协议是HTTP,但将来可能会改变。

  2. 基于Twisted的客户端库,与上述服务器通信(可按需实现其他语言库)

它提供了基于心跳的故障检测。然而,由于它是单主,服务器是一个故障点。但由于服务器完全无状态,它可以很容易地重启而不会出现任何问题。

它支持Python 2.7和3.6。

安装

在服务器和客户端节点上分别运行pip install bloc

用法

在服务器上运行twist -t 4 -s 6,其中4是客户端心跳超时,6是解决超时(以下解释)。这将默认在端口8989上启动HTTP服务器。可以通过-l tcp:port选项指定不同的端口。

在客户端,为了在多个节点之间平均分配items,创建BlocClient并定期调用get_index_total。以下是一个示例代码:

from functools import partial
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks, gatherResults

@inlineCallbacks
def do_stuff(bc):
    """ Process items based on index and total got from BlocClient """
    # get_index_total returns this node's index and total number of nodes in the group
    index_total = bc.get_index_total()
    if index_total is None:
        return
    index, total = index_total
    items = yield get_items_to_process()
    my_items = filter(partial(is_my_item, index, total), items)
    yield gatherResults([process_item(item) for item in my_items])

def is_my_item(index, total, item):
    """ Can I process this item? """
    return hash(item) % total + 1 == index

@task.react
def main(reactor):
    bc = BlocClient(reactor, "server_ip:8989", 3)
    bc.startService()
    # Call do_stuff every 2 seconds
    return task.LoopingCall(do_stuff, bc).start(2)

这里,重要的函数是is_my_item,它根据索引和总数决定是否由该节点处理该项。它基于项的哈希值工作。不用说,实现一个稳定的哈希函数对于您的项非常重要。理想情况下,项不应该有任何其他属性,只应该是某种键(字符串)。这个函数将保证只有一个节点处理特定的项,前提是bloc服务器为每个节点提供唯一的索引。

作为一个例子,假设节点A和B正在运行上面的代码,与同一bloc服务器通信,并处理以下列表中的用户ID:

1. 365f54e9-5de8-4509-be16-38e0c37f5fe9
2. f6a6a396-d0bf-428a-b63b-830f98874b6c
3. 6bec3551-163d-4eb8-b2d8-1f2c4b106d33
4. b6691e16-1d95-42de-8ad6-7aa0c81fe080

如果节点A的get_item_index返回(1, 2),则is_my_item将为用户ID 1和3返回True,而在节点B中,get_item_index返回(2, 2),则is_my_item将为用户ID 2和4返回True。这样,您就可以在多个节点之间分配用户ID。

哈希函数和键空间的选择可能会决定工作负载如何在节点之间均匀分布。

上述代码假设items是动态的,如果它是基于您应用程序数据(如用户)的话,这将是真的。然而,在某些情况下,它可以是固定数字,如果您的数据已经在固定数量的桶中分区,那么您可以使用bloc将桶分配给每个节点。一个例子是otter的调度功能,它将事件分配到固定的10个桶中,并在10个节点之间分配桶。另一个例子是kafka的分区主题。每个节点可以根据索引和总数消费特定的分区。

当没有分配索引时,get_index_total返回None,这可能在添加/删除节点或客户端由于任何网络问题无法与服务器通信时发生。当发生这种情况时,客户端必须停止工作,因为下次节点可能被分配不同的索引。这就是为什么客户端基于索引的处理必须是无状态的。

索引和总数在创建BlocClient时以提供的间隔内部更新。它们可能会随时间变化,但只有在get_index_total在稳定期(在启动服务器时提供)返回None之后才会变化。因此,必须在稳定期内至少调用一次get_index_total,以确保始终有最新的值,并且不会意外地使用错误的索引。

您可能已经注意到了上面的代码中的bc.startService,它必须在调用get_index_total之前调用。如果您正在使用服务层次结构设置twisted服务器,则最好将BlocClient对象作为子服务添加。这样,Twisted将在需要时启动和停止服务。

它是如何工作的

服务器在任何时候都处于两种状态之一:SETTLING(正在结算)或SETTLED(已结算)。它从SETTLING状态开始,当节点开始加入或离开时保持该状态。当节点停止活动(没有更多的加入/离开)一段时间(启动服务器时提供的称为结算时间)后,它就过渡到SETTLED状态,此时为每个节点分配一个索引并通知它们。结算时间在启动服务器时通过-s选项提供,通常应比心跳间隔多几秒。这样,服务器可以避免在多个节点在接近的时间加入/离开时无必要地分配索引。

客户端在创建BlocClient时提供的间隔内向服务器发送心跳。服务器根据这个心跳跟踪客户端,并删除任何在配置时间内没有心跳的客户端。这个时间在启动服务器时作为-t选项提供。服务器提供的超时时间应略大于客户端提供的心跳间隔,以考虑延迟或暂时的网络问题。在上面的例子中,服务器在4秒后超时,客户端每3秒发送一次心跳。这种心跳机制提供了故障检测。如果任何节点出现故障,该节点将停止处理工作。

需要了解的一些事项

  • 无安全性:目前服务器不验证客户端并接受任何客户端。连接也不是加密的。根据需求,我计划添加双向TLS认证。

  • 未进行基准测试。然而,由于它全部在内存中,并且使用了Twisted,它应该可以轻松扩展到数百个客户端。我将在稍后进行一些测试并更新。

  • 默认情况下,twist日志级别为info,由于HTTP中的心跳,每个请求都会被记录。您可以使用--log-level=warn选项来避免它。

项目详细信息


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

bloc-0.1.2.tar.gz (9.1 kB 查看哈希)

上传时间

构建分发

bloc-0.1.2-py2.py3-none-any.whl (13.4 kB 查看哈希)

上传时间 Python 2 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面