跳转到主要内容

Python事件源gRPC包

项目描述

事件源gRPC

这是Python事件源库的扩展。

此包提供了一个运行在gRPC服务器中的事件源应用程序的gRPC服务器类。可以使用gRPC客户端类与应用程序服务器进行交互。可以通过客户端提供的应用程序代理对象远程调用应用程序命令和查询方法,并选择通知。

此包还提供了一个系统运行器类,其接口与核心事件源库中的运行器类相同。它可以作为一个gRPC服务器集运行事件源应用程序系统。领导者和追随者连接(用于提示和拉取事件通知)由gRPC客户端实现。

此包还提供了可用于从命令行界面启动服务器和运行系统的控制台命令。

此包还提供了一个在Docker容器中运行gRPC服务器的事件源应用程序的示例。

此包还提供了一个在Docker容器中使用docker-compose运行gRPC服务器中的应用程序系统的示例。

此包还提供了一个在Docker容器中使用Kubernetes运行gRPC服务器中的应用程序系统的示例,客户端和服务器使用SSL/TLS PKI证书进行认证。

安装

使用pip从Python包索引安装稳定版发行版

$ pip install eventsourcing_grpc

建议将Python包安装到Python虚拟环境中。

gRPC客户端和服务器

可以使用GrpcApplicationServer类将事件源应用程序作为gRPC服务器运行,可以使用GrpcApplicationClient类连接到服务器。

定义一个事件源应用程序。

from eventsourcing_grpc.example import Orders

使用环境变量进行配置。

import os

os.environ["ORDERS_GRPC_SERVER_ADDRESS"] = "localhost:50051"

应用程序持久化可以按照常规方式进行配置(参见核心库文档)。

使用start_server()函数创建并启动一个gRPC应用程序服务器。

from eventsourcing_grpc.application_server import start_server

server = start_server(Orders)

使用connect()函数创建一个gRPC客户端并连接到服务器。

from eventsourcing_grpc.application_client import connect

client = connect(Orders, max_attempts=10)

使用应用程序代理调用应用程序服务器上的命令和查询方法。

# Create order.
order_id = client.app.create_new_order()

# Get order.
order = client.app.get_order(order_id)
assert order["id"] == order_id
assert order["is_reserved"] == False
assert order["is_paid"] == False

使用应用程序代理选择事件通知。

notifications = client.app.notification_log.select(start=1, limit=10)
assert len(notifications) == 1
assert notifications[0].id == 1
assert notifications[0].originator_id == order_id
assert notifications[0].originator_version == 1
assert notifications[0].topic == "eventsourcing_grpc.example:Order.Created"

关闭客户端并停止服务器。

client.close()
server.stop(wait=True)

gRPC运行器

可以使用GrpcRunner类将事件源应用程序系统作为相互连接的gRPC服务器集运行。

from eventsourcing_grpc.runner import GrpcRunner

定义一个事件源应用程序系统。

from eventsourcing_grpc.example import system

使用环境变量进行配置。

import os

os.environ["ORDERS_GRPC_SERVER_ADDRESS"] = "localhost:50051"
os.environ["RESERVATIONS_GRPC_SERVER_ADDRESS"] = "localhost:50052"
os.environ["PAYMENTS_GRPC_SERVER_ADDRESS"] = "localhost:50053"

创建并启动一个gRPC系统运行器(启动服务器并连接客户端)。

runner = GrpcRunner(system)
runner.start(with_subprocesses=True)

从运行器获取应用程序代理。

app = runner.get(Orders)

在应用程序代理上调用命令和查询方法。

order1_id = app.create_new_order()

# Wait for the processing to happen.
from time import sleep

for _ in range(100):
    sleep(0.1)
    if app.is_order_paid(order1_id):
        break
    elif runner.has_errored.is_set():
        raise AssertionError("Runner error")
else:
    raise AssertionError("Timeout waiting for order to be paid")

获取应用程序的事件通知。

notifications = app.notification_log.select(start=1, limit=10)
assert len(notifications) == 3
assert notifications[0].id == 1
assert notifications[0].originator_id == order1_id
assert notifications[0].originator_version == 1
assert notifications[0].topic == "eventsourcing_grpc.example:Order.Created"
assert notifications[1].id == 2
assert notifications[1].originator_id == order1_id
assert notifications[1].originator_version == 2
assert notifications[1].topic == "eventsourcing_grpc.example:Order.Reserved"
assert notifications[2].id == 3
assert notifications[2].originator_id == order1_id
assert notifications[2].originator_version == 3
assert notifications[2].topic == "eventsourcing_grpc.example:Order.Paid"

停止运行器(关闭客户端和停止服务器)。

runner.stop()

从命令行启动服务器

可以使用控制台命令eventsourcing_grpc_server启动一个gRPC应用程序服务器。应用程序主题可以作为命令行参数提供。

$ export ORDERS_GRPC_SERVER_ADDRESS=localhost:50051
$ eventsourcing_grpc_server eventsourcing_grpc.example:Orders

通过在环境中设置SYSTEM_TOPIC,可以以这种方式启动相互连接的应用程序服务器系统。

$ export SYSTEM_TOPIC=eventsourcing_grpc.example:system
$ export ORDERS_GRPC_SERVER_ADDRESS=localhost:50051
$ export RESERVATIONS_GRPC_SERVER_ADDRESS=localhost:50052
$ export PAYMENTS_GRPC_SERVER_ADDRESS=localhost:50053

$ eventsourcing_grpc_server eventsourcing_grpc.example:Orders &
$ eventsourcing_grpc_server eventsourcing_grpc.example:Reservations &
$ eventsourcing_grpc_server eventsourcing_grpc.example:Payments &

在上面的示例中,服务器作为后台进程运行。

从命令行启动运行器

可以使用控制台命令eventsourcing_grpc_runner运行一个事件源应用程序系统。只需配置系统主题。

可以将系统主题作为命令行参数提供。

$ eventsourcing_grpc_runner eventsourcing_grpc.example:system

或者,可以使用SYSTEM_TOPIC环境变量设置系统主题。

$ export SYSTEM_TOPIC=eventsourcing_grpc.example:system
$ eventsourcing_grpc_runner

除非将它们设置为环境变量,否则将自动配置服务器地址。

Docker容器

可以定义一个Docker文件,在安装事件源系统和eventsourcing_grpc包后运行上述eventsourcing_grpc_server命令。

FROM python:3.10

WORKDIR /app

# Install package(s).
RUN pip install eventsourcing_grpc

# Run application server.
ENV PYTHONUNBUFFERED = 1
CMD ["eventsourcing_grpc_server"]

然后可以使用Docker构建容器镜像。

$ docker build -t eventsourcing-grpc -f Dockerfile .

Docker Compose

可以使用Docker Compose运行gRPC应用程序服务器系统。以下Docker Compose文件假设已在一个本地的./ssl文件夹中生成SSL/TLS PKI证书。

version: '2'

services:
  orders:
    image: "eventsourcing-grpc:v1"
    environment:
      APPLICATION_TOPIC: "eventsourcing_grpc.example:Orders"
      SYSTEM_TOPIC: "eventsourcing_grpc.example:system"
      ORDERS_GRPC_SERVER_ADDRESS: "orders:50051"
      RESERVATIONS_GRPC_SERVER_ADDRESS: "reservations:50052"
      PAYMENTS_GRPC_SERVER_ADDRESS: "payments:50053"
      MAX_PULL_INTERVAL: "10"
      GRPC_SSL_PRIVATE_KEY_PATH: /app/ssl/orders.key
      GRPC_SSL_CERTIFICATE_PATH: /app/ssl/orders.crt
      GRPC_SSL_ROOT_CERTIFICATE_PATH: /app/ssl/root.crt
    volumes:
      - ./ssl/orders:/app/ssl
    ports:
      - "50051:50051"

  reservations:
    image: "eventsourcing-grpc:v1"
    environment:
      APPLICATION_TOPIC: "eventsourcing_grpc.example:Reservations"
      SYSTEM_TOPIC: "eventsourcing_grpc.example:system"
      ORDERS_GRPC_SERVER_ADDRESS: "orders:50051"
      RESERVATIONS_GRPC_SERVER_ADDRESS: "reservations:50052"
      PAYMENTS_GRPC_SERVER_ADDRESS: "payments:50053"
      MAX_PULL_INTERVAL: "10"
      GRPC_SSL_PRIVATE_KEY_PATH: /app/ssl/reservations.key
      GRPC_SSL_CERTIFICATE_PATH: /app/ssl/reservations.crt
      GRPC_SSL_ROOT_CERTIFICATE_PATH: /app/ssl/root.crt
    volumes:
      - ./ssl/reservations:/app/ssl
    ports:
      - "50052:50052"

  payments:
    image: "eventsourcing-grpc:v1"
    environment:
      APPLICATION_TOPIC: "eventsourcing_grpc.example:Payments"
      SYSTEM_TOPIC: "eventsourcing_grpc.example:system"
      ORDERS_GRPC_SERVER_ADDRESS: "orders:50051"
      RESERVATIONS_GRPC_SERVER_ADDRESS: "reservations:50052"
      PAYMENTS_GRPC_SERVER_ADDRESS: "payments:50053"
      MAX_PULL_INTERVAL: "10"
      GRPC_SSL_PRIVATE_KEY_PATH: /app/ssl/payments.key
      GRPC_SSL_CERTIFICATE_PATH: /app/ssl/payments.crt
      GRPC_SSL_ROOT_CERTIFICATE_PATH: /app/ssl/root.crt
    volumes:
      - ./ssl/payments:/app/ssl
    ports:
      - "50053:50053"

然后可以启动容器。

$ docker-compose up -d

Kubernetes

可以使用Kubernetes运行gRPC应用程序服务器系统。

从SSL/TLS PKI证书创建Kubernetes Secrets。

$ kubectl create secret generic root-ssl-secret \
--from-file=root.crt=ssl/root/root.crt

$ kubectl create secret tls orders-ssl-secret \
--key ./ssl/orders/orders.key \
--cert ./ssl/orders/orders.crt

$ kubectl create secret tls reservations-ssl-secret \
--key ./ssl/reservations/reservations.key \
--cert ./ssl/reservations/reservations.crt

$ kubectl create secret tls payments-ssl-secret \
--key ./ssl/payments/payments.key \
--cert ./ssl/payments/payments.crt

为每个应用程序创建Kubernetes Deployment。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: orders
  labels:
    system: orders
spec:
  selector:
    matchLabels:
      app: orders
  replicas: 1
  template:
    metadata:
      labels:
        app: orders
    spec:
      volumes:
      - name: ssl-secret-volume
        secret:
          secretName: orders-ssl-secret
      - name: root-ssl-secret-volume
        secret:
          secretName: root-ssl-secret
      containers:
      - name: orders
        image: "eventsourcing-grpc:latest"
        imagePullPolicy: Never
        ports:
        - containerPort: 50051
        volumeMounts:
          - mountPath: /app/ssl
            name: ssl-secret-volume
          - mountPath: /app/root-ssl
            name: root-ssl-secret-volume
        env:
          - name: APPLICATION_TOPIC
            value: "eventsourcing_grpc.example:Orders"
          - name: SYSTEM_TOPIC
            value: "eventsourcing_grpc.example:system"
          - name: ORDERS_GRPC_SERVER_ADDRESS
            value: "0.0.0.0:50051"
          - name: RESERVATIONS_GRPC_SERVER_ADDRESS
            value: "reservations:50052"
          - name: PAYMENTS_GRPC_SERVER_ADDRESS
            value: "payments:50053"
          - name: GRPC_SSL_PRIVATE_KEY_PATH
            value: /app/ssl/tls.key
          - name: GRPC_SSL_CERTIFICATE_PATH
            value: /app/ssl/tls.crt
          - name: GRPC_SSL_ROOT_CERTIFICATE_PATH
            value: /app/root-ssl/root.crt

为每个Deployment创建Kubernetes Service。

apiVersion: v1
kind: Service
metadata:
  name: orders
  labels:
    system: orders
spec:
  selector:
    app: orders
  type: LoadBalancer
  ports:
  - port: 50051
    targetPort: 50051
    protocol: TCP

上述配置将每个应用程序运行在单独的Pod中。或者,您可以通过定义多容器Deployment,在同一Pod中运行所有应用程序。

请参阅项目仓库(以及其GitHub操作工作流程和Makefile),以查看运行应用程序系统(使用minikube)的完整示例。

开发者

在克隆eventsourcing-grpc仓库后,在根目录中运行以下命令设置虚拟环境并安装依赖项。

$ make install

make install命令使用构建工具Poetry为此项目创建一个专用的Python虚拟环境,并安装流行的开发依赖项,如Black、isort和pytest。

./tests中添加测试。在./eventsourcing_grpc中添加代码。

为测试生成SSL/TLS证书和私钥。

$ make ssl

运行测试。

$ make test

检查代码格式。

$ make lint

重新格式化代码。

$ make fmt

pyproject.toml 中添加依赖项,然后更新已安装的包。

$ make update-packages

如果修改了 .proto 文件,请重新生成项目的 protos 包。

$ make grpc-stubs

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。

源分布

eventsourcing-grpc-0.1.2.tar.gz (24.8 kB 查看哈希值)

上传时间

构建分布

eventsourcing_grpc-0.1.2-py3-none-any.whl (23.5 kB 查看哈希值)

上传时间 Python 3

由以下支持