跳转到主要内容

支持分页、异步、SQLModel和pytest的FastAPI SQLAlchemy扩展,适用于生产环境。

项目描述

Fastapi-SQLA

codecov CircleCI PyPI Conventional Commits Code style: black

Fastapi-SQLA 是一个用于 SQLAlchemy 的扩展,适用于 FastAPI,易于设置,支持分页、异步IO、SQLModelpytest。它支持 SQLAlchemy≥1.3,并且完全符合 SQLAlchemy 2.0。它由 @dialoguemd 团队在蒙特利尔(🇨🇦)开发、维护并在生产中使用,充满爱心。

安装

使用 pip

pip install fastapi-sqla

请注意,您需要一个与 SQLAlchemy 兼容的引擎适配器。我们使用 psycopg2 进行测试,您可以使用 psycopg2 扩展进行安装。

快速示例

假设它运行在包含一个名为 user 的表上,该表有 3 个列,idname 和唯一的 email

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi_sqla import Base, Item, Page, Paginate, Session, setup_middlewares, startup
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError


@asynccontextmanager
async def lifespan(app: FastAPI):
    await startup()
    yield


app = FastAPI(lifespan=lifespan)
setup_middlewares(app)


class User(Base):
    __tablename__ = "user"


class UserIn(BaseModel):
    name: str
    email: EmailStr


class UserModel(UserIn):
    id: int

    class Config:
        orm_mode = True


@app.get("/users", response_model=Page[UserModel])
def list_users(paginate: Paginate):
    return paginate(select(User))


@app.get("/users/{user_id}", response_model=Item[UserModel])
def get_user(user_id: int, session: Session):
    user = session.get(User, user_id)
    if user is None:
        raise HTTPException(404)
    return {"data": user}


@app.post("/users", response_model=Item[UserModel])
def create_user(new_user: UserIn, session: Session):
    user = User(**new_user.model_dump())
    session.add(user)
    try:
        session.flush()
    except IntegrityError:
        raise HTTPException(409, "Email is already taken.")
    return {"data": user}

使用 sqlite3 创建数据库

sqlite3 db.sqlite <<EOF
CREATE TABLE user (
    id    INTEGER PRIMARY KEY AUTOINCREMENT,
    email TEXT NOT NULL,
    name  TEXT NOT NULL
);
CREATE UNIQUE INDEX user_email_idx ON user (email);
EOF

运行应用程序

sqlalchemy_url=sqlite:///db.sqlite?check_same_thread=false uvicorn main:app

配置

环境变量

os.environ 中,感兴趣的键名以 sqlalchemy_ 开头。每个匹配的键(在去掉前缀后)被视为 sqlalchemy.create_engine 调用的相应关键字参数。

唯一必需的键是 sqlalchemy_url,它提供数据库 URL,例如

export sqlalchemy_url=postgresql://postgres@localhost

多会话支持

为了配置应用程序的多个会话,使用以下前缀格式设置环境变量: fastapi_sqla__MY_KEY__

与默认会话一样,每个匹配的键(去掉前缀后)被视为 sqlalchemy.create_engine 调用的相应关键字参数。

例如,要配置具有 read_only 键的会话

export fastapi_sqla__read_only__sqlalchemy_url=postgresql://postgres@localhost

使用 asyncpg 支持 asyncio

SQLAlchemy >= 1.4 支持 asyncio。要启用对 Postgres 数据库的 asyncio 支持,安装 asyncpg

pip install asyncpg

并定义环境变量 sqlalchemy_url 使用 postgres+asyncpg 方案

export sqlalchemy_url=postgresql+asyncpg://postgres@localhost

设置应用程序 AsyncContextManager(推荐)

import fastapi_sqla
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    await fastapi_sqla.startup()
    yield


app = FastAPI(lifespan=lifespan)
fastapi_sqla.setup_middlewares(app)

使用启动/关闭事件设置应用程序(已弃用)

import fastapi_sqla
from fastapi import FastAPI

app = FastAPI()
fastapi_sqla.setup(app)

SQLAlchemy

添加新的实体类

from fastapi_sqla import Base


class Entity(Base):
    __tablename__ = "table-name-in-db"

获取 sqla 会话

使用依赖注入

使用 FastAPI 依赖注入 将会话作为路径操作函数的参数获取。

在返回响应之前提交 SQLAlchemy 会话,或者在发生任何异常时回滚

from fastapi import APIRouter
from fastapi_sqla import AsyncSession, Session

router = APIRouter()


@router.get("/example")
def example(session: Session):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example")
async def async_example(session: AsyncSession):
    return await session.scalar("SELECT now()")

为了获取使用自定义键配置的会话

from typing import Annotated

from fastapi import APIRouter, Depends
from fastapi_sqla import (
    AsyncSessionDependency,
    SessionDependency,
    SqlaAsyncSession,
    SqlaSession,
)

router = APIRouter()


# Preferred

ReadOnlySession = Annotated[SqlaSession, Depends(SessionDependency(key="read_only"))]
AsyncReadOnlySession = Annotated[
    SqlaAsyncSession, Depends(AsyncSessionDependency(key="read_only"))
]

@router.get("/example")
def example(session: ReadOnlySession):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example")
async def async_example(session: AsyncReadOnlySession):
    return await session.scalar("SELECT now()")


# Alternative

@router.get("/example/alt")
def example_alt(session: SqlaSession = Depends(SessionDependency(key="read_only"))):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example/alt")
async def async_example_alt(
    session: SqlaAsyncSession = Depends(AsyncSessionDependency(key="read_only")),
):
    return await session.scalar("SELECT now()")

使用上下文管理器

当需要在路径操作之外获取会话时,例如在使用 FastAPI 后台任务 时,使用 fastapi_sqla.open_session 上下文管理器。SQLAlchemy 会话在退出上下文时提交,或者在发生任何异常时回滚

from fastapi import APIRouter, BackgroundTasks
from fastapi_sqla import open_async_session, open_session

router = APIRouter()


@router.get("/example")
def example(bg: BackgroundTasks):
    bg.add_task(run_bg)
    bg.add_task(run_async_bg)


def run_bg():
    with open_session() as session:
        session.execute("SELECT now()").scalar()

def run_bg_with_key():
    with open_session(key="read_only") as session:
        session.execute("SELECT now()").scalar()

async def run_async_bg():
    async with open_async_session() as session:
        await session.scalar("SELECT now()")

async def run_async_bg_with_key():
    async with open_async_session(key="read_only") as session:
        await session.scalar("SELECT now()")

分页

from fastapi import APIRouter
from fastapi_sqla import Base, Page, Paginate
from pydantic import BaseModel
from sqlalchemy import select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str

    class Config:
        orm_mode = True


@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: Paginate):
    return paginate(select(User))

默认情况下

  • 它返回每页 10 项,最多 100 项;

  • 使用 Query.count 查询集合中的项目总数。

  • /users?offset=40&limit=10 的响应示例

    {
        "data": [
            {
                "id": 41,
                "name": "Pat Thomas"
            },
            {
                "id": 42,
                "name": "Mulatu Astatke"
            }
        ],
        "meta": {
            "offset": 40,
            "total_items": 42,
            "total_pages": 5,
            "page_number": 5
        }
    }
    

分页非标量结果

要分页不返回 标量 的查询,在调用 paginate 时指定 scalars=False

from fastapi import APIRouter
from fastapi_sqla import Base, Page, Paginate
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.orm import relationship

router = APIRouter()


class User(Base):
    __tablename__ = "user"
    notes = relationship("Note")


class Note(Base):
    __tablename__ = "note"


class UserModel(BaseModel):
    id: int
    name: str
    notes_count: int


@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: Paginate):
    query = (
        select(User.id, User.name, func.count(Note.id).label("notes_count"))
        .join(Note)
        .group_by(User)
    )
    return paginate(query, scalars=False)

自定义分页

您可以自定义

  • 每页的最小和最大项目数;
  • 如何查询集合中的项目总数;

要自定义分页,创建一个使用 fastapi_sqla.Pagination 的依赖项

from fastapi import APIRouter, Depends
from fastapi_sqla import Base, Page, Pagination, Session
from pydantic import BaseModel
from sqlalchemy import func, select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str


def query_count(session: Session) -> int:
    return session.execute(select(func.count()).select_from(User)).scalar()


CustomPaginate = Pagination(min_page_size=5, max_page_size=500, query_count=query_count)


@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: CustomPaginate = Depends()):
    return paginate(select(User))

异步分页

当使用异步支持时,使用 AsyncPaginate 依赖项

from fastapi import APIRouter
from fastapi_sqla import Base, Page, AsyncPaginate
from pydantic import BaseModel
from sqlalchemy import select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str

    class Config:
        orm_mode = True


@router.get("/users", response_model=Page[UserModel])
async def all_users(paginate: AsyncPaginate):
    return await paginate(select(User))

通过创建使用 fastapi_sqla.AsyncPagination 的依赖项来自定义分页

from fastapi import APIRouter, Depends
from fastapi_sqla import Base, Page, AsyncPagination, AsyncSession
from pydantic import BaseModel
from sqlalchemy import func, select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str


async def query_count(session: AsyncSession) -> int:
    result = await session.execute(select(func.count()).select_from(User))
    return result.scalar()


CustomPaginate = AsyncPagination(min_page_size=5, max_page_size=500, query_count=query_count)


@router.get("/users", response_model=Page[UserModel])
async def all_users(paginate: CustomPaginate = Depends()):
    return await paginate(select(User))

多会话支持

分页支持多个会话。要使用配置了自定义键的会话进行分页

from typing import Annotated

from fastapi import APIRouter, Depends
from fastapi_sqla import (
    AsyncPaginateSignature,
    AsyncPagination,
    Base,
    Page,
    PaginateSignature,
    Pagination,
)
from pydantic import BaseModel
from sqlalchemy import func, select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str


# Preferred

ReadOnlyPaginate = Annotated[
    PaginateSignature, Depends(Pagination(session_key="read_only"))
]
AsyncReadOnlyPaginate = Annotated[
    AsyncPaginateSignature, Depends(AsyncPagination(session_key="read_only"))
]

@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: ReadOnlyPaginate):
    return paginate(select(User))

@router.get("/async_users", response_model=Page[UserModel])
async def async_all_users(paginate: AsyncReadOnlyPaginate):
    return await paginate(select(User))


# Alternative

@router.get("/users/alt", response_model=Page[UserModel])
def all_users_alt(
    paginate: PaginateSignature = Depends(
        Pagination(session_key="read_only")
    ),
):
    return paginate(select(User))

@router.get("/async_users/alt", response_model=Page[UserModel])
async def async_all_users_alt(
    paginate: AsyncPaginateSignature = Depends(
        AsyncPagination(session_key="read_only")
    ),
):
    return await paginate(select(User))

SQLModel 支持 🎉

如果你的项目使用 SQLModel,则 Session 依赖项是一个 SQLModel 会话:

    from http import HTTPStatus

    from fastapi import FastAPI, HTTPException
    from fastapi_sqla import Item, Page, Paginate, Session, setup
    from sqlmodel import Field, SQLModel, select

    class Hero(SQLModel, table=True):
        id: int | None = Field(default=None, primary_key=True)
        name: str
        secret_name: str
        age: int | None = None


    app = FastAPI()
    setup(app)

    @app.get("/heros", response_model=Page[Hero])
    def list_hero(paginate: Paginate) -> Page[Hero]:
        return paginate(select(Hero))


    @app.get("/heros/{hero_id}", response_model=Item[Hero])
    def get_hero(hero_id: int, session: Session) -> Item[Hero]:
        hero = session.get(Hero, hero_id)
        if hero is None:
            raise HTTPException(HTTPStatus.NOT_FOUND)
        return {"data": hero}

Pytest 固定值

此库通过其 PyTest 插件提供了一组实用固定值,该插件与库自动安装。使用插件需要 pytest_plugin 扩展。

默认情况下,在运行测试时不会实际将记录写入数据库。目前无法更改此行为。

sqla_modules

必须定义此固定值,以便插件可以在你的 SQLAlchemy 实体中反映表元数据。它应该只导入包含 SQLAlchemy 模型的所有应用程序模块。

示例

# tests/conftest.py
from pytest import fixture


@fixture
def sqla_modules():
    from app import sqla  # noqa

db_url

要使用的数据库 URL。

当在环境变量中设置 CI 键时,默认使用 postgres 作为主机名

postgresql://postgres@postgres/postgres

在其他情况下,主机设置为 localhost

postgresql://postgres@localhost/postgres

当然,你可以通过覆盖固定值来覆盖它

from pytest import fixture


@fixture(scope="session")
def db_url():
    return "postgresql://postgres@localhost/test_database"

async_sqlalchemy_url

使用 asyncio 支持时使用的数据库 URL。默认为 db_url 固定值,带有 postgresql+asyncpg:// 方案。

session & async_session

用于创建 db 固定值的 Sqla 会话

  • 在测试设置或测试期间所做的所有更改将在测试清理时回滚;
  • 实际上不会将任何记录写入数据库;
  • 一个常规会话中的更改需要提交才能从其他常规会话中可用;
  • 一个异步会话中的更改需要提交才能从其他异步会话中可用;
  • 常规会话的更改即使在提交后也无法从 async 会话中获取,反之亦然;

示例

from pytest import fixture


@fixture
def patient(session):
    from er.sqla import Patient
    patient = Patient(first_name="Bob", last_name="David")
    session.add(patient)
    session.commit()
    return patient


@fixture
async def doctor(async_session):
    from er.sqla import Doctor
    doctor = Doctor(name="who")
    async_session.add(doctor)
    await async_session.commit()
    return doctor

db_migration

一个会话范围固定值,在测试会话设置时运行 alembic upgrade,在清理时运行 alembic downgrade

它依赖于 alembic_ini_path 固定值来获取 alembic.ini 文件的路径。

在测试或测试模块中使用

from pytest import mark

pytestmark = mark.usefixtures("db_migration")

要在全局范围内使用,请将其添加到 pytest 选项

[pytest]
usefixtures =
    db_migration

或在顶层 conftest.py 中依赖它,并将其标记为 auto-used

from pytest import fixture


@fixture(scope="session", autouse=True)
def db_migration(db_migration):
    pass

alembic_ini_path

它返回 alembic.ini 配置文件的路径。默认情况下,它返回 ./alembic.ini

开发

先决条件

  • Python >=3.9
  • Poetry 用于安装包依赖项。
  • 可访问的 postgres 数据库 postgresql://postgres@localhost/postgres

设置

$ poetry install --all-extras

运行测试

$ poetry run pytest

在多个环境中运行测试

$ poetry run tox

项目详情


下载文件

下载适合您平台的文件。如果您不确定该选择哪个,请了解更多关于安装包的信息。

源代码分发

fastapi_sqla-3.4.5.tar.gz (20.7 kB 查看哈希值)

上传时间 源代码

构建分发

fastapi_sqla-3.4.5-py3-none-any.whl (20.4 kB 查看哈希值)

上传时间 Python 3

由以下支持