用于事件源与Axon Server的Python包
项目描述
使用Axon Server进行事件源
此包支持使用Python eventsourcing库和Axon Server。
安装
使用pip从Python包索引安装稳定版本。
$ pip install eventsourcing-axonserver
请注意,建议将Python包安装到Python虚拟环境中。
入门
按常规方式定义聚合和应用。请注意,Axon Server中的聚合序列应从位置0
开始,而库的Aggregate
类的默认值是从1
开始。因此,我们需要将聚合类的INITIAL_VERSION
属性设置为0
。
from typing import Any, Dict
from uuid import UUID
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
class TrainingSchool(Application):
def register(self, name: str) -> UUID:
dog = Dog(name)
self.save(dog)
return dog.id
def add_trick(self, dog_id: UUID, trick: str) -> None:
dog = self.repository.get(dog_id)
dog.add_trick(trick)
self.save(dog)
def get_dog(self, dog_id) -> Dict[str, Any]:
dog = self.repository.get(dog_id)
return {'name': dog.name, 'tricks': tuple(dog.tricks)}
class Dog(Aggregate):
INITIAL_VERSION = 0
@event('Registered')
def __init__(self, name: str) -> None:
self.name = name
self.tricks = []
@event('TrickAdded')
def add_trick(self, trick: str) -> None:
self.tricks.append(trick)
配置应用程序以使用Axon Server。将环境变量PERSISTENCE_MODULE
设置为'eventsourcing_axonserver'
,并将AXONSERVER_URI
设置为您的Axon Server的主机和端口。
school = TrainingSchool(env={
"PERSISTENCE_MODULE": "eventsourcing_axonserver",
"AXONSERVER_URI": "localhost:8124",
})
然后可以从测试和用户界面调用应用程序的方法。
# Register dog.
dog_id = school.register('Fido')
# Add tricks.
school.add_trick(dog_id, 'roll over')
school.add_trick(dog_id, 'play dead')
# Get details.
dog = school.get_dog(dog_id)
assert dog["name"] == 'Fido'
assert dog["tricks"] == ('roll over', 'play dead')
有关更多信息,请参阅Python eventsourcing库和Axon Server项目。
开发者
克隆项目仓库,设置虚拟环境,并安装依赖项。
使用您的集成开发环境(例如PyCharm)打开项目仓库。创建一个Poetry虚拟环境,然后更新包。
$ make update-packages
或者,使用make install
命令为此项目创建一个专门的Python虚拟环境。
$ make install
启动Axon服务器。
$ make start-axon-server
运行测试。
$ make test
在./tests
中添加测试。在./eventsourcing_axonserver
中添加代码。
检查代码格式。
$ make lint
重新格式化代码。
$ make fmt
在pyproject.toml
中添加依赖项,然后更新已安装的包。
$ make update-packages
停止Axon服务器。
$ make stop-axon-server
项目详情
关闭
eventsourcing-axonserver-0.1.5.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 47c2746dcad2438556b1790bebcaf0e33815e14b5f5ade7eb3252ddf5163faa2 |
|
MD5 | 770981e507c1952ac7c76d70d7d958b0 |
|
BLAKE2b-256 | 81d660504bf83a57b26e9c06e2a7c66072b51cf7a4dc593654d80fce0f16509c |
关闭
eventsourcing_axonserver-0.1.5-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | b99017a662badab5503a97a31deb1435f3695c1c2baf23ac6fa4f1489abf1352 |
|
MD5 | 03af2e54544c4b3708f83a32cdba668b |
|
BLAKE2b-256 | e7507d1ca7464323c5a5f88f6ddfa4c87a93ec9eadfd0255a54ed38f88889081 |