跳转到主要内容

Fabric Message Bus Schema

项目描述

PyPI

消息总线模式和消息定义

用于交互者通信的Fabric消息总线模式和消息的基本框架

概述

控制与测量框架中各种交互者的Fabric通信是通过Apache Kafka实现的。

Apache Kafka是一个为流设计的分布式系统。它被构建为具有容错性、高吞吐量、水平可伸缩性,并允许地理分布式的数据流和流处理应用程序。

Kafka使得各种交互者/服务的基于事件的实现成为可能。事件既是事实也是触发器。每个fabric交互者都将遵循单写原则,作为某个主题的生产者,并订阅来自其他交互者的主题进行通信。消息通过Apache Avro数据序列化系统在Kafka中进行交换。

需求

  • Python 3.7+
  • confluent-kafka
  • confluent-kafka[avro]

安装

$ pip3 install .

用法

此包实现了通过Avro序列化将消息推送到/从Kafka的生成器/消费者API接口。

消息和模式

用户需要继承自 IMessage 类(message.py)来定义自己的成员,并重写 to_dict() 函数。还需要定义与派生类相对应的 AVRO 架构。这个新架构应在生产者和消费者中使用。

基本 IMessage 类的示例架构可在 (schema/message.avsc) 中找到。

生产者

AvroProducerApi 类实现了 Avro Kafka 生产者的基本功能。用户需要继承这个类,并重写 delivery_report 方法来处理异步生产的消息传递。

示例用法可在 producer.py 文件的末尾找到。

消费者

AvroConsumerApi 类实现了 Avro Kafka 消费者的基本功能。用户需要继承这个类,并重写 process_message 方法来处理接收到的消息。

示例用法可在 consumer.py 文件的末尾找到。

管理 API

AdminApi 类提供了执行基本管理功能(如创建/删除主题/分区等)的支持。

如何启动测试 Kafka 集群进行测试

生成凭据

您必须生成 CA 证书(或如果您已有,则使用您的证书)并生成代理和客户端的密钥库和信任库。

cd $(pwd)/secrets
./create-certs.sh
(Type yes for all "Trust this certificate? [no]:" prompts.)
cd -

设置用于密钥目录的环境变量。这在后续命令中使用。确保您在 MessageBus 目录中。

export KAFKA_SSL_SECRETS_DIR=$(pwd)/secrets

启动容器

您可以使用 docker-compose.yaml 文件来启动一个简单的 Kafka 集群,其中包含

  • 代理
  • zookeeper
  • 模式注册表

使用以下命令启动集群

docker-compose up -d

这将启动以下容器

docker ps
CONTAINER ID        IMAGE                                    COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
189ba0e70b97        confluentinc/cp-schema-registry:latest   "/etc/confluent/dock…"   58 seconds ago      Up 58 seconds       0.0.0.0:8081->8081/tcp                                                                       schemaregistry
49616f1c9b0a        confluentinc/cp-kafka:latest             "/etc/confluent/dock…"   59 seconds ago      Up 58 seconds       0.0.0.0:9092->9092/tcp, 0.0.0.0:19092->19092/tcp                                             broker1
c9d19c82558d        confluentinc/cp-zookeeper:latest         "/etc/confluent/dock…"   59 seconds ago      Up 59 seconds       2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp                                                   zookeeper

项目详情


发布历史 发布通知 | RSS 源

下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源分布

fabric-message-bus-1.7.0.tar.gz (45.6 kB 查看哈希值)

上传时间

构建分布

fabric_message_bus-1.7.0-py3-none-any.whl (143.3 kB 查看哈希值)

上传时间 Python 3

由以下组织支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面