基于 sqlalchemy 构建的 PostgreSQL 数据库查询和操作实用程序的集合
项目描述
pg-database-utils
基于 sqlalchemy 构建的 PostgreSQL 数据库查询和操作实用程序的集合。
此库包括对以下内容的支持
TSVECTOR
、JSON
和JSONB
索引(适用于 PostgreSQL 版本 9.5+)- 生成列(适用于 PostgreSQL 版本 12+)
- 可选的 Django 数据库配置,适用于 Django 项目
它还包括
- 使最常见的DDL查询更易读的辅助工具
- 用于查询 JSON 和 TSVECTOR 列的高效函数
- 支持从现有表和/或
VALUES
子句执行SELECT INTO
查询 - 支持需要应用程序逻辑的
UPDATE
查询
安装
使用以下方式安装
pip install pg-database-utils
配置
本项目的目标是使配置变得简单。如果您已经在Django中定义了数据库连接,则可以重用它们;否则,您可以在不依赖Django的情况下自行配置。
使用Django进行配置
如果您想使用“默认”数据库,无需配置。
如果您想指定从Django读取设置的特定数据库
- 创建一个包含数据库名称的JSON配置文件
{
"django-db-key": "other", # To override DATABASES["default"]
"connect-args": {"sslmode": "require"} # To override DATABASES["<db_name>"]["OPTIONS"]
}
- 设置
DATABASE_CONFIG_JSON
环境变量以指向文件的位置
注意:“django-db-key”在JSON文件中优先于所有其他数据库连接设置。如果您指定了Django数据库,将使用那些数据库连接设置。
不使用Django进行配置
- 创建一个包含至少所需设置的JSON配置文件(例如,
database-name
)
{
"database-name": "required", # Name of the database to query
"database-engine": "optional", # Defaults to postgres
"database-host": "optional", # Defaults to 127.0.0.1
"database-port": "optional", # Defaults to 5432
"database-user": "optional", # Defaults to postgres
"database-password": "optional" # For trusted users like postgres
}
- 设置
DATABASE_CONFIG_JSON
环境变量以指向文件的位置
带或不带Django的额外配置
额外的配置选项包括
{
"connect-args": {"sslmode": "require"}, # Defaults to postgres settings, "prefer" by default
"date-format": "optional", # Defaults to "%Y-%m-%d"
"timestamp-format": "optional", # Defaults to "%Y-%m-%d %H:%M:%S"
"pooling-args": { # To override sqlalchemy pooling config
"max_overflow": 0, # Defaults to 10 connections beyond pool size
"pool_recycle": 60, # Defaults to no timeout (-1) in seconds
"pool_size": 20, # Defaults to 5 connections
"pool_timeout": 30 # Defaults to 30 seconds
}
}
注意:“date-format”和“timestamp-format”必须与PostgreSQL中配置的格式兼容。
用法
该库旨在使常见的数据库操作变得简单易读,因此大多数实用函数都设计为可以接受字符串或sqlalchemy
对象作为参数。
模式工具
- 创建和关联表
from pg_database import schema
my_table = schema.create_table(
"my_table",
dropfirst=True,
index_cols={"id": "unique"},
id="int", name="int", addr="text", geom="bytea", deleted="bool"
)
schema.create_index(my_table, "name", index_op="unique")
schema.create_table("other_table", id="int", my_table_id="int", val="text")
schema.create_foreign_key("other_table", "my_table_id", "my_table.id")
- 修改表
from pg_database import schema
schema.alter_column_type("my_table", "name", "text")
schema.create_index("my_table", "name", index_op="to_tsvector")
schema.create_column("my_table", "json_col", "jsonb", checkfirst=True)
schema.create_index("my_table", "json_col", index_op="json_full")
# These steps require the postgis extension
schema.alter_column_type("my_table", "geom", "geometry", using="geom::geometry(Polygon,4326)")
schema.create_index("my_table", "geom", index_op="spatial")
- 删除数据库对象
from pg_database import schema
all_tables = schema.get_metadata().tables
other_table = all_tables["other_table"]
schema.drop_foreign_key(other_table, "other_table_my_table_id_fkey")
schema.drop_index("my_table", index_name="my_table_json_col_json_full_idx")
schema.drop_table("my_table")
schema.drop_table(other_table)
SQL工具
- 插入行
import json
from datetime import datetime, timedelta
from pg_database import sql
create_date = datetime.now()
sql.select_into(
"new_table",
[
(1, "one", {}, create_date),
(2, "two", {}, create_date),
(3, "three", {}, create_date)
],
"id,val,json,created",
"int,text,jsonb,date"
)
- 更新行
from pg_database import sql
def update_row(row):
row = list(row)
pk, val, created, jval = row[0], row[1], row[2], row[3]
row[1] = f"{pk} {val} first batch"
row[2] = created + timedelta(days=1)
row[3] = {"id": pk, "val": val, "batch": "first"}
return row
sql.update_rows("new_table", "id", "val,created,json", update_row, batch_size=3)
- 查询行
from pg_database import sql, schema
# Reduce database queries by sending a sqlalchemy table
all_tables = schema.get_metadata().tables
new_table = all_tables["new_table"]
schema.create_index(new_table, "json", index_op="json_path")
schema.create_index(new_table, "val", index_op="to_tsvector")
sql.query_json_keys(new_table, "json", {"batch": "first"})
sql.query_tsvector_columns("new_table", "val", "batch first")
- 为
INSERT
或SELECT INTO
的值子句提供,在执行时具有自定义连接参数
from datetime import datetime
from sqlalchemy import column
from sqlalchemy.sql import Insert, Select
from pg_database import sql, schema
# Prepare data, column names, column types and table name
create_date = datetime.now()
values_data = [
(1, "one", {}, True, create_date),
(2, "two", {}, False, create_date),
(3, "three", {}, 0, create_date)
]
values_names = ["id", "val", "json", "boolean", "created"]
values_types = ["int", "text", "jsonb", "bool", "date"]
values_table = "values_table"
# SELECT INTO to create a new table from raw values using sslmode==require
select_vals = sql.Values(values_names, values_types, *values_data)
select_into = sql.SelectInto([column(c) for c in values_names], values_table)
with schema.get_engine(connect_args={"sslmode": "require"}).connect() as conn:
conn.execute(select_into.select_from(select_vals).execution_options(autocommit=True))
# INSERT INTO to add new records from raw values using custom pooling args
existing_table = schema.get_metadata().tables[values_table]
insert_vals = sql.Values(values_names, values_types, *values_data)
insert_from = Select([column(c) for c in values_names]).select_from(insert_vals)
insert_into = Insert(existing_table).from_select(names=values_names, select=insert_from)
with schema.get_engine(pooling_args={"pool_size": 20, "max_overflow": 0}).connect() as conn:
conn.execute(insert_into.execution_options(autocommit=True))
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源分布
pg_database_utils-1.0.0.tar.gz (36.6 kB 查看散列)
构建分布
pg_database_utils-1.0.0-py3-none-any.whl (39.8 kB 查看散列)