用于SQLAlchemy的事件源存储Python包。
项目描述
使用SQLAlchemy的Python事件源
此包支持使用Python eventsourcing 库与 SQLAlchemy 一起使用。
目录
- 目录
- 快速开始
- 安装
- 入门指南
- 管理应用程序外的交易
- 使用SQLAlchemy作用域会话
- 使用Flask-SQLAlchemy管理会话
- 使用FastAPI-SQLAlchemy管理会话
- Google Cloud SQL Python连接器
- 更多信息
快速开始
要使用SQLAlchemy与您的Python事件源应用程序一起使用
- 安装Python包
eventsourcing_sqlalchemy
- 设置环境变量
PERSISTENCE_MODULE
为'eventsourcing_sqlalchemy'
- 设置环境变量
SQLALCHEMY_URL
为一个SQLAlchemy数据库URL
更多信息请参见下方。
安装
使用pip从Python包索引安装稳定发行版。请注意,建议将Python包安装到Python虚拟环境中。
$ pip install eventsourcing_sqlalchemy
入门指南
按常规方式定义聚合和应用。
from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
from uuid import uuid5, NAMESPACE_URL
class TrainingSchool(Application):
def register(self, name):
dog = Dog(name)
self.save(dog)
def add_trick(self, name, trick):
dog = self.repository.get(Dog.create_id(name))
dog.add_trick(trick)
self.save(dog)
def get_tricks(self, name):
dog = self.repository.get(Dog.create_id(name))
return dog.tricks
class Dog(Aggregate):
@event('Registered')
def __init__(self, name):
self.name = name
self.tricks = []
@staticmethod
def create_id(name):
return uuid5(NAMESPACE_URL, f'/dogs/{name}')
@event('TrickAdded')
def add_trick(self, trick):
self.tricks.append(trick)
要将此模块用作应用程序的持久化模块,请将环境变量 PERSISTENCE_MODULE
设置为 'eventsourcing_sqlalchemy'
。
使用此模块时,您需要将环境变量 SQLALCHEMY_URL
设置为您的数据库的SQLAlchemy数据库URL。有关SQLAlchemy数据库URL的更多信息,请参阅 SQLAlchemy文档。
import os
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'
按常规方式构建和使用应用程序。
school = TrainingSchool()
school.register('Fido')
school.add_trick('Fido', 'roll over')
school.add_trick('Fido', 'play dead')
tricks = school.get_tricks('Fido')
assert tricks == ['roll over', 'play dead']
管理应用程序外的交易
有时您可能需要使用事件源应用程序的更新原子地更新SQLAlchemy ORM模型。您可以在应用程序外部管理事务。只需调用应用程序记录器的transaction()
方法,并使用返回的Transaction
对象作为上下文管理器来获取SQLAlchemy的Session
对象。您可以将ORM对象add()
到会话中。当Transaction
上下文管理器退出时,所有内容都将原子提交。这实际上实现了线程作用域的事务。
with school.recorder.transaction() as session:
# Update CRUD model.
... # session.add(my_orm_object)
# Update event-sourced application.
school.register('Buster')
school.add_trick('Buster', 'fetch ball')
tricks = school.get_tricks('Buster')
assert tricks == ['fetch ball']
请注意,SQLAlchemy的“autoflush”ORM功能默认启用。
app = Application()
with app.recorder.transaction() as session:
assert session.autoflush is True
如果您需要禁用“autoflush”,则可以设置环境变量SQLALCHEMY_NO_AUTOFLUSH
。
app = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})
with app.recorder.transaction() as session:
assert session.autoflush is False
或者,您可以直接在SQLAlchemy会话制造器上设置autoflush选项。
app = Application()
app.recorder.datastore.session_maker.kw["autoflush"] = False
with app.recorder.transaction() as session:
assert session.autoflush is False
或者,您可以使用会话的no_autoflush
上下文管理器。
app = Application()
with app.recorder.transaction() as session:
with session.no_autoflush:
assert session.autoflush is False
或者,您可以设置会话对象的autoflush
属性。
app = Application()
with app.recorder.transaction() as session:
session.autoflush = False
# Add CRUD objects to the session.
...
使用SQLAlchemy作用域会话
您可以将应用程序配置为使用SQLAlchemy的scoped_session
对象,这将作用域会话到标准线程或其他事物,例如Web应用程序框架中的Web请求。
定义一个scoped_session
对象的适配器,并使用环境变量SQLALCHEMY_SCOPED_SESSION_TOPIC
配置事件源应用程序。
from eventsourcing.application import AggregateNotFound
from eventsourcing.utils import get_topic
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
# Create engine.
engine = create_engine('sqlite:///:memory:')
# Create a scoped_session object.
session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine)
)
# Define an adapter for the scoped session.
class MyScopedSessionAdapter:
def __getattribute__(self, item: str) -> None:
return getattr(session, item)
# Produce the topic of the scoped session adapter class.
scoped_session_topic = get_topic(MyScopedSessionAdapter)
# Construct an event-sourced application.
app = Application(
env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}
)
# During request.
aggregate = Aggregate()
app.save(aggregate)
app.repository.get(aggregate.id)
session.commit()
# After request.
session.remove()
# During request.
app.repository.get(aggregate.id)
# After request.
session.remove()
# During request.
aggregate = Aggregate()
app.save(aggregate)
# forget to commit
# After request.
session.remove()
# During request.
try:
# forgot to commit
app.repository.get(aggregate.id)
except AggregateNotFound:
pass
else:
raise Exception("Expected aggregate not found")
# After request.
session.remove()
如您所见,您需要在请求期间调用commit()
,并在请求完成后调用remove()
。将SQLAlchemy与Web应用程序框架集成的包通常会自动调用remove()
。其中一些包在处理请求时未引发异常也会自动调用commit()
。
使用Flask-SQLAlchemy管理会话
包Flask-SQLAlchemy(完整文档)提供了一个名为SQLAlchemy
的类,该类有一个名为session
的属性,该属性是一个SQLAlchemy scoped_session
。这可以以类似的方式进行适配。
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
try:
from sqlalchemy.orm import declarative_base # type: ignore
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
# Define a Flask app.
flask_app = Flask(__name__)
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
# Integration between Flask and SQLAlchemy.
Base = declarative_base()
db = SQLAlchemy(flask_app, model_class=Base)
# Define an adapter for the scoped session.
class FlaskScopedSession:
def __getattribute__(self, item: str) -> None:
return getattr(db.session, item)
# Run the Flask application in a Web application server.
with flask_app.app_context():
# Produce the topic of the scoped session adapter class.
scoped_session_adapter_topic = get_topic(FlaskScopedSession)
# Construct event-sourced application to use scoped sessions.
es_app = Application(
env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": scoped_session_adapter_topic}
)
# During request.
aggregate = Aggregate()
es_app.save(aggregate)
db.session.commit()
# After request (this is done automatically).
db.session.remove()
# During request.
es_app.repository.get(aggregate.id)
# After request (this is done automatically).
db.session.remove()
使用FastAPI-SQLAlchemy管理会话
包FastAPI-SQLAlchemy实际上没有使用SQLAlchemy的scoped_session
,而是有一个全局的db
变量,该变量有一个名为session
的属性,当访问时返回请求作用域的会话。这可以以类似的方式进行适配。会话在请求成功处理后自动提交,如果引发异常则不提交。
from fastapi import FastAPI
from fastapi_sqlalchemy import db, DBSessionMiddleware
# Define a FastAPI application.
fastapi_app = FastAPI()
# Add SQLAlchemy integration middleware to the FastAPI application.
fastapi_app.add_middleware(
DBSessionMiddleware, db_url='sqlite:///:memory:'
)
# Build the middleware stack (happens automatically when the FastAPI app runs in a Web app server).
fastapi_app.build_middleware_stack()
# Define an adapter for the scoped session.
class FastapiScopedSession:
def __getattribute__(self, item: str) -> None:
return getattr(db.session, item)
# Construct an event-sourced application within a scoped session.
with db(commit_on_exit=True):
# Produce the topic of the scoped session adapter class.
scoped_session_adapter_topic = get_topic(FlaskScopedSession)
# Construct event-sourced application to use scoped sessions.
es_app = Application(
env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": get_topic(FastapiScopedSession)}
)
# Create a new event-sourced aggregate.
with db(commit_on_exit=True): # This happens automatically before handling a route.
# Handle request.
aggregate = Aggregate()
es_app.save(aggregate)
es_app.repository.get(aggregate.id)
# The aggregate has been committed.
with db(commit_on_exit=True): # This happens automatically before handling a route.
# Handle request.
es_app.repository.get(aggregate.id)
# Raise exception after creating aggregate.
try:
with db(commit_on_exit=True):
# Handle request.
aggregate = Aggregate()
es_app.save(aggregate)
es_app.repository.get(aggregate.id)
raise TypeError("An error occurred!!!")
except TypeError:
# Web framework returns an error.
...
else:
raise Exception("Expected type error")
# The aggregate hasn't been committed.
with db(commit_on_exit=True):
try:
es_app.repository.get(aggregate.id)
except AggregateNotFound:
pass
else:
raise Exception("Expected aggregate not found")
Google Cloud SQL Python连接器
您可以将环境变量SQLALCHEMY_CONNECTION_CREATOR_TOPIC
设置为将解析为可调用的主题,该可调用用于创建数据库连接。
例如,您可以使用以下方式使用Cloud SQL Python Connector。
首先从PyPI安装Cloud SQL Python Connector包。
$ pip install 'cloud-sql-python-connector[pg8000]'
然后定义一个getconn()
函数,遵循Cloud SQL Python Connector README页面上的建议。
from google.cloud.sql.connector import Connector
# initialize Connector object
connector = Connector()
# function to return the database connection
def get_google_cloud_sql_conn():
return connector.connect(
"project:region:instance",
"pg8000",
user="postgres-iam-user@gmail.com",
db="my-db-name",
enable_iam_auth=True,
)
设置环境变量'SQLALCHEMY_CONNECTION_CREATOR_TOPIC'
,以及'PERSISTENCE_MODULE'
和'SQLALCHEMY_URL'
。
from eventsourcing.utils import get_topic
os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'
os.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)
更多信息
有关更多信息,请参阅库的文档和SQLAlchemy项目。
项目详细信息
下载文件
下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。
源代码分发
构建版本
eventsourcing_sqlalchemy-0.8.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | a8429fdc5d1bbd6a6c58050ae9cfa831889a932fe9f96f166b4f97e7a8820999 |
|
MD5 | 82f8561a51ea572e53cdf435c92456dd |
|
BLAKE2b-256 | 54bd72e8209290ea6c72eed3a3bf92f55a782fbb74f2c8a7b0579bb49bfd1d60 |
eventsourcing_sqlalchemy-0.8-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | e8950b5b37587e10186b7727c7d89eef2e9b28f37314eb9e799635e20b1890a6 |
|
MD5 | 9bfded657644e682db8dd103b4b6ff97 |
|
BLAKE2b-256 | 50528e152139559a0c46b4864229dbffb53d8aacc034a0d41315adf5f29e4d15 |