Python的CQRS和事件源框架
项目描述
# Kant框架
[](https://travis-ci.org/patrickporto/kant)
[](https://codecov.io/github/patrickporto/kant?branch=master)
[](https://pypi.python.org/pypi/kant)
[](https://pypi.python.org/pypi/kant)
[](https://pypi.python.org/pypi/kant)
一个安全的人性化CQRS和事件源框架。
## 功能支持
* 事件存储
* 乐观并发控制
* JSON序列化
* SQLAlchemy投影
* 快照 **[进行中]**
康德官方支持Python 3.5-3.6。
## 入门
创建声明性事件
```python
from kant import events
class BankAccountCreated(events.Event)
id = events.CUIDField(primary_key=True)
owner = events.CharField()
class DepositPerformed(events.Event)
amount = events.DecimalField()
```
创建聚合以应用事件
```python
from kant import aggregates
class BankAccount(aggregates.Aggregate)
id = aggregates.CUIDField()
owner = aggregates.CharField()
balance = aggregates.DecimalField()
def apply_bank_account_created(self, event)
self.id = event.id
self.owner = event.owner
self.balance = 0
定义函数 apply_deposit_performed(self, event)
self.balance += event.amount
```
现在,保存事件
```python
从 kant.eventstore 导入 connect
await connect(user='user', password='user', database='database')
创建银行账户事件存储
conn.create_keyspace('bank_account')
创建事件
bank_account_created = BankAccountCreated(
id=123,
owner='John Doe',
)
deposit_performed = DepositPerformed(
amount=20,
)
bank_account = BankAccount()
bank_account.dispatch([bank_account_created, deposit_performed])
bank_account.save()
stored_bank_account = BankAccount.objects.get(123)
```
## 安装
要安装 Kant,只需使用 [pipenv](pipenv.org)(或 pip)
```bash
$ pipenv install kant
```
## 贡献
请阅读贡献指南 [CONTRIBUTING](CONTRIBUTING.md)。
[](https://travis-ci.org/patrickporto/kant)
[](https://codecov.io/github/patrickporto/kant?branch=master)
[](https://pypi.python.org/pypi/kant)
[](https://pypi.python.org/pypi/kant)
[](https://pypi.python.org/pypi/kant)
一个安全的人性化CQRS和事件源框架。
## 功能支持
* 事件存储
* 乐观并发控制
* JSON序列化
* SQLAlchemy投影
* 快照 **[进行中]**
康德官方支持Python 3.5-3.6。
## 入门
创建声明性事件
```python
from kant import events
class BankAccountCreated(events.Event)
id = events.CUIDField(primary_key=True)
owner = events.CharField()
class DepositPerformed(events.Event)
amount = events.DecimalField()
```
创建聚合以应用事件
```python
from kant import aggregates
class BankAccount(aggregates.Aggregate)
id = aggregates.CUIDField()
owner = aggregates.CharField()
balance = aggregates.DecimalField()
def apply_bank_account_created(self, event)
self.id = event.id
self.owner = event.owner
self.balance = 0
定义函数 apply_deposit_performed(self, event)
self.balance += event.amount
```
现在,保存事件
```python
从 kant.eventstore 导入 connect
await connect(user='user', password='user', database='database')
创建银行账户事件存储
conn.create_keyspace('bank_account')
创建事件
bank_account_created = BankAccountCreated(
id=123,
owner='John Doe',
)
deposit_performed = DepositPerformed(
amount=20,
)
bank_account = BankAccount()
bank_account.dispatch([bank_account_created, deposit_performed])
bank_account.save()
stored_bank_account = BankAccount.objects.get(123)
```
## 安装
要安装 Kant,只需使用 [pipenv](pipenv.org)(或 pip)
```bash
$ pipenv install kant
```
## 贡献
请阅读贡献指南 [CONTRIBUTING](CONTRIBUTING.md)。
项目详情
关闭
kant-2.1.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 7ceac36edfeddb8ab55fefc0d8853dcf03d0c89bb594c2d069cd556e5c5d3075 |
|
MD5 | c71be7ba34b74eaa6169ddacc864d51a |
|
BLAKE2b-256 | b608c104b9c0170b676b24c21100695271fff7853c56bb18ea3792b88f373cf9 |