Kafka 0.7客户端
项目描述
一天早上,当格列高里·萨姆萨从焦虑的梦中醒来时,他发现自己躺在床上已经变成了一只巨大的爬虫。
萨姆萨是一个针对Python的集群感知Kafka协议客户端和路由库。它实现了协议的0.7版本。
它目前处于开发中,但已在内部使用并取得了一些成功。
介绍
什么是Kafka?
Kafka是一个分布式发布/订阅消息系统。集群管理由ZooKeeper完成。
- 消息
消息是客户端或服务之间要通信的数据单元。消息没有任何固有的结构——就Kafka而言,消息只是一个字节数组,应用程序可以以对其自身环境有意义的方式序列化/反序列化负载。
- 代理
代理是存储和提供消息的节点/服务器。
- 主题
消息发布到主题,类似于频道。每个主题都可在所有服务器上获得。
- 分区
主题被划分为分区,这些分区被分配到各个代理中。每个分区只由一个代理拥有。
一个Kafka集群的客户端通常分为两个不同的类别,尽管这些角色并不是互斥的
- 生产者
生产者向主题发布消息。
- 消费者
消费者订阅来自主题的消息。
本项目目标
您的应用程序代码不应依赖于代理状态或Kafka集群配置。它唯一应直接了解的服务是ZooKeeper集群。
提供与Scala客户端类似级别的抽象和功能,但使用Pythonic惯用和原生数据结构。
保证
只要有一个节点可以接受写入,每个发布都将被写入(至少)一个节点。
使用方法和API概述
创建集群
>>> from kazoo.client import KazooClient >>> from samsa.cluster import Cluster >>> zookeeper = KazooClient() >>> zookeeper.start() >>> cluster = Cluster(zookeeper)
代理
要列出集群中按代理ID排列的所有代理
>>> print cluster.brokers.keys() [0, 1]
要通过ID获取Broker对象,请将ID作为字典项查找中的键
>>> print cluster.brokers[0] <samsa.brokers.Broker at 0x1005f4c10: id=0>
主题
Cluster对象还提供了类似字典的接口,用于按名称检索Topic对象。例如
>>> topic = cluster.topics['example-topic'] >>> print topic <samsa.topics.Topic at 0x1005f4d90: name='example-topic'>
您还可以通过将PartitionMap强制转换为列表来查看可用于在主题内进行写入的分区数量
>>> print list(topic.partitions) [<samsa.partitions.Partition at 0x1005f9b90: topic=<samsa.topics.Topic at 0x1005f4d90: name='example-topic'>, broker=<samsa.brokers.Broker at 0x1005f4c10: id=0>, number=0>]
发布
要向主题发布,请向Topic实例的publish方法提供要发布的字符串或字符串列表
>>> topic.publish('hello world') >>> topic.publish(['hello', 'world'])
如果提供了消息列表,所有消息都将按顺序,在同一个代理上,发送到同一个分区。单个消息没有这样的保证,并且可能会根据分区方法在任意数量的主机上结束,并可能分布在多个分区中,可能会丢失消息之间的任何顺序。
消费
消费者被组织到消费者组中,这使得Kafka可以根据消费者组的配置提供两种不同的语义消息消费方法。
- 队列
消费者组中的每个消费者只接收一次消息,这为消息在组周围提供了一种合理的均匀分布,因为它只拥有主题中可用分区的子集的所有者。这类似于put/get队列或许多AMQP-like接口。
- 主题
每个消费者都是自己的消费者组,并接收发送到主题的所有消息,因为它拥有所有分区的所有者。这类似于事件系统或发布/订阅接口。
关于分区数量的注意事项
配置代理时要注意的一个重要事项是,如果有比分区多的消费者,则某些消费者根本不会收到任何消息,因此您应该从一开始就配置代理,以便将主题拆分为至少与预期消费者数量一样多的分区。
要获取应提供的最小分区数,请使用以下公式
# of brokers * # of partitions per node = # of consumers in the largest consumer group for this topic
创建消费者
要订阅主题,请向Topic实例的subscribe方法提供组名
>>> consumer = topic.subscribe('group-name')
在开始接收消息之前,消费者必须注册到ZooKeeper。(记住,在任何时候,消费者组中只能有一个消费者可以从分区中读取。)
要监听已建立的消费者上的消息
>>> for message in consumer: ... print message
项目详情
samsa-0.3.11.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | db57d2e88213afbc4e02fb3df62f02a61452cd911a9fd69f7079bc0db4904889 |
|
MD5 | 4fecfc8d81e9690c7ee022b73dfd1f5c |
|
BLAKE2b-256 | cad3261465fba1404dd44482e084b2f9986641c08f1828c7309738739e18df53 |