Fabric Message Bus Schema
项目描述
消息总线模式和消息定义
用于交互者通信的Fabric消息总线模式和消息的基本框架
概述
控制与测量框架中各种交互者的Fabric通信是通过Apache Kafka实现的。
Apache Kafka是一个为流设计的分布式系统。它被构建为具有容错性、高吞吐量、水平可伸缩性,并允许地理分布式的数据流和流处理应用程序。
Kafka使得各种交互者/服务的基于事件的实现成为可能。事件既是事实也是触发器。每个fabric交互者都将遵循单写原则,作为某个主题的生产者,并订阅来自其他交互者的主题进行通信。消息通过Apache Avro数据序列化系统在Kafka中进行交换。
需求
- Python 3.7+
- confluent-kafka
- confluent-kafka[avro]
安装
$ pip3 install .
用法
此包实现了通过Avro序列化将消息推送到/从Kafka的生成器/消费者API接口。
消息和模式
用户需要继承自 IMessage 类(message.py)来定义自己的成员,并重写 to_dict() 函数。还需要定义与派生类相对应的 AVRO 架构。这个新架构应在生产者和消费者中使用。
基本 IMessage 类的示例架构可在 (schema/message.avsc) 中找到。
生产者
AvroProducerApi 类实现了 Avro Kafka 生产者的基本功能。用户需要继承这个类,并重写 delivery_report 方法来处理异步生产的消息传递。
示例用法可在 producer.py 文件的末尾找到。
消费者
AvroConsumerApi 类实现了 Avro Kafka 消费者的基本功能。用户需要继承这个类,并重写 process_message 方法来处理接收到的消息。
示例用法可在 consumer.py 文件的末尾找到。
管理 API
AdminApi 类提供了执行基本管理功能(如创建/删除主题/分区等)的支持。
如何启动测试 Kafka 集群进行测试
生成凭据
您必须生成 CA 证书(或如果您已有,则使用您的证书)并生成代理和客户端的密钥库和信任库。
cd $(pwd)/secrets
./create-certs.sh
(Type yes for all "Trust this certificate? [no]:" prompts.)
cd -
设置用于密钥目录的环境变量。这在后续命令中使用。确保您在 MessageBus 目录中。
export KAFKA_SSL_SECRETS_DIR=$(pwd)/secrets
启动容器
您可以使用 docker-compose.yaml 文件来启动一个简单的 Kafka 集群,其中包含
- 代理
- zookeeper
- 模式注册表
使用以下命令启动集群
docker-compose up -d
这将启动以下容器
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
189ba0e70b97 confluentinc/cp-schema-registry:latest "/etc/confluent/dock…" 58 seconds ago Up 58 seconds 0.0.0.0:8081->8081/tcp schemaregistry
49616f1c9b0a confluentinc/cp-kafka:latest "/etc/confluent/dock…" 59 seconds ago Up 58 seconds 0.0.0.0:9092->9092/tcp, 0.0.0.0:19092->19092/tcp broker1
c9d19c82558d confluentinc/cp-zookeeper:latest "/etc/confluent/dock…" 59 seconds ago Up 59 seconds 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
构建分布
fabric-message-bus-1.7.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 0ca34f22176d2deac7ef67682236b8b1d70ca7fd6da8a781967b24a10bd1362e |
|
MD5 | 4dc7c6e59d3707f0302edceecbc3251f |
|
BLAKE2b-256 | 626377a9512f92dbf7bafa5285e0105503111588aae4f100fb45093d06d8b5c3 |
fabric_message_bus-1.7.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | d234336a36b896dd7c9f4ff028c59830806f3217843028d873c02f7c9ef4aafa |
|
MD5 | 964240b9e9bcc4d99aa4f35767b3eb3d |
|
BLAKE2b-256 | c8aa6535f820279e431faf7b22bd9d7707e45a49f7790658ddfb448c6593dbc2 |