Apache Kafka的纯Python客户端
项目描述
由于发布问题,建议暂时使用 https://github.com/wbarnha/kafka-python-ng
Apache Kafka分布式流处理系统的Python客户端。kafka-python-ng设计得类似于官方Java客户端,并添加了一些Pythonic接口(例如,消费者迭代器)。
kafka-python-ng最佳用于与较新版本的代理(0.9+)一起使用,但与较旧版本(至0.8.0)也向后兼容。一些功能仅在较新版本的代理上启用。例如,完全协调的消费者组(即,将多个消费者动态分配到同一组)需要使用0.9+的kafka代理。为早期版本的代理发布此功能需要编写和维护自定义领导选举以及成员/健康检查代码(可能使用zookeeper或consul)。对于较旧的代理,您可以通过使用chef、ansible等配置管理工具手动将不同的分区分配给每个消费者实例来达到类似的效果。这种方法可以很好地工作,尽管它不支持故障时的重新平衡。
有关更多详细信息,请参阅https://kafka-python.readthedocs.io/en/master/compatibility.html
。
请注意,主分支可能包含未发布的功能。有关发布文档,请参阅readthedocs和/或python的在线帮助。
$ pip install kafka-python-ng
对于关心此包安全性的用户:该项目使用GitHub Actions中的https://docs.pypi.ac.cn/trusted-publishers/在https://github.com/wbarnha/kafka-python-ng/deployments/pypi发布工件。由于kafka-python无法在此期间发布发布,因此将此项目分支出来以保持项目为Python和Kafka的未来版本而活跃。
KafkaConsumer
KafkaConsumer是一个高级消息消费者,旨在尽可能与官方Java客户端相似。对协调消费者组的全支持需要使用支持组API的kafka代理:kafka v0.9+。
有关API和配置详细信息,请参阅https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
。
消费者迭代器返回ConsumerRecords,它是一个简单的命名元组,公开基本消息属性:主题、分区、偏移量、键和值
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
# or as a static member with a fixed group member name
# consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
# group_instance_id='consumer-1', leave_group_on_close=False)
for msg in consumer:
print (msg)
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
for msg in consumer:
print (msg)
# manually assign the partition list for the consumer
from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
# Deserialize msgpack-encoded values
consumer = KafkaConsumer(value_deserializer=msgpack.loads)
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
assert isinstance(msg.value, dict)
# Access record headers. The returned value is a list of tuples
# with str, bytes for key and value
for msg in consumer:
print (msg.headers)
# Get consumer metrics
metrics = consumer.metrics()
KafkaProducer
KafkaProducer是一个高级的异步消息生产者。该类旨在尽可能与官方Java客户端相似。
有关API和配置详细信息,请参阅https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
producer.send('foobar', b'some_message_bytes')
# Block until a single message is sent (or timeout)
future = producer.send('foobar', b'another_message')
result = future.get(timeout=60)
# Block until all pending messages are at least put on the network
# NOTE: This does not guarantee delivery or success! It is really
# only useful if you configure internal batching using linger_ms
producer.flush()
# Use a key for hashed-partitioning
producer.send('foobar', key=b'foo', value=b'bar')
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})
# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')
# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
producer.send('foobar', b'msg %d' % i)
# Include record headers. The format is list of tuples with string key
# and bytes value.
producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
# Get producer performance metrics
metrics = producer.metrics()
线程安全
与KafkaConsumer不同,KafkaProducer可以在线程之间无问题地使用。
虽然可以在线程局部方式中使用KafkaConsumer,但建议使用多进程。
压缩
kafka-python-ng支持以下压缩格式
gzip
LZ4
Snappy
Zstandard (zstd)
gzip受原生支持,其他需要安装额外的库。
有关更多信息,请参阅https://kafka-python.readthedocs.io/en/master/install.html
优化的CRC32验证
Kafka使用CRC32校验和来验证消息。kafka-python-ng包含一个纯Python实现以实现兼容性。为了提高高吞吐量应用程序的性能,如果已安装,kafka-python将使用优化的本地代码的crc32c。有关安装说明,请参阅https://kafka-python.readthedocs.io/en/master/install.html
有关底层crc32c库的详细信息,请参阅https://pypi.ac.cn/project/crc32c/
协议
kafka-python-ng的次要目标是提供一个易于使用的协议层,通过Python repl与kafka代理交互。这对于测试、探测和一般实验非常有用。通过协议支持,启用KafkaClient.check_version()方法,该方法探测kafka代理并尝试识别其正在运行的版本(0.8.0至2.6+)。
项目详细信息
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源代码分发
构建分发
kafka_python_ng-2.2.3.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f79f28e10ade9b5a9860b2ec15b7cc8dc510d5702f5a399430478cff5f93a05a |
|
MD5 | fe356d6662a649f3effe56a7a2aaed7b |
|
BLAKE2b-256 | ce041d65bdf3f0103a08710e226b851de4b357ac702f1cadabf6128bab7518a7 |
kafka_python_ng-2.2.3-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | adc6e82147c441ca4ae1f22e291fc08efab0d10971cbd4aa1481d2ffa38e9480 |
|
MD5 | b38beca94a65afd18f2d7f113c73829e |
|
BLAKE2b-256 | 0f6122e778f642465a157c449782300d8817ebbc106794a8a7ebe88cbb846b05 |