跳转到主要内容

Kafka与asyncio的集成

项目描述

|Build status| |Coverage| |Chat on Gitter|

asyncio客户端用于Kafka

AIOKafkaProducer

AIOKafkaProducer是一个高级、异步的消息生产者。

AIOKafkaProducer使用示例

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())

AIOKafkaConsumer

AIOKafkaConsumer 是一个高级、异步的消息消费者。它与分配的 Kafka 群组协调器节点交互,允许多个消费者均衡地消费主题(需要 kafka >= 0.9.0.0)。

AIOKafkaConsumer 使用示例

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())

运行测试

运行测试需要 Docker。有关安装说明,请参阅 https://docs.docker.net.cn/engine/installation。请注意,Python 的 lz4 压缩库需要 python-dev 软件包或 Linux 上的 Python 源头文件进行编译。注意:您还需要一个有效的 Java 安装。这是用于生成某些测试的 SSH 密钥的 keytool 工具所需的。

设置测试需求(假设您在 Ubuntu 14.04+ 上的虚拟环境中)

sudo apt-get install -y libkrb5-dev krb5-user
make setup

带有覆盖率的运行测试

make cov

要使用特定版本的 Kafka 运行测试(默认版本为 2.8.1),请使用 KAFKA_VERSION 变量

make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1

测试运行快捷方式

  • make test FLAGS="-l -x --ff" - 运行直到 1 个失败,首先重新运行失败的测试。非常适合清理大量错误,例如在大型重构之后。

  • make test FLAGS="-k consumer" - 仅运行消费者测试。

  • make test FLAGS="-m 'not ssl'" - 运行排除 ssl 的测试。

  • make test FLAGS="--no-pull" - 在测试运行之前不尝试拉取新的 Docker 镜像。

项目详情


下载文件

下载适合您平台版本的文件。如果您不确定选择哪个,请了解有关 安装软件包 的更多信息。

源分布

aiokafka-0.11.0.tar.gz (564.7 kB 查看哈希值)

上传时间

构建分布

aiokafka-0.11.0-cp312-cp312-win_amd64.whl (365.9 kB 查看哈希值)

上传时间 CPython 3.12 Windows x86-64

aiokafka-0.11.0-cp312-cp312-win32.whl (347.7 kB 查看哈希值)

上传于 CPython 3.12 Windows x86

aiokafka-0.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB 查看哈希值)

上传于 CPython 3.12 manylinux: glibc 2.17+ x86-64

aiokafka-0.11.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.2 MB 查看哈希值)

上传于 CPython 3.12 manylinux: glibc 2.17+ ARM64

aiokafka-0.11.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl (1.2 MB 查看哈希值)

上传于 CPython 3.12 manylinux: glibc 2.17+ i686 manylinux: glibc 2.5+ i686

aiokafka-0.11.0-cp312-cp312-macosx_11_0_arm64.whl (370.5 kB 查看哈希值)

上传于 CPython 3.12 macOS 11.0+ ARM64

aiokafka-0.11.0-cp311-cp311-win_amd64.whl (368.1 kB 查看哈希值)

上传于 CPython 3.11 Windows x86-64

aiokafka-0.11.0-cp311-cp311-win32.whl (348.5 kB 查看哈希值)

上传于 CPython 3.11 Windows x86

aiokafka-0.11.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB 查看哈希值)

上传于 CPython 3.11 manylinux: glibc 2.17+ x86-64

aiokafka-0.11.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1 MB 查看哈希值)

上传于 CPython 3.11 manylinux: glibc 2.17+ ARM64

aiokafka-0.11.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl (1.1 MB 查看哈希值)

上传于 CPython 3.11 manylinux: glibc 2.17+ i686 manylinux: glibc 2.5+ i686

aiokafka-0.11.0-cp311-cp311-macosx_11_0_arm64.whl (372.2 kB 查看哈希值)

上传于 CPython 3.11 macOS 11.0+ ARM64

aiokafka-0.11.0-cp310-cp310-win_amd64.whl (368.3 kB 查看哈希值)

上传时间 CPython 3.10 Windows x86-64

aiokafka-0.11.0-cp310-cp310-win32.whl (349.5 kB 查看哈希值)

上传时间 CPython 3.10 Windows x86

aiokafka-0.11.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB 查看哈希值)

上传时间 CPython 3.10 manylinux: glibc 2.17+ x86-64

aiokafka-0.11.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1 MB 查看哈希值)

上传时间 CPython 3.10 manylinux: glibc 2.17+ ARM64

aiokafka-0.11.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl (1.0 MB 查看哈希值)

上传时间 CPython 3.10 manylinux: glibc 2.17+ i686 manylinux: glibc 2.5+ i686

aiokafka-0.11.0-cp310-cp310-macosx_11_0_arm64.whl (372.5 kB 查看哈希值)

上传时间 CPython 3.10 macOS 11.0+ ARM64

aiokafka-0.11.0-cp39-cp39-win_amd64.whl (369.8 kB 查看哈希值)

上传时间 CPython 3.9 Windows x86-64

aiokafka-0.11.0-cp39-cp39-win32.whl (350.6 kB 查看哈希值)

上传时间 CPython 3.9 Windows x86

aiokafka-0.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB 查看哈希值)

上传时间 CPython 3.9 manylinux: glibc 2.17+ x86-64

aiokafka-0.11.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1 MB 查看哈希值)

上传时间 CPython 3.9 manylinux: glibc 2.17+ ARM64

aiokafka-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl (1.1 MB 查看哈希值)

上传于 CPython 3.9 manylinux: glibc 2.17+ i686 manylinux: glibc 2.5+ i686

aiokafka-0.11.0-cp39-cp39-macosx_11_0_arm64.whl (373.7 kB 查看哈希值)

上传于 CPython 3.9 macOS 11.0+ ARM64

aiokafka-0.11.0-cp38-cp38-win_amd64.whl (370.3 kB 查看哈希值)

上传于 CPython 3.8 Windows x86-64

aiokafka-0.11.0-cp38-cp38-win32.whl (351.0 kB 查看哈希值)

上传于 CPython 3.8 Windows x86

aiokafka-0.11.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB 查看哈希值)

上传于 CPython 3.8 manylinux: glibc 2.17+ x86-64

aiokafka-0.11.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.1 MB 查看哈希值)

上传于 CPython 3.8 manylinux: glibc 2.17+ ARM64

aiokafka-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl (1.1 MB 查看哈希值)

上传于 CPython 3.8 manylinux: glibc 2.17+ i686 manylinux: glibc 2.5+ i686

aiokafka-0.11.0-cp38-cp38-macosx_11_0_arm64.whl (374.9 kB 查看哈希值)

上传于 CPython 3.8 macOS 11.0+ ARM64

由以下支持