跳转到主要内容

Apache Kafka的纯Python客户端

项目描述

https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg https://img.shields.io/pypi/pyversions/kafka-python.svg https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github https://travis-ci.org/dpkp/kafka-python.svg?branch=master https://img.shields.io/badge/license-Apache%202-blue.svg

Apache Kafka分布式流处理系统的Python客户端。kafka-python旨在与官方Java客户端类似,并增加了一些Pythonic接口(例如,消费者迭代器)。

kafka-python最好与较新的代理(0.9+)一起使用,但与较旧版本(至0.8.0)向后兼容。某些功能只能在较新的代理上启用。例如,完全协调的消费者组(即,将多个消费者动态分配到同一组)需要使用0.9+的kafka代理。为较早的代理发布支持此功能需要编写和维护自定义领导选举和成员/健康检查代码(可能使用zookeeper或consul)。对于较旧的代理,您可以通过使用如chef、ansible等配置管理工具手动将不同的分区分配给每个消费者实例来实现类似的功能。这种方法将正常工作,尽管它不支持在故障时的重新平衡。有关更多详细信息,请参阅 <https://kafka-python.readthedocs.io/en/master/compatibility.html>。

请注意,主分支可能包含未发布的功能。有关发布文档,请参阅readthedocs和/或Python的内置帮助。

>>> pip install kafka-python

KafkaConsumer

KafkaConsumer 是一个高级消息消费者,旨在尽可能类似于官方的 Java 客户端。要完全支持协调的消费者组,需要使用支持 Group API 的 Kafka 代理:kafka v0.9+。

请参阅 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html> 获取 API 和配置详细信息。

消费者迭代器返回 ConsumerRecords,它是一些简单的命名元组,公开了基本的消息属性:主题、分区、偏移量、键和值

>>> from kafka3 import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka3 import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
...     print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka3 import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
...     print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()

KafkaProducer

KafkaProducer 是一个高级的、异步的消息生产者。该类旨在尽可能类似于官方的 Java 客户端。有关更多详细信息,请参阅 <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>。

>>> from kafka3 import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()

线程安全

KafkaProducer 可以跨线程使用而不会出现问题,而 KafkaConsumer 则不行。

尽管可以在线程局部方式中使用 KafkaConsumer,但推荐使用多进程。

压缩

kafka-python 支持以下压缩格式

  • gzip

  • LZ4

  • Snappy

  • Zstandard (zstd)

gzip 是原生支持的,其他格式需要安装额外的库。有关更多信息,请参阅 <https://kafka-python.readthedocs.io/en/master/install.html>。

优化的 CRC32 验证

Kafka 使用 CRC32 校验和来验证消息。kafka-python 包含一个纯 Python 实现以实现兼容性。为了提高高吞吐量应用程序的性能,如果安装了,kafka-python 将使用 crc32c 进行优化的本地代码。有关安装说明,请参阅 <https://kafka-python.readthedocs.io/en/master/install.html>。有关底层 crc32c 库的详细信息,请参阅 https://pypi.ac.cn/project/crc32c/

协议

kafka-python 的次要目标是提供一个易于使用的协议层,通过 Python repl 与 Kafka 代理交互。这对于测试、探测和一般实验非常有用。协议支持被利用来启用 KafkaClient.check_version() 方法,该方法探测 Kafka 代理并尝试确定其运行的版本(0.8.0 到 2.6+)。

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。

源分布

kafka-python3-3.0.0.tar.gz (271.4 kB 查看哈希值)

上传时间

由以下赞助

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面