跳转到主要内容

使用 Python 和 Kafka 轻松发布和订阅事件。

项目描述


Onna Logo

kafkaesk

目录

关于项目

此项目旨在帮助简化使用 Python 和 Kafka 发布和订阅事件的过程。

指导原则

  • HTTP
  • 语言无关
  • 基于 Kafka 的合同

替代方案

  • confluent kafka + avro:接近,但最终类似于 grpc。语言的编译。没有 asyncio。

将此 Python 项目视为这些想法的语法糖。

发布

使用 pydantic,但也可以使用纯 JSON 完成。

import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


async def foobar():
    # ...
    # doing something in an async func
    await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))

subscriber依赖实例中提供了一个便捷方法,这允许从消费的消息中传播头信息。

import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
    print(f"{data.foo}")
    # This will propagate `data` record headers
    await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))

订阅

import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
    print(f"{data.foo}")

避免全局对象

如果您不希望有全局应用程序配置,可以懒加载地配置应用程序,并分别注册模式/订阅者。

import kafkaesk
from pydantic import BaseModel

router = kafkaesk.Router()

@router.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


@router.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
    print(f"{data.foo}")


if __name__ == "__main__":
    app = kafkaesk.Application()
    app.mount(router)
    kafkaesk.run(app)

可选消费者注入参数

  • schema: str
  • record: aiokafka.structs.ConsumerRecord
  • app: kafkaesk.app.Application
  • subscriber: kafkaesk.app.BatchConsumer

根据第一个参数的类型注解,您将获得不同的注入数据

  • async def get_messages(data: ContentMessage):解析pydantic模式
  • async def get_messages(data: bytes):提供原始字节数据
  • async def get_messages(record: aiokafka.structs.ConsumerRecord):提供Kafka记录对象
  • async def get_messages(data):消息中的原始JSON数据

手动提交

自行完成手动提交策略

app = kafkaesk.Application(auto_commit=False)

@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
    print(f"{data.foo}")
    await subscriber.consumer.commit()

SSL

将这些值添加到您的kafka_settings

  • ssl_context - 这应该是一个占位符,因为SSL上下文通常在应用程序中创建
  • security_protocol - SSL或PLAINTEXT之一
  • sasl_mechanism - PLAIN、GSSAPI、SCRAM-SHA-256、SCRAM-SHA-512或OAUTHBEARER之一
  • sasl_plain_username .
  • sasl_plain_password .

kafkaesk 合同

这是一个围绕使用kafka的库。Kafka本身不强制执行这些概念。

  • 每条消息都必须提供一个json模式
  • 生产的消息将与json模式进行验证
  • 每个主题将只有一个模式
  • 单个模式可以用于多个主题
  • 消费消息的模式验证由消费者决定
  • 消息将被至少消费一次。考虑到这一点,您的处理应该是幂等的

消息格式

{
    "schema": "schema_name:1",
    "data": { ... }
}

工作者

kafkaesk mymodule:app --kafka-servers=localhost:9092

选项

Application.publish

  • stream_id: str:要发送数据到的流名称
  • data:从pydantic.BaseModel继承的类
  • key:Optional[bytes]:如果需要,消息的关键字

Application.subscribe

  • stream_id: str:要订阅的流的fnmatch模式
  • group:Optional[str]:要使用的消费者组id。如果没有提供,将使用函数的名称

Application.schema

  • id: str:要存储的模式id
  • version:Optional[int]:要存储的模式版本
  • streams:Optional[List[str]]:如果提前知道流,可以在推送数据之前预先创建它们
  • retention:Optional[int]:保留策略(以秒为单位)

Application.configure

  • kafka_servers:Optional[List[str]]:要连接的kafka服务器
  • topic_prefix:Optional[str]:要订阅的主题名称前缀
  • kafka_settings:Optional[Dict[str, Any]]:要传递的额外aiokafka设置
  • replication_factor:Optional[int]:创建主题时应使用的复制因子。默认为服务器数量的最小值,3。
  • kafka_api_version:str:默认 auto
  • auto_commit:bool:默认 True
  • auto_commit_interval_ms:int:默认 5000

开发

需求

poetry install

运行测试

docker-compose up
KAFKA=localhost:9092 poetry run pytest tests

扩展

日志记录

此扩展包括类,用于扩展Python的日志记录框架,以便将结构化日志消息发布到Kafka主题。此扩展由三个主要组件组成:一个扩展的logging.LogRecord和一些自定义的logging.Handler

请参阅示例目录中的logger.py

日志记录

kafkaesk.ext.logging.record.factory是一个函数,它将返回kafkaesk.ext.logging.record.PydanticLogRecord对象。该factory()函数扫描传递给记录器的任何args,并检查每个项目以确定它是否是pydantic.BaseModel的子类。

如果是基础模型实例,并且model._is_log_model评估为True,则该模型将从args中移除并添加到record._pydantic_data中。之后,factory()将使用日志的现有逻辑来完成日志记录的创建。

处理器

此扩展程序附带两个处理器,可以处理kafkaesk.ext.logging.handler.PydanticLogModel类:kafakesk.ext.logging.handler.PydanticStreamHandlerkafkaesk.ext.logging.handler.PydanticKafkaeskHandler

流处理器是围绕logging.StreamHandler的一个非常小的包装器,签名相同,唯一的区别是处理器将尝试将接收到的任何Pydantic模型转换为可读的日志消息。

kafkaesk处理器在后台还有一些其他功能。

处理器需要两个输入,一个kafkaesk.app.Application实例和一个流名称。

一旦初始化,处理器发出的任何日志将被保存到一个内部队列中。有一个工作任务负责从队列中提取日志并将这些日志写入指定的主题。

命名

这有点困难,而且“kafka”这个名字已经很有趣了。希望这个库对你来说并不是字面上的“kafkaesque”。

项目详情


发行历史 发行通知 | RSS 源

下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

kafkaesk-0.8.5.tar.gz (22.6 kB 查看哈希值)

上传时间

构建分发

kafkaesk-0.8.5-py3-none-any.whl (23.6 kB 查看哈希值)

上传于 Python 3

支持者

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面