跳转到主要内容

Kafka 0.7客户端

项目描述

travis-ci

一天早上,当格列高里·萨姆萨从焦虑的梦中醒来时,他发现自己躺在床上已经变成了一只巨大的爬虫。

萨姆萨是一个针对Python的集群感知Kafka协议客户端和路由库。它实现了协议的0.7版本

它目前处于开发中,但已在内部使用并取得了一些成功。

介绍

什么是Kafka?

Kafka是一个分布式发布/订阅消息系统。集群管理由ZooKeeper完成。

消息

消息是客户端或服务之间要通信的数据单元。消息没有任何固有的结构——就Kafka而言,消息只是一个字节数组,应用程序可以以对其自身环境有意义的方式序列化/反序列化负载。

代理

代理是存储和提供消息的节点/服务器。

主题

消息发布到主题,类似于频道。每个主题都可在所有服务器上获得。

分区

主题被划分为分区,这些分区被分配到各个代理中。每个分区只由一个代理拥有。

一个Kafka集群的客户端通常分为两个不同的类别,尽管这些角色并不是互斥的

生产者

生产者向主题发布消息。

消费者

消费者订阅来自主题的消息。

本项目目标

  • 您的应用程序代码不应依赖于代理状态或Kafka集群配置。它唯一应直接了解的服务是ZooKeeper集群。

  • 提供与Scala客户端类似级别的抽象和功能,但使用Pythonic惯用和原生数据结构。

保证

  • 只要有一个节点可以接受写入,每个发布都将被写入(至少)一个节点。

使用方法和API概述

创建集群

>>> from kazoo.client import KazooClient
>>> from samsa.cluster import Cluster
>>> zookeeper = KazooClient()
>>> zookeeper.start()
>>> cluster = Cluster(zookeeper)

代理

要列出集群中按代理ID排列的所有代理

>>> print cluster.brokers.keys()
[0, 1]

要通过ID获取Broker对象,请将ID作为字典项查找中的键

>>> print cluster.brokers[0]
<samsa.brokers.Broker at 0x1005f4c10: id=0>

主题

Cluster对象还提供了类似字典的接口,用于按名称检索Topic对象。例如

>>> topic = cluster.topics['example-topic']
>>> print topic
<samsa.topics.Topic at 0x1005f4d90: name='example-topic'>

您还可以通过将PartitionMap强制转换为列表来查看可用于在主题内进行写入的分区数量

>>> print list(topic.partitions)
[<samsa.partitions.Partition at 0x1005f9b90: topic=<samsa.topics.Topic at 0x1005f4d90: name='example-topic'>, broker=<samsa.brokers.Broker at 0x1005f4c10: id=0>, number=0>]

发布

要向主题发布,请向Topic实例的publish方法提供要发布的字符串或字符串列表

>>> topic.publish('hello world')
>>> topic.publish(['hello', 'world'])

如果提供了消息列表,所有消息都将按顺序,在同一个代理上,发送到同一个分区。单个消息没有这样的保证,并且可能会根据分区方法在任意数量的主机上结束,并可能分布在多个分区中,可能会丢失消息之间的任何顺序。

消费

消费者被组织到消费者组中,这使得Kafka可以根据消费者组的配置提供两种不同的语义消息消费方法。

队列

消费者组中的每个消费者只接收一次消息,这为消息在组周围提供了一种合理的均匀分布,因为它只拥有主题中可用分区的子集的所有者。这类似于put/get队列或许多AMQP-like接口。

主题

每个消费者都是自己的消费者组,并接收发送到主题的所有消息,因为它拥有所有分区的所有者。这类似于事件系统或发布/订阅接口。

关于分区数量的注意事项

配置代理时要注意的一个重要事项是,如果有比分区多的消费者,则某些消费者根本不会收到任何消息,因此您应该从一开始就配置代理,以便将主题拆分为至少与预期消费者数量一样多的分区。

要获取应提供的最小分区数,请使用以下公式

# of brokers * # of partitions per node = # of consumers in the largest consumer group for this topic

创建消费者

要订阅主题,请向Topic实例的subscribe方法提供组名

>>> consumer = topic.subscribe('group-name')

在开始接收消息之前,消费者必须注册到ZooKeeper。(记住,在任何时候,消费者组中只能有一个消费者可以从分区中读取。)

要监听已建立的消费者上的消息

>>> for message in consumer:
...     print message

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

samsa-0.3.11.tar.gz (28.0 kB 查看哈希值)

上传时间

支持者