Apache Kafka的Twisted Python客户端
项目描述
Afkak 是一个原生 Twisted 的 Twisted 和 Apache Kafka 客户端库。它提供了以下支持:
- 生产消息,具有自动批处理和可选压缩。
- 消费消息,具有组协调和自动提交。
请参阅 文档 了解更多,从 PyPI 下载,或查看 贡献指南。请通过 GitHub 报告任何问题。
状态
Afkak 支持以下 Python 版本
- CPython 3.5, 3.6, 3.7, 3.8, 和 3.9
- PyPy3
我们旨在支持 Kafka 1.1.x 及以后的版本。集成测试针对以下 Kafka 代理版本运行
- 0.9.0.1
- 1.1.1
计划对 2.0.0 进行测试(见 #45)。
较新的代理版本通常可以工作,但并非所有 Afkak 功能在旧版本代理上都能正常工作。特别是,协调消费者在 Kafka 0.9.0.1 之前无法工作。我们不建议部署如此旧的 Kafka 版本,因为它们存在严重的错误。
用法
高级
注意:此代码不可运行。请参阅 producer_example 和 consumer_example 以获取可运行的示例代码。
from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
PRODUCER_ACK_LOCAL_WRITE)
kClient = KafkaClient("localhost:9092")
# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)
# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
# by all in sync replicas before sending a response
producer = Producer(kClient,
req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
ack_timeout=2000)
responseD = producer.send_messages("my-topic", msgs=[b"message"])
# Using twisted's @inlineCallbacks:
responses = yield responseD
if response:
print(response[0].error)
print(response[0].offset)
# To send messages in batch: You can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
# Notes:
# * If the producer dies before the messages are sent, the caller would
# * not have had the callbacks called on the send_messages() returned
# * deferreds, and so can retry.
# * Calling producer.stop() before the messages are sent will
# errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])
# To consume messages
# define a function which takes a list of messages to process and
# possibly returns a deferred which fires when the processing is
# complete.
def processor_func(consumer, messages):
# Store_Messages_In_Database may return a deferred
result = store_messages_in_database(messages)
# record last processed message
consumer.commit()
return result
the_partition = 3 # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST) # Start reading at earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't handled by the consumer, or when consumer.stop()
# is called
yield d
consumer.stop()
kClient.close()
键控消息
from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost:9092")
# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])
低级
from afkak.client import KafkaClient
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message(b"some message")])
resps = afkak.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()
resps[0].topic # b"my-topic"
resps[0].partition # 1
resps[0].error # 0 (hopefully)
resps[0].offset # offset of the first message sent in this request
安装
Afkak 版本可在 PyPI 上获得 。
由于 Afkak 依赖的 Twisted 和 python-snappy 包含二进制扩展模块,因此您需要安装您希望使用的解释器的 Python 开发头文件
Debian/Ubuntu | sudo apt-get install build-essential python3-dev pypy3-dev libsnappy-dev
|
OS X | brew install python pypy snappy pip install virtualenv |
然后可以使用 pip 如常安装 Afkak
许可证
版权 2013, 2014, 2015 David Arthur,根据 Apache 许可证,版本 2.0。见 LICENSE
版权 2014, 2015 Cyan, Inc.,根据 Apache 许可证,版本 2.0。见 LICENSE
版权 2015–2021 Ciena Corporation,根据 Apache 许可证,版本 2.0。见 LICENSE
该项目最初是将 kafka-python 库移植到 Twisted。
请参阅 AUTHORS.md 了解完整的贡献者列表。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。
源分布
构建发行版
afkak-21.5.0.post0.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | bd30fcf8ebbbd9d4eca59e34e0af7567e57bdae02e4b67095dd2f216116c63c4 |
|
MD5 | 141138f9d0067a234ebad48c4bbd188e |
|
BLAKE2b-256 | 31cba863f8c163440d0e8ae2283ab0284a1810f5cc661283c2c507036d185e17 |
afkak-21.5.0.post0-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d852e4497b233f0bebc513084d942e5765235971559eb3210b1bffd6e219a238 |
|
MD5 | 16b907d6ece9321da3b5a314a5007670 |
|
BLAKE2b-256 | 810b55c21dddb003f326219f4091be3c5b6c36ccb1816d6f210ccf0366058c95 |