跳转到主要内容

用于SQLAlchemy的事件源存储Python包。

项目描述

使用SQLAlchemy的Python事件源

此包支持使用Python eventsourcing 库与 SQLAlchemy 一起使用。

目录

快速开始

要使用SQLAlchemy与您的Python事件源应用程序一起使用

  • 安装Python包 eventsourcing_sqlalchemy
  • 设置环境变量 PERSISTENCE_MODULE'eventsourcing_sqlalchemy'
  • 设置环境变量 SQLALCHEMY_URL 为一个SQLAlchemy数据库URL

更多信息请参见下方。

安装

使用pip从Python包索引安装稳定发行版。请注意,建议将Python包安装到Python虚拟环境中。

$ pip install eventsourcing_sqlalchemy

入门指南

按常规方式定义聚合和应用。

from eventsourcing.application import Application
from eventsourcing.domain import Aggregate, event
from uuid import uuid5, NAMESPACE_URL


class TrainingSchool(Application):
    def register(self, name):
        dog = Dog(name)
        self.save(dog)

    def add_trick(self, name, trick):
        dog = self.repository.get(Dog.create_id(name))
        dog.add_trick(trick)
        self.save(dog)

    def get_tricks(self, name):
        dog = self.repository.get(Dog.create_id(name))
        return dog.tricks


class Dog(Aggregate):
    @event('Registered')
    def __init__(self, name):
        self.name = name
        self.tricks = []

    @staticmethod
    def create_id(name):
        return uuid5(NAMESPACE_URL, f'/dogs/{name}')

    @event('TrickAdded')
    def add_trick(self, trick):
        self.tricks.append(trick)

要将此模块用作应用程序的持久化模块,请将环境变量 PERSISTENCE_MODULE 设置为 'eventsourcing_sqlalchemy'

使用此模块时,您需要将环境变量 SQLALCHEMY_URL 设置为您的数据库的SQLAlchemy数据库URL。有关SQLAlchemy数据库URL的更多信息,请参阅 SQLAlchemy文档

import os

os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'sqlite:///:memory:'

按常规方式构建和使用应用程序。

school = TrainingSchool()
school.register('Fido')
school.add_trick('Fido', 'roll over')
school.add_trick('Fido', 'play dead')
tricks = school.get_tricks('Fido')
assert tricks == ['roll over', 'play dead']

管理应用程序外的交易

有时您可能需要使用事件源应用程序的更新原子地更新SQLAlchemy ORM模型。您可以在应用程序外部管理事务。只需调用应用程序记录器的transaction()方法,并使用返回的Transaction对象作为上下文管理器来获取SQLAlchemy的Session对象。您可以将ORM对象add()到会话中。当Transaction上下文管理器退出时,所有内容都将原子提交。这实际上实现了线程作用域的事务。

with school.recorder.transaction() as session:
    # Update CRUD model.
    ...  # session.add(my_orm_object)
    # Update event-sourced application.
    school.register('Buster')
    school.add_trick('Buster', 'fetch ball')

    tricks = school.get_tricks('Buster')
    assert tricks == ['fetch ball']

请注意,SQLAlchemy的“autoflush”ORM功能默认启用。

app = Application()

with app.recorder.transaction() as session:
    assert session.autoflush is True

如果您需要禁用“autoflush”,则可以设置环境变量SQLALCHEMY_NO_AUTOFLUSH

app = Application(env={'SQLALCHEMY_AUTOFLUSH': 'False'})

with app.recorder.transaction() as session:
    assert session.autoflush is False

或者,您可以直接在SQLAlchemy会话制造器上设置autoflush选项。

app = Application()
app.recorder.datastore.session_maker.kw["autoflush"] = False

with app.recorder.transaction() as session:
    assert session.autoflush is False

或者,您可以使用会话的no_autoflush上下文管理器。

app = Application()

with app.recorder.transaction() as session:
    with session.no_autoflush:
        assert session.autoflush is False

或者,您可以设置会话对象的autoflush属性。

app = Application()

with app.recorder.transaction() as session:
    session.autoflush = False
    # Add CRUD objects to the session.
    ...

使用SQLAlchemy作用域会话

您可以将应用程序配置为使用SQLAlchemy的scoped_session对象,这将作用域会话到标准线程或其他事物,例如Web应用程序框架中的Web请求。

定义一个scoped_session对象的适配器,并使用环境变量SQLALCHEMY_SCOPED_SESSION_TOPIC配置事件源应用程序。

from eventsourcing.application import AggregateNotFound
from eventsourcing.utils import get_topic
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

# Create engine.
engine = create_engine('sqlite:///:memory:')

# Create a scoped_session object.
session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine)
)

# Define an adapter for the scoped session.
class MyScopedSessionAdapter:
    def __getattribute__(self, item: str) -> None:
        return getattr(session, item)

# Produce the topic of the scoped session adapter class.
scoped_session_topic = get_topic(MyScopedSessionAdapter)

# Construct an event-sourced application.
app = Application(
    env={'SQLALCHEMY_SCOPED_SESSION_TOPIC': scoped_session_topic}
)

# During request.
aggregate = Aggregate()
app.save(aggregate)
app.repository.get(aggregate.id)
session.commit()

# After request.
session.remove()

# During request.
app.repository.get(aggregate.id)

# After request.
session.remove()

# During request.
aggregate = Aggregate()
app.save(aggregate)
# forget to commit

# After request.
session.remove()

# During request.
try:
    # forgot to commit
    app.repository.get(aggregate.id)
except AggregateNotFound:
    pass
else:
    raise Exception("Expected aggregate not found")

# After request.
session.remove()

如您所见,您需要在请求期间调用commit(),并在请求完成后调用remove()。将SQLAlchemy与Web应用程序框架集成的包通常会自动调用remove()。其中一些包在处理请求时未引发异常也会自动调用commit()

使用Flask-SQLAlchemy管理会话

Flask-SQLAlchemy完整文档)提供了一个名为SQLAlchemy的类,该类有一个名为session的属性,该属性是一个SQLAlchemy scoped_session。这可以以类似的方式进行适配。

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
try:
    from sqlalchemy.orm import declarative_base  # type: ignore
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

# Define a Flask app.
flask_app = Flask(__name__)
flask_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'

# Integration between Flask and SQLAlchemy.
Base = declarative_base()
db = SQLAlchemy(flask_app, model_class=Base)


# Define an adapter for the scoped session.
class FlaskScopedSession:
    def __getattribute__(self, item: str) -> None:
        return getattr(db.session, item)


# Run the Flask application in a Web application server.
with flask_app.app_context():

    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": scoped_session_adapter_topic}
    )

    # During request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    db.session.commit()

    # After request (this is done automatically).
    db.session.remove()

    # During request.
    es_app.repository.get(aggregate.id)

    # After request (this is done automatically).
    db.session.remove()

使用FastAPI-SQLAlchemy管理会话

FastAPI-SQLAlchemy实际上没有使用SQLAlchemy的scoped_session,而是有一个全局的db变量,该变量有一个名为session的属性,当访问时返回请求作用域的会话。这可以以类似的方式进行适配。会话在请求成功处理后自动提交,如果引发异常则不提交。

from fastapi import FastAPI
from fastapi_sqlalchemy import db, DBSessionMiddleware

# Define a FastAPI application.
fastapi_app = FastAPI()

# Add SQLAlchemy integration middleware to the FastAPI application.
fastapi_app.add_middleware(
    DBSessionMiddleware, db_url='sqlite:///:memory:'
)

# Build the middleware stack (happens automatically when the FastAPI app runs in a Web app server).
fastapi_app.build_middleware_stack()

# Define an adapter for the scoped session.
class FastapiScopedSession:
    def __getattribute__(self, item: str) -> None:
        return getattr(db.session, item)

# Construct an event-sourced application within a scoped session.
with db(commit_on_exit=True):
    # Produce the topic of the scoped session adapter class.
    scoped_session_adapter_topic = get_topic(FlaskScopedSession)
    # Construct event-sourced application to use scoped sessions.
    es_app = Application(
        env={"SQLALCHEMY_SCOPED_SESSION_TOPIC": get_topic(FastapiScopedSession)}
    )

# Create a new event-sourced aggregate.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    aggregate = Aggregate()
    es_app.save(aggregate)
    es_app.repository.get(aggregate.id)

# The aggregate has been committed.
with db(commit_on_exit=True):  # This happens automatically before handling a route.
    # Handle request.
    es_app.repository.get(aggregate.id)

# Raise exception after creating aggregate.
try:
    with db(commit_on_exit=True):
        # Handle request.
        aggregate = Aggregate()
        es_app.save(aggregate)
        es_app.repository.get(aggregate.id)
        raise TypeError("An error occurred!!!")
except TypeError:
    # Web framework returns an error.
    ...
else:
    raise Exception("Expected type error")

# The aggregate hasn't been committed.
with db(commit_on_exit=True):
    try:
        es_app.repository.get(aggregate.id)
    except AggregateNotFound:
        pass
    else:
        raise Exception("Expected aggregate not found")

Google Cloud SQL Python连接器

您可以将环境变量SQLALCHEMY_CONNECTION_CREATOR_TOPIC设置为将解析为可调用的主题,该可调用用于创建数据库连接。

例如,您可以使用以下方式使用Cloud SQL Python Connector

首先从PyPI安装Cloud SQL Python Connector包。

$ pip install 'cloud-sql-python-connector[pg8000]'

然后定义一个getconn()函数,遵循Cloud SQL Python Connector README页面上的建议。

from google.cloud.sql.connector import Connector

# initialize Connector object
connector = Connector()

# function to return the database connection
def get_google_cloud_sql_conn():
    return connector.connect(
        "project:region:instance",
        "pg8000",
        user="postgres-iam-user@gmail.com",
        db="my-db-name",
        enable_iam_auth=True,
   )

设置环境变量'SQLALCHEMY_CONNECTION_CREATOR_TOPIC',以及'PERSISTENCE_MODULE''SQLALCHEMY_URL'

from eventsourcing.utils import get_topic

os.environ['PERSISTENCE_MODULE'] = 'eventsourcing_sqlalchemy'
os.environ['SQLALCHEMY_URL'] = 'postgresql+pg8000://'
os.environ['SQLALCHEMY_CONNECTION_CREATOR_TOPIC'] = get_topic(get_google_cloud_sql_conn)

更多信息

有关更多信息,请参阅库的文档SQLAlchemy项目。

项目详细信息


下载文件

下载适用于您平台的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源代码分发

eventsourcing_sqlalchemy-0.8.tar.gz (15.1 kB 查看哈希值)

上传时间: 源码

构建版本

eventsourcing_sqlalchemy-0.8-py3-none-any.whl (13.1 kB 查看哈希值)

上传时间: Python 3

由以下支持