全功能纯Python Kafka客户端
项目描述
PyKafka
PyKafka 是一个针对 Python 的 Kafka 客户端,它对 Kafka 生产者和消费者进行了 Python 实现,并可选地通过基于 librdkafka 的 C 扩展来支持。它支持 Python 2.7+、Python 3.4+ 和 PyPy,并支持 Kafka 0.8.2 及以上版本。
PyKafka 的主要目标是提供类似于 JVM Kafka 客户端的抽象级别,同时使用 Python 程序员熟悉的语法,并尽可能暴露最符合 Python 语言的 API。
您可以使用以下命令从 PyPI 安装 PyKafka:
$ pip install pykafka
或者从 conda-forge 安装:
$ conda install -c conda-forge pykafka
PyKafka 的完整文档和使用示例可以在 readthedocs 上找到。
您可以通过克隆此仓库并运行以下命令来安装 PyKafka 用于本地开发和测试:
$ python setup.py develop
入门指南
假设您至少在一个运行在 localhost 上的 Kafka 实例上运行 Kafka,您可以使用 PyKafka 来连接到它。
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")
或者,对于 TLS 连接,您可以编写以下代码(也请参阅 SslConfig 文档以获取更多详细信息)
>>> from pykafka import KafkaClient, SslConfig
>>> config = SslConfig(cafile='/your/ca.cert',
... certfile='/your/client.cert', # optional
... keyfile='/your/client.key', # optional
... password='unlock my client key please') # optional
>>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
... ssl_config=config)
如果您连接到的集群上定义了任何主题,您可以使用以下命令列出它们:
>>> client.topics
>>> topic = client.topics['my.test']
一旦您有了 Topic,您可以为它创建一个 Producer 并开始发送消息。
>>> with topic.get_sync_producer() as producer:
... for i in range(4):
... producer.produce('test message ' + str(i ** 2))
上面的示例将以同步方式发送到 Kafka - 调用只有在确认消息已到达集群后才会返回。
为了实现更高的吞吐量,我们建议使用异步模式的 Producer,这样 produce() 调用将立即返回,生产者可以选择以更大的批量发送消息。生产者会在发送每个批次之前将已生产的消息收集到内部队列中 linger_ms。这种延迟可以通过 linger_ms、min_queued_messages 和其他关键字参数来消除或更改(请参阅 readthedocs)。您仍然可以通过设置 delivery_reports=True 来启用队列接口,从而获取消息的投递确认。以下是一个大致的用法示例
>>> with topic.get_producer(delivery_reports=True) as producer:
... count = 0
... while True:
... count += 1
... producer.produce('test msg', partition_key='{}'.format(count))
... if count % 10 ** 5 == 0: # adjust this or bring lots of RAM ;)
... while True:
... try:
... msg, exc = producer.get_delivery_report(block=False)
... if exc is not None:
... print 'Failed to deliver msg {}: {}'.format(
... msg.partition_key, repr(exc))
... else:
... print 'Successfully delivered msg {}'.format(
... msg.partition_key)
... except Queue.Empty:
... break
请注意,投递报告队列是线程本地的:它只为当前线程产生的消息提供服务。另外,如果您使用 delivery_reports=True,则无法消费投递报告队列将导致 PyKafka 的内存使用无限制增长。
您还可以使用 Consumer 实例从该主题消费消息。
>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
... if message is not None:
... print message.offset, message.value
0 test message 0
1 test message 1
2 test message 4
3 test message 9
这个 SimpleConsumer 无法扩展 - 如果您有两个 SimpleConsumer 消费同一个主题,它们将接收到重复的消息。为了解决这个问题,您可以使用 BalancedConsumer。
>>> balanced_consumer = topic.get_balanced_consumer(
... consumer_group='testgroup',
... auto_commit_enable=True,
... zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
... )
您可以拥有尽可能多的 BalancedConsumer 实例来消费一个主题,该主题有多少分区就有多少个 BalancedConsumer 实例。如果它们都连接到同一个 zookeeper 实例,它们将与之通信来自动平衡它们之间的分区。默认情况下,BalancedConsumer 使用的分区分配策略是“range”策略。策略可以通过 membership_protocol 关键字参数进行切换,可以是 pykafka.membershipprotocol 暴露的对象,或者 pykafka.membershipprotocol.GroupMembershipProtocol 的自定义实例。
您还可以使用 Kafka 0.9 Group Membership API 通过在 get_balanced_consumer 上使用 managed 关键字参数。
使用 librdkafka 扩展
PyKafka 包含一个利用 librdkafka 加速生产者和消费者操作的 C 扩展。要使用 librdkafka 扩展,您需要确保头文件和共享库位于 Python 可以找到的位置,无论是在构建扩展(由 setup.py develop 处理)还是在运行时。通常,这意味着您需要将 librdkafka 安装在系统常规位置,或者在 shell 环境中声明 C_INCLUDE_PATH、LIBRARY_PATH 和 LD_LIBRARY_PATH 来指向 librdkafka 共享对象安装位置。您可以使用 locate librdkafka.so 命令找到此位置。
之后,您只需在 topic.get_producer()、topic.get_simple_consumer() 或 topic.get_balanced_consumer() 中传递一个额外的参数 use_rdkafka=True。请注意,某些配置选项可能具有不同的最佳值;在这种情况下,查阅 librdkafka 的 配置说明 可能是有益的。
操作工具
PyKafka 包含一组小型 CLI 工具,可以帮助处理与 Kafka 集群管理相关的常见任务,包括偏移量和延迟监控以及主题检查。可以通过运行以下命令找到这些工具的完整、最新的接口:
$ python cli/kafka_tools.py --help
或通过 setuptools 或 pip 安装 PyKafka 后
$ kafka-tools --help
PyKafka 或 kafka-python?
这两个项目是不同的。有关两个项目之间的比较,请参阅 这里的讨论。
贡献
如果您有兴趣为 PyKafka 贡献代码,可以从标记为 “help wanted” 的问题标签开始。我们还建议查看 贡献指南。
支持
如果您需要使用 PyKafka 的帮助,有许多可用资源。对于使用问题或常见食谱,请查看 StackOverflow 标签。对于更深入的问题或您想直接发送给 PyKafka 维护者的询问,Google Group 可能很有用。如果您认为您在 PyKafka 中发现了一个错误,请在阅读 贡献指南 后打开 github 问题。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。