跳转到主要内容

用于事件源与Axon Server的Python包

项目描述

使用Axon Server进行事件源

此包支持使用Python eventsourcing库和Axon Server

安装

使用pip从Python包索引安装稳定版本。

$ pip install eventsourcing-axonserver

请注意,建议将Python包安装到Python虚拟环境中。

入门

按常规方式定义聚合和应用。请注意,Axon Server中的聚合序列应从位置0开始,而库的Aggregate类的默认值是从1开始。因此,我们需要将聚合类的INITIAL_VERSION属性设置为0

from typing import Any, Dict
from uuid import UUID

from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event


class TrainingSchool(Application):
    def register(self, name: str) -> UUID:
        dog = Dog(name)
        self.save(dog)
        return dog.id

    def add_trick(self, dog_id: UUID, trick: str) -> None:
        dog = self.repository.get(dog_id)
        dog.add_trick(trick)
        self.save(dog)

    def get_dog(self, dog_id) -> Dict[str, Any]:
        dog = self.repository.get(dog_id)
        return {'name': dog.name, 'tricks': tuple(dog.tricks)}


class Dog(Aggregate):
    INITIAL_VERSION = 0

    @event('Registered')
    def __init__(self, name: str) -> None:
        self.name = name
        self.tricks = []

    @event('TrickAdded')
    def add_trick(self, trick: str) -> None:
        self.tricks.append(trick)

配置应用程序以使用Axon Server。将环境变量PERSISTENCE_MODULE设置为'eventsourcing_axonserver',并将AXONSERVER_URI设置为您的Axon Server的主机和端口。

school = TrainingSchool(env={
    "PERSISTENCE_MODULE": "eventsourcing_axonserver",
    "AXONSERVER_URI": "localhost:8124",
})

然后可以从测试和用户界面调用应用程序的方法。

# Register dog.
dog_id = school.register('Fido')

# Add tricks.
school.add_trick(dog_id, 'roll over')
school.add_trick(dog_id, 'play dead')

# Get details.
dog = school.get_dog(dog_id)
assert dog["name"] == 'Fido'
assert dog["tricks"] == ('roll over', 'play dead')

有关更多信息,请参阅Python eventsourcing库和Axon Server项目。

开发者

克隆项目仓库,设置虚拟环境,并安装依赖项。

使用您的集成开发环境(例如PyCharm)打开项目仓库。创建一个Poetry虚拟环境,然后更新包。

$ make update-packages

或者,使用make install命令为此项目创建一个专门的Python虚拟环境。

$ make install

启动Axon服务器。

$ make start-axon-server

运行测试。

$ make test

./tests中添加测试。在./eventsourcing_axonserver中添加代码。

检查代码格式。

$ make lint

重新格式化代码。

$ make fmt

pyproject.toml中添加依赖项,然后更新已安装的包。

$ make update-packages

停止Axon服务器。

$ make stop-axon-server

项目详情


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分布

eventsourcing-axonserver-0.1.5.tar.gz (6.6 kB 查看哈希值)

上传时间

构建分布

eventsourcing_axonserver-0.1.5-py3-none-any.whl (6.5 kB 查看哈希值)

上传时间 Python 3

支持者