使用 Python 和 Kafka 轻松发布和订阅事件。
项目描述
kafkaesk
目录
关于项目
此项目旨在帮助简化使用 Python 和 Kafka 发布和订阅事件的过程。
指导原则
- HTTP
- 语言无关
- 基于 Kafka 的合同
替代方案
- confluent kafka + avro:接近,但最终类似于 grpc。语言的编译。没有 asyncio。
将此 Python 项目视为这些想法的语法糖。
发布
使用 pydantic,但也可以使用纯 JSON 完成。
import kafkaesk
from pydantic import BaseModel
app = kafkaesk.Application()
@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
async def foobar():
# ...
# doing something in an async func
await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
在subscriber
依赖实例中提供了一个便捷方法,这允许从消费的消息中传播头信息。
import kafkaesk
from pydantic import BaseModel
app = kafkaesk.Application()
@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
print(f"{data.foo}")
# This will propagate `data` record headers
await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))
订阅
import kafkaesk
from pydantic import BaseModel
app = kafkaesk.Application()
@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
print(f"{data.foo}")
避免全局对象
如果您不希望有全局应用程序配置,可以懒加载地配置应用程序,并分别注册模式/订阅者。
import kafkaesk
from pydantic import BaseModel
router = kafkaesk.Router()
@router.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
foo: str
@router.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
print(f"{data.foo}")
if __name__ == "__main__":
app = kafkaesk.Application()
app.mount(router)
kafkaesk.run(app)
可选消费者注入参数
- schema: str
- record: aiokafka.structs.ConsumerRecord
- app: kafkaesk.app.Application
- subscriber: kafkaesk.app.BatchConsumer
根据第一个参数的类型注解,您将获得不同的注入数据
async def get_messages(data: ContentMessage)
:解析pydantic模式async def get_messages(data: bytes)
:提供原始字节数据async def get_messages(record: aiokafka.structs.ConsumerRecord)
:提供Kafka记录对象async def get_messages(data)
:消息中的原始JSON数据
手动提交
自行完成手动提交策略
app = kafkaesk.Application(auto_commit=False)
@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
print(f"{data.foo}")
await subscriber.consumer.commit()
SSL
将这些值添加到您的kafka_settings
中
ssl_context
- 这应该是一个占位符,因为SSL上下文通常在应用程序中创建security_protocol
- SSL或PLAINTEXT之一sasl_mechanism
- PLAIN、GSSAPI、SCRAM-SHA-256、SCRAM-SHA-512或OAUTHBEARER之一sasl_plain_username
.sasl_plain_password
.
kafkaesk 合同
这是一个围绕使用kafka的库。Kafka本身不强制执行这些概念。
- 每条消息都必须提供一个json模式
- 生产的消息将与json模式进行验证
- 每个主题将只有一个模式
- 单个模式可以用于多个主题
- 消费消息的模式验证由消费者决定
- 消息将被至少消费一次。考虑到这一点,您的处理应该是幂等的
消息格式
{
"schema": "schema_name:1",
"data": { ... }
}
工作者
kafkaesk mymodule:app --kafka-servers=localhost:9092
选项
- --kafka-servers:逗号分隔的kafka服务器列表
- --kafka-settings:要传递给https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class的JSON编码选项
- --topic-prefix:用于主题的前缀
- --replication-factor:创建主题时应使用的复制因子。默认为服务器数量的最小值,3。
Application.publish
- stream_id: str:要发送数据到的流名称
- data:从pydantic.BaseModel继承的类
- key:Optional[bytes]:如果需要,消息的关键字
Application.subscribe
- stream_id: str:要订阅的流的fnmatch模式
- group:Optional[str]:要使用的消费者组id。如果没有提供,将使用函数的名称
Application.schema
- id: str:要存储的模式id
- version:Optional[int]:要存储的模式版本
- streams:Optional[List[str]]:如果提前知道流,可以在推送数据之前预先创建它们
- retention:Optional[int]:保留策略(以秒为单位)
Application.configure
- kafka_servers:Optional[List[str]]:要连接的kafka服务器
- topic_prefix:Optional[str]:要订阅的主题名称前缀
- kafka_settings:Optional[Dict[str, Any]]:要传递的额外aiokafka设置
- replication_factor:Optional[int]:创建主题时应使用的复制因子。默认为服务器数量的最小值,3。
- kafka_api_version:str:默认
auto
- auto_commit:bool:默认
True
- auto_commit_interval_ms:int:默认
5000
开发
需求
poetry install
运行测试
docker-compose up
KAFKA=localhost:9092 poetry run pytest tests
扩展
日志记录
此扩展包括类,用于扩展Python的日志记录框架,以便将结构化日志消息发布到Kafka主题。此扩展由三个主要组件组成:一个扩展的logging.LogRecord
和一些自定义的logging.Handler
。
请参阅示例目录中的logger.py
。
日志记录
kafkaesk.ext.logging.record.factory
是一个函数,它将返回kafkaesk.ext.logging.record.PydanticLogRecord
对象。该factory()
函数扫描传递给记录器的任何args
,并检查每个项目以确定它是否是pydantic.BaseModel
的子类。
如果是基础模型实例,并且model._is_log_model
评估为True
,则该模型将从args
中移除并添加到record._pydantic_data
中。之后,factory()
将使用日志的现有逻辑来完成日志记录的创建。
处理器
此扩展程序附带两个处理器,可以处理kafkaesk.ext.logging.handler.PydanticLogModel
类:kafakesk.ext.logging.handler.PydanticStreamHandler
和kafkaesk.ext.logging.handler.PydanticKafkaeskHandler
。
流处理器是围绕logging.StreamHandler
的一个非常小的包装器,签名相同,唯一的区别是处理器将尝试将接收到的任何Pydantic模型转换为可读的日志消息。
kafkaesk处理器在后台还有一些其他功能。
处理器需要两个输入,一个kafkaesk.app.Application
实例和一个流名称。
一旦初始化,处理器发出的任何日志将被保存到一个内部队列中。有一个工作任务负责从队列中提取日志并将这些日志写入指定的主题。
命名
这有点困难,而且“kafka”这个名字已经很有趣了。希望这个库对你来说并不是字面上的“kafkaesque”。
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。