构建简单的Kafka流应用程序
项目描述
Kstreams
kstreams
是一个库/微框架,用于与 kafka
一起使用。它具有简单的 Kafka 流实现,提供以下保证。
文档: https://kpn.github.io/kstreams/
安装
pip install kstreams
您需要一个工作器,我们推荐 aiorun
pip install aiorun
用法
import aiorun
from kstreams import create_engine, ConsumerRecord
stream_engine = create_engine(title="my-stream-engine")
@stream_engine.stream("local--kstream")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")
async def produce():
payload = b'{"message": "Hello world!"}'
for i in range(5):
metadata = await stream_engine.send("local--kstreams", value=payload)
print(f"Message sent: {metadata}")
async def start():
await stream_engine.start()
await produce()
async def shutdown(loop):
await stream_engine.stop()
if __name__ == "__main__":
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
功能
- 生产事件
- 使用
Streams
消费事件 - 通过
pattern
订阅主题 -
Prometheus
指标和自定义监控 - TestClient
- 自定义序列化和反序列化
- 易于与任何
async
框架集成。不依赖于任何库!! - 流中的产量事件
- Opentelemetry 仪器化
- 中间件
- 钩子(on_startup, on_stop, after_startup, after_stop)
- 存储(kafka streams 模式)
- 流连接
- 窗口化
开发
此仓库需要使用 poetry 而不是 pip。 注意:如果您想将 virtualenv
放在与项目相同的路径下,首先应运行 poetry config --local virtualenvs.in-project true
要安装依赖项,只需执行
poetry install
然后您可以使用
poetry shell
运行测试
./scripts/test
使用 ruff 运行代码格式化
./scripts/format
提交信息
我们使用 conventional commits 进行提交信息。
推荐使用 commitizen。Commitizen 是开发依赖项的一部分。
cz commit
项目详情
下载文件
下载您平台的文件。如果您不确定要选择哪个,请了解更多关于 安装包 的信息。
源分布
kstreams-0.24.0.tar.gz (28.6 kB 查看散列)
构建分布
kstreams-0.24.0-py3-none-any.whl (35.2 kB 查看散列)