跳转到主要内容

构建简单的Kafka流应用程序

项目描述

Kstreams

kstreams 是一个库/微框架,用于与 kafka 一起使用。它具有简单的 Kafka 流实现,提供以下保证。

Build status codecov python version


文档: https://kpn.github.io/kstreams/


安装

pip install kstreams

您需要一个工作器,我们推荐 aiorun

pip install aiorun

用法

import aiorun
from kstreams import create_engine, ConsumerRecord


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    for i in range(5):
        metadata = await stream_engine.send("local--kstreams", value=payload)
        print(f"Message sent: {metadata}")


async def start():
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)

功能

  • 生产事件
  • 使用 Streams 消费事件
  • 通过 pattern 订阅主题
  • Prometheus 指标和自定义监控
  • TestClient
  • 自定义序列化和反序列化
  • 易于与任何 async 框架集成。不依赖于任何库!!
  • 流中的产量事件
  • Opentelemetry 仪器化
  • 中间件
  • 钩子(on_startup, on_stop, after_startup, after_stop)
  • 存储(kafka streams 模式)
  • 流连接
  • 窗口化

开发

此仓库需要使用 poetry 而不是 pip。 注意:如果您想将 virtualenv 放在与项目相同的路径下,首先应运行 poetry config --local virtualenvs.in-project true

要安装依赖项,只需执行

poetry install

然后您可以使用

poetry shell

运行测试

./scripts/test

使用 ruff 运行代码格式化

./scripts/format

提交信息

我们使用 conventional commits 进行提交信息。

推荐使用 commitizen。Commitizen 是开发依赖项的一部分。

cz commit

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载您平台的文件。如果您不确定要选择哪个,请了解更多关于 安装包 的信息。

源分布

kstreams-0.24.0.tar.gz (28.6 kB 查看散列)

上传时间

构建分布

kstreams-0.24.0-py3-none-any.whl (35.2 kB 查看散列)

上传时间 Python 3

由以下提供支持