支持分页、异步、SQLModel和pytest的FastAPI SQLAlchemy扩展,适用于生产环境。
项目描述
Fastapi-SQLA
Fastapi-SQLA 是一个用于 SQLAlchemy 的扩展,适用于 FastAPI,易于设置,支持分页、异步IO、SQLModel 和 pytest。它支持 SQLAlchemy≥1.3,并且完全符合 SQLAlchemy 2.0。它由 @dialoguemd 团队在蒙特利尔(🇨🇦)开发、维护并在生产中使用,充满爱心。
安装
使用 pip
pip install fastapi-sqla
请注意,您需要一个与 SQLAlchemy 兼容的引擎适配器。我们使用 psycopg2
进行测试,您可以使用 psycopg2
扩展进行安装。
快速示例
假设它运行在包含一个名为 user
的表上,该表有 3 个列,id
、name
和唯一的 email
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi_sqla import Base, Item, Page, Paginate, Session, setup_middlewares, startup
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
@asynccontextmanager
async def lifespan(app: FastAPI):
await startup()
yield
app = FastAPI(lifespan=lifespan)
setup_middlewares(app)
class User(Base):
__tablename__ = "user"
class UserIn(BaseModel):
name: str
email: EmailStr
class UserModel(UserIn):
id: int
class Config:
orm_mode = True
@app.get("/users", response_model=Page[UserModel])
def list_users(paginate: Paginate):
return paginate(select(User))
@app.get("/users/{user_id}", response_model=Item[UserModel])
def get_user(user_id: int, session: Session):
user = session.get(User, user_id)
if user is None:
raise HTTPException(404)
return {"data": user}
@app.post("/users", response_model=Item[UserModel])
def create_user(new_user: UserIn, session: Session):
user = User(**new_user.model_dump())
session.add(user)
try:
session.flush()
except IntegrityError:
raise HTTPException(409, "Email is already taken.")
return {"data": user}
使用 sqlite3
创建数据库
sqlite3 db.sqlite <<EOF
CREATE TABLE user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL,
name TEXT NOT NULL
);
CREATE UNIQUE INDEX user_email_idx ON user (email);
EOF
运行应用程序
sqlalchemy_url=sqlite:///db.sqlite?check_same_thread=false uvicorn main:app
配置
环境变量
在 os.environ
中,感兴趣的键名以 sqlalchemy_
开头。每个匹配的键(在去掉前缀后)被视为 sqlalchemy.create_engine
调用的相应关键字参数。
唯一必需的键是 sqlalchemy_url
,它提供数据库 URL,例如
export sqlalchemy_url=postgresql://postgres@localhost
多会话支持
为了配置应用程序的多个会话,使用以下前缀格式设置环境变量: fastapi_sqla__MY_KEY__
。
与默认会话一样,每个匹配的键(去掉前缀后)被视为 sqlalchemy.create_engine
调用的相应关键字参数。
例如,要配置具有 read_only
键的会话
export fastapi_sqla__read_only__sqlalchemy_url=postgresql://postgres@localhost
使用 asyncpg
支持 asyncio
SQLAlchemy >= 1.4
支持 asyncio
。要启用对 Postgres 数据库的 asyncio
支持,安装 asyncpg
pip install asyncpg
并定义环境变量 sqlalchemy_url
使用 postgres+asyncpg
方案
export sqlalchemy_url=postgresql+asyncpg://postgres@localhost
设置应用程序 AsyncContextManager(推荐)
import fastapi_sqla
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
await fastapi_sqla.startup()
yield
app = FastAPI(lifespan=lifespan)
fastapi_sqla.setup_middlewares(app)
使用启动/关闭事件设置应用程序(已弃用)
import fastapi_sqla
from fastapi import FastAPI
app = FastAPI()
fastapi_sqla.setup(app)
SQLAlchemy
添加新的实体类
from fastapi_sqla import Base
class Entity(Base):
__tablename__ = "table-name-in-db"
获取 sqla 会话
使用依赖注入
使用 FastAPI 依赖注入 将会话作为路径操作函数的参数获取。
在返回响应之前提交 SQLAlchemy 会话,或者在发生任何异常时回滚
from fastapi import APIRouter
from fastapi_sqla import AsyncSession, Session
router = APIRouter()
@router.get("/example")
def example(session: Session):
return session.execute("SELECT now()").scalar()
@router.get("/async_example")
async def async_example(session: AsyncSession):
return await session.scalar("SELECT now()")
为了获取使用自定义键配置的会话
from typing import Annotated
from fastapi import APIRouter, Depends
from fastapi_sqla import (
AsyncSessionDependency,
SessionDependency,
SqlaAsyncSession,
SqlaSession,
)
router = APIRouter()
# Preferred
ReadOnlySession = Annotated[SqlaSession, Depends(SessionDependency(key="read_only"))]
AsyncReadOnlySession = Annotated[
SqlaAsyncSession, Depends(AsyncSessionDependency(key="read_only"))
]
@router.get("/example")
def example(session: ReadOnlySession):
return session.execute("SELECT now()").scalar()
@router.get("/async_example")
async def async_example(session: AsyncReadOnlySession):
return await session.scalar("SELECT now()")
# Alternative
@router.get("/example/alt")
def example_alt(session: SqlaSession = Depends(SessionDependency(key="read_only"))):
return session.execute("SELECT now()").scalar()
@router.get("/async_example/alt")
async def async_example_alt(
session: SqlaAsyncSession = Depends(AsyncSessionDependency(key="read_only")),
):
return await session.scalar("SELECT now()")
使用上下文管理器
当需要在路径操作之外获取会话时,例如在使用 FastAPI 后台任务 时,使用 fastapi_sqla.open_session
上下文管理器。SQLAlchemy 会话在退出上下文时提交,或者在发生任何异常时回滚
from fastapi import APIRouter, BackgroundTasks
from fastapi_sqla import open_async_session, open_session
router = APIRouter()
@router.get("/example")
def example(bg: BackgroundTasks):
bg.add_task(run_bg)
bg.add_task(run_async_bg)
def run_bg():
with open_session() as session:
session.execute("SELECT now()").scalar()
def run_bg_with_key():
with open_session(key="read_only") as session:
session.execute("SELECT now()").scalar()
async def run_async_bg():
async with open_async_session() as session:
await session.scalar("SELECT now()")
async def run_async_bg_with_key():
async with open_async_session(key="read_only") as session:
await session.scalar("SELECT now()")
分页
from fastapi import APIRouter
from fastapi_sqla import Base, Page, Paginate
from pydantic import BaseModel
from sqlalchemy import select
router = APIRouter()
class User(Base):
__tablename__ = "user"
class UserModel(BaseModel):
id: int
name: str
class Config:
orm_mode = True
@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: Paginate):
return paginate(select(User))
默认情况下
-
它返回每页 10 项,最多 100 项;
-
使用
Query.count
查询集合中的项目总数。 -
/users?offset=40&limit=10
的响应示例{ "data": [ { "id": 41, "name": "Pat Thomas" }, { "id": 42, "name": "Mulatu Astatke" } ], "meta": { "offset": 40, "total_items": 42, "total_pages": 5, "page_number": 5 } }
分页非标量结果
要分页不返回 标量 的查询,在调用 paginate
时指定 scalars=False
from fastapi import APIRouter
from fastapi_sqla import Base, Page, Paginate
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.orm import relationship
router = APIRouter()
class User(Base):
__tablename__ = "user"
notes = relationship("Note")
class Note(Base):
__tablename__ = "note"
class UserModel(BaseModel):
id: int
name: str
notes_count: int
@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: Paginate):
query = (
select(User.id, User.name, func.count(Note.id).label("notes_count"))
.join(Note)
.group_by(User)
)
return paginate(query, scalars=False)
自定义分页
您可以自定义
- 每页的最小和最大项目数;
- 如何查询集合中的项目总数;
要自定义分页,创建一个使用 fastapi_sqla.Pagination
的依赖项
from fastapi import APIRouter, Depends
from fastapi_sqla import Base, Page, Pagination, Session
from pydantic import BaseModel
from sqlalchemy import func, select
router = APIRouter()
class User(Base):
__tablename__ = "user"
class UserModel(BaseModel):
id: int
name: str
def query_count(session: Session) -> int:
return session.execute(select(func.count()).select_from(User)).scalar()
CustomPaginate = Pagination(min_page_size=5, max_page_size=500, query_count=query_count)
@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: CustomPaginate = Depends()):
return paginate(select(User))
异步分页
当使用异步支持时,使用 AsyncPaginate
依赖项
from fastapi import APIRouter
from fastapi_sqla import Base, Page, AsyncPaginate
from pydantic import BaseModel
from sqlalchemy import select
router = APIRouter()
class User(Base):
__tablename__ = "user"
class UserModel(BaseModel):
id: int
name: str
class Config:
orm_mode = True
@router.get("/users", response_model=Page[UserModel])
async def all_users(paginate: AsyncPaginate):
return await paginate(select(User))
通过创建使用 fastapi_sqla.AsyncPagination
的依赖项来自定义分页
from fastapi import APIRouter, Depends
from fastapi_sqla import Base, Page, AsyncPagination, AsyncSession
from pydantic import BaseModel
from sqlalchemy import func, select
router = APIRouter()
class User(Base):
__tablename__ = "user"
class UserModel(BaseModel):
id: int
name: str
async def query_count(session: AsyncSession) -> int:
result = await session.execute(select(func.count()).select_from(User))
return result.scalar()
CustomPaginate = AsyncPagination(min_page_size=5, max_page_size=500, query_count=query_count)
@router.get("/users", response_model=Page[UserModel])
async def all_users(paginate: CustomPaginate = Depends()):
return await paginate(select(User))
多会话支持
分页支持多个会话。要使用配置了自定义键的会话进行分页
from typing import Annotated
from fastapi import APIRouter, Depends
from fastapi_sqla import (
AsyncPaginateSignature,
AsyncPagination,
Base,
Page,
PaginateSignature,
Pagination,
)
from pydantic import BaseModel
from sqlalchemy import func, select
router = APIRouter()
class User(Base):
__tablename__ = "user"
class UserModel(BaseModel):
id: int
name: str
# Preferred
ReadOnlyPaginate = Annotated[
PaginateSignature, Depends(Pagination(session_key="read_only"))
]
AsyncReadOnlyPaginate = Annotated[
AsyncPaginateSignature, Depends(AsyncPagination(session_key="read_only"))
]
@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: ReadOnlyPaginate):
return paginate(select(User))
@router.get("/async_users", response_model=Page[UserModel])
async def async_all_users(paginate: AsyncReadOnlyPaginate):
return await paginate(select(User))
# Alternative
@router.get("/users/alt", response_model=Page[UserModel])
def all_users_alt(
paginate: PaginateSignature = Depends(
Pagination(session_key="read_only")
),
):
return paginate(select(User))
@router.get("/async_users/alt", response_model=Page[UserModel])
async def async_all_users_alt(
paginate: AsyncPaginateSignature = Depends(
AsyncPagination(session_key="read_only")
),
):
return await paginate(select(User))
SQLModel 支持 🎉
如果你的项目使用 SQLModel,则 Session
依赖项是一个 SQLModel 会话:
from http import HTTPStatus
from fastapi import FastAPI, HTTPException
from fastapi_sqla import Item, Page, Paginate, Session, setup
from sqlmodel import Field, SQLModel, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
secret_name: str
age: int | None = None
app = FastAPI()
setup(app)
@app.get("/heros", response_model=Page[Hero])
def list_hero(paginate: Paginate) -> Page[Hero]:
return paginate(select(Hero))
@app.get("/heros/{hero_id}", response_model=Item[Hero])
def get_hero(hero_id: int, session: Session) -> Item[Hero]:
hero = session.get(Hero, hero_id)
if hero is None:
raise HTTPException(HTTPStatus.NOT_FOUND)
return {"data": hero}
Pytest 固定值
此库通过其 PyTest 插件提供了一组实用固定值,该插件与库自动安装。使用插件需要 pytest_plugin
扩展。
默认情况下,在运行测试时不会实际将记录写入数据库。目前无法更改此行为。
sqla_modules
必须定义此固定值,以便插件可以在你的 SQLAlchemy 实体中反映表元数据。它应该只导入包含 SQLAlchemy 模型的所有应用程序模块。
示例
# tests/conftest.py
from pytest import fixture
@fixture
def sqla_modules():
from app import sqla # noqa
db_url
要使用的数据库 URL。
当在环境变量中设置 CI
键时,默认使用 postgres
作为主机名
postgresql://postgres@postgres/postgres
在其他情况下,主机设置为 localhost
postgresql://postgres@localhost/postgres
当然,你可以通过覆盖固定值来覆盖它
from pytest import fixture
@fixture(scope="session")
def db_url():
return "postgresql://postgres@localhost/test_database"
async_sqlalchemy_url
使用 asyncio
支持时使用的数据库 URL。默认为 db_url
固定值,带有 postgresql+asyncpg://
方案。
session
& async_session
用于创建 db 固定值的 Sqla 会话
- 在测试设置或测试期间所做的所有更改将在测试清理时回滚;
- 实际上不会将任何记录写入数据库;
- 一个常规会话中的更改需要提交才能从其他常规会话中可用;
- 一个异步会话中的更改需要提交才能从其他异步会话中可用;
- 常规会话的更改即使在提交后也无法从
async
会话中获取,反之亦然;
示例
from pytest import fixture
@fixture
def patient(session):
from er.sqla import Patient
patient = Patient(first_name="Bob", last_name="David")
session.add(patient)
session.commit()
return patient
@fixture
async def doctor(async_session):
from er.sqla import Doctor
doctor = Doctor(name="who")
async_session.add(doctor)
await async_session.commit()
return doctor
db_migration
一个会话范围固定值,在测试会话设置时运行 alembic upgrade
,在清理时运行 alembic downgrade
。
它依赖于 alembic_ini_path
固定值来获取 alembic.ini
文件的路径。
在测试或测试模块中使用
from pytest import mark
pytestmark = mark.usefixtures("db_migration")
要在全局范围内使用,请将其添加到 pytest 选项
[pytest]
usefixtures =
db_migration
或在顶层 conftest.py
中依赖它,并将其标记为 auto-used
from pytest import fixture
@fixture(scope="session", autouse=True)
def db_migration(db_migration):
pass
alembic_ini_path
它返回 alembic.ini
配置文件的路径。默认情况下,它返回 ./alembic.ini
。
开发
先决条件
- Python >=3.9
- Poetry 用于安装包依赖项。
- 可访问的 postgres 数据库
postgresql://postgres@localhost/postgres
设置
$ poetry install --all-extras
运行测试
$ poetry run pytest
在多个环境中运行测试
$ poetry run tox
项目详情
下载文件
下载适合您平台的文件。如果您不确定该选择哪个,请了解更多关于安装包的信息。