跳转到主要内容

全功能纯Python Kafka客户端

项目描述

https://travis-ci.org/Parsely/pykafka.svg?branch=master https://codecov.io/github/Parsely/pykafka/coverage.svg?branch=master

PyKafka

http://i.imgur.com/ztYl4lG.jpg

PyKafka 是一个针对 Python 的 Kafka 客户端,它对 Kafka 生产者和消费者进行了 Python 实现,并可选地通过基于 librdkafka 的 C 扩展来支持。它支持 Python 2.7+、Python 3.4+ 和 PyPy,并支持 Kafka 0.8.2 及以上版本。

PyKafka 的主要目标是提供类似于 JVM Kafka 客户端的抽象级别,同时使用 Python 程序员熟悉的语法,并尽可能暴露最符合 Python 语言的 API。

您可以使用以下命令从 PyPI 安装 PyKafka:

$ pip install pykafka

或者从 conda-forge 安装:

$ conda install -c conda-forge pykafka

PyKafka 的完整文档和使用示例可以在 readthedocs 上找到。

您可以通过克隆此仓库并运行以下命令来安装 PyKafka 用于本地开发和测试:

$ python setup.py develop

入门指南

假设您至少在一个运行在 localhost 上的 Kafka 实例上运行 Kafka,您可以使用 PyKafka 来连接到它。

>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")

或者,对于 TLS 连接,您可以编写以下代码(也请参阅 SslConfig 文档以获取更多详细信息)

>>> from pykafka import KafkaClient, SslConfig
>>> config = SslConfig(cafile='/your/ca.cert',
...                    certfile='/your/client.cert',  # optional
...                    keyfile='/your/client.key',  # optional
...                    password='unlock my client key please')  # optional
>>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
...                      ssl_config=config)

如果您连接到的集群上定义了任何主题,您可以使用以下命令列出它们:

>>> client.topics
>>> topic = client.topics['my.test']

一旦您有了 Topic,您可以为它创建一个 Producer 并开始发送消息。

>>> with topic.get_sync_producer() as producer:
...     for i in range(4):
...         producer.produce('test message ' + str(i ** 2))

上面的示例将以同步方式发送到 Kafka - 调用只有在确认消息已到达集群后才会返回。

为了实现更高的吞吐量,我们建议使用异步模式的 Producer,这样 produce() 调用将立即返回,生产者可以选择以更大的批量发送消息。生产者会在发送每个批次之前将已生产的消息收集到内部队列中 linger_ms。这种延迟可以通过 linger_msmin_queued_messages 和其他关键字参数来消除或更改(请参阅 readthedocs)。您仍然可以通过设置 delivery_reports=True 来启用队列接口,从而获取消息的投递确认。以下是一个大致的用法示例

>>> with topic.get_producer(delivery_reports=True) as producer:
...     count = 0
...     while True:
...         count += 1
...         producer.produce('test msg', partition_key='{}'.format(count))
...         if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
...             while True:
...                 try:
...                     msg, exc = producer.get_delivery_report(block=False)
...                     if exc is not None:
...                         print 'Failed to deliver msg {}: {}'.format(
...                             msg.partition_key, repr(exc))
...                     else:
...                         print 'Successfully delivered msg {}'.format(
...                         msg.partition_key)
...                 except Queue.Empty:
...                     break

请注意,投递报告队列是线程本地的:它只为当前线程产生的消息提供服务。另外,如果您使用 delivery_reports=True,则无法消费投递报告队列将导致 PyKafka 的内存使用无限制增长。

您还可以使用 Consumer 实例从该主题消费消息。

>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
...     if message is not None:
...         print message.offset, message.value
0 test message 0
1 test message 1
2 test message 4
3 test message 9

这个 SimpleConsumer 无法扩展 - 如果您有两个 SimpleConsumer 消费同一个主题,它们将接收到重复的消息。为了解决这个问题,您可以使用 BalancedConsumer

>>> balanced_consumer = topic.get_balanced_consumer(
...     consumer_group='testgroup',
...     auto_commit_enable=True,
...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
... )

您可以拥有尽可能多的 BalancedConsumer 实例来消费一个主题,该主题有多少分区就有多少个 BalancedConsumer 实例。如果它们都连接到同一个 zookeeper 实例,它们将与之通信来自动平衡它们之间的分区。默认情况下,BalancedConsumer 使用的分区分配策略是“range”策略。策略可以通过 membership_protocol 关键字参数进行切换,可以是 pykafka.membershipprotocol 暴露的对象,或者 pykafka.membershipprotocol.GroupMembershipProtocol 的自定义实例。

您还可以使用 Kafka 0.9 Group Membership API 通过在 get_balanced_consumer 上使用 managed 关键字参数。

使用 librdkafka 扩展

PyKafka 包含一个利用 librdkafka 加速生产者和消费者操作的 C 扩展。要使用 librdkafka 扩展,您需要确保头文件和共享库位于 Python 可以找到的位置,无论是在构建扩展(由 setup.py develop 处理)还是在运行时。通常,这意味着您需要将 librdkafka 安装在系统常规位置,或者在 shell 环境中声明 C_INCLUDE_PATHLIBRARY_PATHLD_LIBRARY_PATH 来指向 librdkafka 共享对象安装位置。您可以使用 locate librdkafka.so 命令找到此位置。

之后,您只需在 topic.get_producer()topic.get_simple_consumer()topic.get_balanced_consumer() 中传递一个额外的参数 use_rdkafka=True。请注意,某些配置选项可能具有不同的最佳值;在这种情况下,查阅 librdkafka 的 配置说明 可能是有益的。

操作工具

PyKafka 包含一组小型 CLI 工具,可以帮助处理与 Kafka 集群管理相关的常见任务,包括偏移量和延迟监控以及主题检查。可以通过运行以下命令找到这些工具的完整、最新的接口:

$ python cli/kafka_tools.py --help

或通过 setuptools 或 pip 安装 PyKafka 后

$ kafka-tools --help

PyKafka 或 kafka-python?

这两个项目是不同的。有关两个项目之间的比较,请参阅 这里的讨论

贡献

如果您有兴趣为 PyKafka 贡献代码,可以从标记为 “help wanted” 的问题标签开始。我们还建议查看 贡献指南

支持

如果您需要使用 PyKafka 的帮助,有许多可用资源。对于使用问题或常见食谱,请查看 StackOverflow 标签。对于更深入的问题或您想直接发送给 PyKafka 维护者的询问,Google Group 可能很有用。如果您认为您在 PyKafka 中发现了一个错误,请在阅读 贡献指南 后打开 github 问题

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分发

pykafka-2.8.0.tar.gz (141.6 kB 查看散列值)

上传时间

构建分发

pykafka-2.8.0-py3.6-linux-x86_64.egg (415.6 kB 查看散列值)

上传时间

支持