PostgreSQL接口库
项目描述
pg8000
pg8000是一个纯Python的Python PostgreSQL 驱动程序,符合DB-API 2.0。它在Python 3.8+版本、CPython和PyPy上进行了测试,并支持PostgreSQL 12+版本。pg8000的名称源于它可能是Python的第8000个PostgreSQL接口。pg8000采用BSD 3条款许可证分发。
欢迎在http://github.com/tlocke/pg8000/提交所有错误报告、功能请求和贡献。
安装
要使用pip
安装pg8000,请输入:pip install pg8000
本地API交互示例
pg8000提供了两个API,即本地pg8000 API和DB-API 2.0标准API。以下是本地API的示例,DB-API 2.0的示例将在下一节中介绍。
基本示例
导入pg8000,连接到数据库,创建一个表,添加一些行,然后查询该表
>>> import pg8000.native
>>>
>>> # Connect to the database with user name postgres
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> # Create a temporary table
>>>
>>> con.run("CREATE TEMPORARY TABLE book (id SERIAL, title TEXT)")
>>>
>>> # Populate the table
>>>
>>> for title in ("Ender's Game", "The Magus"):
... con.run("INSERT INTO book (title) VALUES (:title)", title=title)
>>>
>>> # Print all the rows in the table
>>>
>>> for row in con.run("SELECT * FROM book"):
... print(row)
[1, "Ender's Game"]
[2, 'The Magus']
>>>
>>> con.close()
事务
这是如何在事务中运行一系列SQL语句的方法(请参阅事务教程)
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("START TRANSACTION")
>>>
>>> # Create a temporary table
>>> con.run("CREATE TEMPORARY TABLE book (id SERIAL, title TEXT)")
>>>
>>> for title in ("Ender's Game", "The Magus", "Phineas Finn"):
... con.run("INSERT INTO book (title) VALUES (:title)", title=title)
>>> con.run("COMMIT")
>>> for row in con.run("SELECT * FROM book"):
... print(row)
[1, "Ender's Game"]
[2, 'The Magus']
[3, 'Phineas Finn']
>>>
>>> con.close()
回滚事务
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> # Create a temporary table
>>> con.run("CREATE TEMPORARY TABLE book (id SERIAL, title TEXT)")
>>>
>>> for title in ("Ender's Game", "The Magus", "Phineas Finn"):
... con.run("INSERT INTO book (title) VALUES (:title)", title=title)
>>>
>>> con.run("START TRANSACTION")
>>> con.run("DELETE FROM book WHERE title = :title", title="Phineas Finn")
>>> con.run("ROLLBACK")
>>> for row in con.run("SELECT * FROM book"):
... print(row)
[1, "Ender's Game"]
[2, 'The Magus']
[3, 'Phineas Finn']
>>>
>>> con.close()
注意。在PostgreSQL服务器中存在一个长期存在的错误,即如果对失败的事务发出COMMIT,事务将被静默回滚,而不是返回错误。pg8000会尝试检测这种情况并引发InterfaceError
。
使用函数进行查询
另一个使用一些PostgreSQL函数的查询
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT TO_CHAR(TIMESTAMP '2021-10-10', 'YYYY BC')")
[['2021 AD']]
>>>
>>> con.close()
区间类型
一个返回PostgreSQL区间类型的查询
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> import datetime
>>>
>>> ts = datetime.date(1980, 4, 27)
>>> con.run("SELECT timestamp '2013-12-01 16:06' - :ts", ts=ts)
[[datetime.timedelta(days=12271, seconds=57960)]]
>>>
>>> con.close()
点类型
使用PostgreSQL点类型的往返操作
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT CAST(:pt as point)", pt=(2.3,1))
[[(2.3, 1.0)]]
>>>
>>> con.close()
客户端编码
当与服务器通信时,pg8000使用服务器请求它使用的字符集(客户端编码)。默认情况下,客户端编码是数据库的字符集(在创建数据库时选择),但可以通过多种方式更改客户端编码(例如,在postgresql.conf
中设置CLIENT_ENCODING
)。更改客户端编码的另一种方式是使用SQL命令。例如
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SET CLIENT_ENCODING TO 'UTF8'")
>>> con.run("SHOW CLIENT_ENCODING")
[['UTF8']]
>>>
>>> con.close()
JSON
JSON始终以反序列化的形式从服务器返回。如果您要发送的JSON是dict
,则可以这样做
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> val = {'name': 'Apollo 11 Cave', 'zebra': True, 'age': 26.003}
>>> con.run("SELECT CAST(:apollo as jsonb)", apollo=val)
[[{'age': 26.003, 'name': 'Apollo 11 Cave', 'zebra': True}]]
>>>
>>> con.close()
JSON可以始终以序列化形式发送到服务器
>>> import json
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>>
>>> val = ['Apollo 11 Cave', True, 26.003]
>>> con.run("SELECT CAST(:apollo as jsonb)", apollo=json.dumps(val))
[[['Apollo 11 Cave', True, 26.003]]]
>>>
>>> con.close()
JSON查询可以有参数
>>> import pg8000.native
>>>
>>> with pg8000.native.Connection("postgres", password="cpsnow") as con:
... con.run(""" SELECT CAST('{"a":1, "b":2}' AS jsonb) @> :v """, v={"b": 2})
[[True]]
从结果中检索列元数据
查找查询返回的列元数据
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("create temporary table quark (id serial, name text)")
>>> for name in ('Up', 'Down'):
... con.run("INSERT INTO quark (name) VALUES (:name)", name=name)
>>> # Now execute the query
>>>
>>> con.run("SELECT * FROM quark")
[[1, 'Up'], [2, 'Down']]
>>>
>>> # and retrieve the metadata
>>>
>>> con.columns
[{'table_oid': ..., 'column_attrnum': 1, 'type_oid': 23, 'type_size': 4, 'type_modifier': -1, 'format': 0, 'name': 'id'}, {'table_oid': ..., 'column_attrnum': 2, 'type_oid': 25, 'type_size': -1, 'type_modifier': -1, 'format': 0, 'name': 'name'}]
>>>
>>> # Show just the column names
>>>
>>> [c['name'] for c in con.columns]
['id', 'name']
>>>
>>> con.close()
通知和通知
PostgreSQL的通知存储在名为Connection.notices
的双端队列中,并使用append()
方法添加。类似地,还有Connection.notifications
用于通知。以下是一个示例
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("LISTEN aliens_landed")
>>> con.run("NOTIFY aliens_landed")
>>> # A notification is a tuple containing (backend_pid, channel, payload)
>>>
>>> con.notifications[0]
(..., 'aliens_landed', '')
>>>
>>> con.close()
参数状态
某些参数值在连接启动时或其值更改时会由服务器自动报告,并存储在名为Connection.parameter_statuses
的字典中。以下是一个示例,其中我们设置了aplication_name
参数,然后从parameter_statuses
中读取它
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection(
... "postgres", password="cpsnow", application_name='AGI')
>>>
>>> con.parameter_statuses['application_name']
'AGI'
>>>
>>> con.close()
LIMIT ALL
您可能认为以下操作会成功,但实际上它失败了
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT 'silo 1' LIMIT :lim", lim='ALL')
Traceback (most recent call last):
pg8000.exceptions.DatabaseError: ...
>>>
>>> con.close()
相反,文档指出,您可以将null
发送为ALL
的替代方案,这确实可行
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT 'silo 1' LIMIT :lim", lim=None)
[['silo 1']]
>>>
>>> con.close()
IN和NOT IN
您可能认为以下操作会成功,但实际上服务器不喜欢它
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT 'silo 1' WHERE 'a' IN :v", v=['a', 'b'])
Traceback (most recent call last):
pg8000.exceptions.DatabaseError: ...
>>>
>>> con.close()
解决此问题的最直接方法是使用ANY
函数重写查询
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT 'silo 1' WHERE 'a' = ANY(:v)", v=['a', 'b'])
[['silo 1']]
>>> con.close()
然而,使用ANY
的数组变体可能会导致性能问题,因此您可以使用IN
的子查询变体与unnest函数一起使用
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run(
... "SELECT 'silo 1' WHERE 'a' IN (SELECT unnest(CAST(:v as varchar[])))",
... v=['a', 'b'])
[['silo 1']]
>>> con.close()
并且您可以为NOT IN
做同样的事情。
许多SQL语句无法参数化
在PostgreSQL中,参数只能用于数据值,而不是标识符。有时这可能不会按预期工作,例如以下操作会失败
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> channel = 'top_secret'
>>>
>>> con.run("LISTEN :channel", channel=channel)
Traceback (most recent call last):
pg8000.exceptions.DatabaseError: ...
>>>
>>> con.close()
它失败是因为PostgreSQL服务器不允许此语句有任何参数。有许多SQL语句可能会被认为可以参数化,但实际上不能。对于这些情况,SQL必须手动创建,并注意使用identifier()
和literal()
函数来转义值,以避免SQL注入攻击
>>> from pg8000.native import Connection, identifier, literal
>>>
>>> con = Connection("postgres", password="cpsnow")
>>>
>>> channel = 'top_secret'
>>> payload = 'Aliens Landed!'
>>> con.run(f"LISTEN {identifier(channel)}")
>>> con.run(f"NOTIFY {identifier(channel)}, {literal(payload)}")
>>>
>>> con.notifications[0]
(..., 'top_secret', 'Aliens Landed!')
>>>
>>> con.close()
从流中复制到和从流中复制
SQL COPY 语句可以用来从文件或类似文件的对象中复制数据。以下是一个使用CSV格式的示例
>>> import pg8000.native
>>> from io import StringIO
>>> import csv
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> # Create a CSV file in memory
>>>
>>> stream_in = StringIO()
>>> csv_writer = csv.writer(stream_in)
>>> csv_writer.writerow([1, "electron"])
12
>>> csv_writer.writerow([2, "muon"])
8
>>> csv_writer.writerow([3, "tau"])
7
>>> stream_in.seek(0)
0
>>>
>>> # Create a table and then copy the CSV into it
>>>
>>> con.run("CREATE TEMPORARY TABLE lepton (id SERIAL, name TEXT)")
>>> con.run("COPY lepton FROM STDIN WITH (FORMAT CSV)", stream=stream_in)
>>>
>>> # COPY from a table to a stream
>>>
>>> stream_out = StringIO()
>>> con.run("COPY lepton TO STDOUT WITH (FORMAT CSV)", stream=stream_out)
>>> stream_out.seek(0)
0
>>> for row in csv.reader(stream_out):
... print(row)
['1', 'electron']
['2', 'muon']
['3', 'tau']
>>>
>>> con.close()
还可以从可迭代对象中复制数据,这在您需要以编程方式创建行时非常有用
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> # Generator function for creating rows
>>> def row_gen():
... for i, name in ((1, "electron"), (2, "muon"), (3, "tau")):
... yield f"{i},{name}\n"
>>>
>>> # Create a table and then copy the CSV into it
>>>
>>> con.run("CREATE TEMPORARY TABLE lepton (id SERIAL, name TEXT)")
>>> con.run("COPY lepton FROM STDIN WITH (FORMAT CSV)", stream=row_gen())
>>>
>>> # COPY from a table to a stream
>>>
>>> stream_out = StringIO()
>>> con.run("COPY lepton TO STDOUT WITH (FORMAT CSV)", stream=stream_out)
>>> stream_out.seek(0)
0
>>> for row in csv.reader(stream_out):
... print(row)
['1', 'electron']
['2', 'muon']
['3', 'tau']
>>>
>>> con.close()
执行多个SQL语句
如果您想执行一系列SQL语句(例如.sql文件),您可以像预期的那样运行它们
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> statements = "SELECT 5; SELECT 'Erich Fromm';"
>>>
>>> con.run(statements)
[[5], ['Erich Fromm']]
>>>
>>> con.close()
唯一的限制是,在执行多个语句时,不能有任何参数。
SQL中的引号标识符
假设您有一个名为My Column
的列。由于它是区分大小写的,并且包含空格,因此您必须用双引号包围它。但是您不能这样做
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("select 'hello' as "My Column"")
Traceback (most recent call last):
SyntaxError: invalid syntax...
>>>
>>> con.close()
因为Python使用双引号来界定字符串字面量,所以一个解决方案是使用Python的三引号来界定字符串
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run('''SELECT 'hello' AS "My Column"''')
[['hello']]
>>>
>>> con.close()
另一个解决方案,特别是当标识符来自不受信任的来源时特别有用,是使用identifier()
函数,该函数会正确地引号和转义标识符
>>> from pg8000.native import Connection, identifier
>>>
>>> con = Connection("postgres", password="cpsnow")
>>>
>>> sql = f"SELECT 'hello' as {identifier('My Column')}"
>>> print(sql)
SELECT 'hello' as "My Column"
>>>
>>> con.run(sql)
[['hello']]
>>>
>>> con.close()
这种方法可以防止SQL注入攻击。如果您使用显式模式(例如pg_catalog.pg_language
),请注意,模式名称和表名称都是独立的标识符。因此,要转义它们,您会这样做
>>> from pg8000.native import Connection, identifier
>>>
>>> con = Connection("postgres", password="cpsnow")
>>>
>>> query = (
... f"SELECT lanname FROM {identifier('pg_catalog')}.{identifier('pg_language')} "
... f"WHERE lanname = 'sql'"
... )
>>> print(query)
SELECT lanname FROM pg_catalog.pg_language WHERE lanname = 'sql'
>>>
>>> con.run(query)
[['sql']]
>>>
>>> con.close()
从Python类型到PostgreSQL类型的自定义适配器
当pg8000需要将SQL参数发送到服务器时,它有一个Python类型到PostgreSQL类型的映射。pg8000附带默认映射在大多数情况下都设计得很好,但您可能想要添加或替换默认映射。
Python datetime.timedelta
对象被发送到服务器作为具有oid
1186的PostgreSQL interval
类型。但是假设我们想要创建一个Python类,该类作为interval
类型发送。那么我们必须注册一个适配器
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> class MyInterval(str):
... pass
>>>
>>> def my_interval_out(my_interval):
... return my_interval # Must return a str
>>>
>>> con.register_out_adapter(MyInterval, my_interval_out)
>>> con.run("SELECT CAST(:interval as interval)", interval=MyInterval("2 hours"))
[[datetime.timedelta(seconds=7200)]]
>>>
>>> con.close()
请注意,它仍然作为datetime.timedelta
对象返回,因为我们只更改了从Python到PostgreSQL的映射。下面是一个更改从PostgreSQL到Python映射的示例。
从PostgreSQL类型到Python类型的自定义适配器
当pg8000从服务器接收SQL结果时,它有一个PostgreSQL类型到Python类型的映射。pg8000附带默认映射在大多数情况下都设计得很好,但您可能想要添加或替换默认映射。
如果pg8000收到具有oid
1186的PostgreSQL interval
类型,它会将其转换为Python datetime.timedelta
对象。但是假设我们想要创建一个Python类,该类将用于替代datetime.timedelta
。那么我们必须注册一个适配器
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> class MyInterval(str):
... pass
>>>
>>> def my_interval_in(my_interval_str): # The parameter is of type str
... return MyInterval(my_interval)
>>>
>>> con.register_in_adapter(1186, my_interval_in)
>>> con.run("SELECT \'2 years'")
[['2 years']]
>>>
>>> con.close()
请注意,注册的“in”适配器仅影响从PostgreSQL类型到Python类型的映射。上面有一个更改从PostgreSQL到Python映射的示例。
无法确定参数的数据类型
有时您可能会从服务器收到“无法确定参数的数据类型”错误消息
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT :v IS NULL", v=None)
Traceback (most recent call last):
pg8000.exceptions.DatabaseError: {'S': 'ERROR', 'V': 'ERROR', 'C': '42P18', 'M': 'could not determine data type of parameter $1', 'F': 'postgres.c', 'L': '...', 'R': '...'}
>>>
>>> con.close()
解决方法之一是在SQL中添加一个CAST
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT cast(:v as TIMESTAMP) IS NULL", v=None)
[[True]]
>>>
>>> con.close()
另一种方法是覆盖pg8000与每个参数一起发送的类型
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> con.run("SELECT :v IS NULL", v=None, types={'v': pg8000.native.TIMESTAMP})
[[True]]
>>>
>>> con.close()
预定义语句
预定义语句在您需要重复执行语句以提高性能时非常有用。以下是一个示例
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection("postgres", password="cpsnow")
>>>
>>> # Create the prepared statement
>>> ps = con.prepare("SELECT cast(:v as varchar)")
>>>
>>> # Execute the statement repeatedly
>>> ps.run(v="speedy")
[['speedy']]
>>> ps.run(v="rapid")
[['rapid']]
>>> ps.run(v="swift")
[['swift']]
>>>
>>> # Close the prepared statement, releasing resources on the server
>>> ps.close()
>>>
>>> con.close()
使用环境变量作为连接默认值
例如,您可能想使用当前用户作为数据库用户名
>>> import pg8000.native
>>> import getpass
>>>
>>> # Connect to the database with current user name
>>> username = getpass.getuser()
>>> connection = pg8000.native.Connection(username, password="cpsnow")
>>>
>>> connection.run("SELECT 'pilau'")
[['pilau']]
>>>
>>> connection.close()
或者,您可能想使用libpg使用的某些相同的环境变量
>>> import pg8000.native
>>> from os import environ
>>>
>>> username = environ.get('PGUSER', 'postgres')
>>> password = environ.get('PGPASSWORD', 'cpsnow')
>>> host = environ.get('PGHOST', 'localhost')
>>> port = environ.get('PGPORT', '5432')
>>> database = environ.get('PGDATABASE')
>>>
>>> connection = pg8000.native.Connection(
... username, password=password, host=host, port=port, database=database)
>>>
>>> connection.run("SELECT 'Mr Cairo'")
[['Mr Cairo']]
>>>
>>> connection.close()
可能会问,为什么 pg8000 没有内置这种行为?这种思考遵循了《Python之禅》的第二条格言
显式优于隐式。
因此,我们只允许使用 pg8000.native.Connection()
构造函数来设置连接参数。
通过 SSL 连接到 PostgreSQL
默认情况下,ssl_context
连接参数的值为 None
,这意味着 pg8000 将尝试使用 SSL 连接到服务器,如果服务器拒绝 SSL,则回退到普通套接字。如果您想强制使用 SSL(即如果未实现则失败),则可以将 ssl_context=True
设置
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection('postgres', password="cpsnow", ssl_context=True)
>>> con.run("SELECT 'The game is afoot!'")
[['The game is afoot!']]
>>> con.close()
另一方面,如果您想使用自定义设置通过 SSL 连接,则将 ssl_context
参数设置为 ssl.SSLContext
对象
>>> import pg8000.native
>>> import ssl
>>>
>>> ssl_context = ssl.create_default_context()
>>> ssl_context.check_hostname = False
>>> ssl_context.verify_mode = ssl.CERT_NONE
>>> con = pg8000.native.Connection(
... 'postgres', password="cpsnow", ssl_context=ssl_context)
>>> con.run("SELECT 'Work is the curse of the drinking classes.'")
[['Work is the curse of the drinking classes.']]
>>> con.close()
可能的情况是您的 PostgreSQL 服务器后面有一个 SSL 代理服务器,在这种情况下,您可以通过 sock
参数向 pg8000 提供SSL套接字,然后设置 ssl_context=False
,这意味着不会尝试创建到服务器的 SSL 连接。
服务器端游标
您可以使用 SQL 命令 DECLARE、FETCH、MOVE 和 CLOSE 来操作服务器端游标。例如
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection('postgres', password="cpsnow")
>>> con.run("START TRANSACTION")
>>> con.run("DECLARE c SCROLL CURSOR FOR SELECT * FROM generate_series(1, 100)")
>>> con.run("FETCH FORWARD 5 FROM c")
[[1], [2], [3], [4], [5]]
>>> con.run("MOVE FORWARD 50 FROM c")
>>> con.run("FETCH BACKWARD 10 FROM c")
[[54], [53], [52], [51], [50], [49], [48], [47], [46], [45]]
>>> con.run("CLOSE c")
>>> con.run("ROLLBACK")
>>>
>>> con.close()
BLOB(二进制大对象)
有一组用于操作 BLOB 的 SQL 函数。以下是一个示例
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection('postgres', password="cpsnow")
>>>
>>> # Create a BLOB and get its oid
>>> data = b'hello'
>>> res = con.run("SELECT lo_from_bytea(0, :data)", data=data)
>>> oid = res[0][0]
>>>
>>> # Create a table and store the oid of the BLOB
>>> con.run("CREATE TEMPORARY TABLE image (raster oid)")
>>>
>>> con.run("INSERT INTO image (raster) VALUES (:oid)", oid=oid)
>>> # Retrieve the data using the oid
>>> con.run("SELECT lo_get(:oid)", oid=oid)
[[b'hello']]
>>>
>>> # Add some data to the end of the BLOB
>>> more_data = b' all'
>>> offset = len(data)
>>> con.run(
... "SELECT lo_put(:oid, :offset, :data)",
... oid=oid, offset=offset, data=more_data)
[['']]
>>> con.run("SELECT lo_get(:oid)", oid=oid)
[[b'hello all']]
>>>
>>> # Download a part of the data
>>> con.run("SELECT lo_get(:oid, 6, 3)", oid=oid)
[[b'all']]
>>>
>>> con.close()
复制协议
使用创建连接时的 replication
关键字支持 PostgreSQL 的复制协议
>>> import pg8000.native
>>>
>>> con = pg8000.native.Connection(
... 'postgres', password="cpsnow", replication="database")
>>>
>>> con.run("IDENTIFY_SYSTEM")
[['...', 1, '.../...', 'postgres']]
>>>
>>> con.close()
DB-API 2 交互示例
这些示例遵循 DB-API 2.0 标准。
基本示例
导入pg8000,连接到数据库,创建一个表,添加一些行,然后查询该表
>>> import pg8000.dbapi
>>>
>>> conn = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = conn.cursor()
>>> cursor.execute("CREATE TEMPORARY TABLE book (id SERIAL, title TEXT)")
>>> cursor.execute(
... "INSERT INTO book (title) VALUES (%s), (%s) RETURNING id, title",
... ("Ender's Game", "Speaker for the Dead"))
>>> results = cursor.fetchall()
>>> for row in results:
... id, title = row
... print("id = %s, title = %s" % (id, title))
id = 1, title = Ender's Game
id = 2, title = Speaker for the Dead
>>> conn.commit()
>>>
>>> conn.close()
使用函数进行查询
另一个使用一些PostgreSQL函数的查询
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = con.cursor()
>>>
>>> cursor.execute("SELECT TO_CHAR(TIMESTAMP '2021-10-10', 'YYYY BC')")
>>> cursor.fetchone()
['2021 AD']
>>>
>>> con.close()
区间类型
一个返回PostgreSQL区间类型的查询
>>> import datetime
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = con.cursor()
>>>
>>> cursor.execute("SELECT timestamp '2013-12-01 16:06' - %s",
... (datetime.date(1980, 4, 27),))
>>> cursor.fetchone()
[datetime.timedelta(days=12271, seconds=57960)]
>>>
>>> con.close()
点类型
与 PostgreSQL 点类型进行往返通信
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = con.cursor()
>>>
>>> cursor.execute("SELECT cast(%s as point)", ((2.3,1),))
>>> cursor.fetchone()
[(2.3, 1.0)]
>>>
>>> con.close()
数字参数样式
pg8000 支持所有 DB-API 参数样式。以下是一个使用 'numeric' 参数样式的示例
>>> import pg8000.dbapi
>>>
>>> pg8000.dbapi.paramstyle = "numeric"
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = con.cursor()
>>>
>>> cursor.execute("SELECT array_prepend(:1, CAST(:2 AS int[]))", (500, [1, 2, 3, 4],))
>>> cursor.fetchone()
[[500, 1, 2, 3, 4]]
>>> pg8000.dbapi.paramstyle = "format"
>>>
>>> con.close()
自动提交
遵循 DB-API 规范,默认情况下自动提交是关闭的。可以通过使用连接的 autocommit 属性来打开它
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> con.autocommit = True
>>>
>>> cur = con.cursor()
>>> cur.execute("vacuum")
>>> conn.autocommit = False
>>> cur.close()
>>>
>>> con.close()
客户端编码
当与服务器通信时,pg8000使用服务器请求它使用的字符集(客户端编码)。默认情况下,客户端编码是数据库的字符集(在创建数据库时选择),但可以通过多种方式更改客户端编码(例如,在postgresql.conf
中设置CLIENT_ENCODING
)。更改客户端编码的另一种方式是使用SQL命令。例如
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cur = con.cursor()
>>> cur.execute("SET CLIENT_ENCODING TO 'UTF8'")
>>> cur.execute("SHOW CLIENT_ENCODING")
>>> cur.fetchone()
['UTF8']
>>> cur.close()
>>>
>>> con.close()
JSON
JSON 以序列化形式发送到服务器,并以反序列化形式返回。以下是一个示例
>>> import json
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cur = con.cursor()
>>> val = ['Apollo 11 Cave', True, 26.003]
>>> cur.execute("SELECT cast(%s as json)", (json.dumps(val),))
>>> cur.fetchone()
[['Apollo 11 Cave', True, 26.003]]
>>> cur.close()
>>>
>>> con.close()
JSON查询可以有参数
>>> import pg8000.dbapi
>>>
>>> with pg8000.dbapi.connect("postgres", password="cpsnow") as con:
... cur = con.cursor()
... cur.execute(""" SELECT CAST('{"a":1, "b":2}' AS jsonb) @> %s """, ({"b": 2},))
... for row in cur.fetchall():
... print(row)
[True]
从结果中检索列名
使用从查询中检索到的列名
>>> import pg8000
>>> conn = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> c = conn.cursor()
>>> c.execute("create temporary table quark (id serial, name text)")
>>> c.executemany("INSERT INTO quark (name) VALUES (%s)", (("Up",), ("Down",)))
>>> #
>>> # Now retrieve the results
>>> #
>>> c.execute("select * from quark")
>>> rows = c.fetchall()
>>> keys = [k[0] for k in c.description]
>>> results = [dict(zip(keys, row)) for row in rows]
>>> assert results == [{'id': 1, 'name': 'Up'}, {'id': 2, 'name': 'Down'}]
>>>
>>> conn.close()
从文件到文件或类似文件对象的复制
SQL COPY 语句可以用于从文件或类似文件对象复制到文件或从文件复制到类似文件对象
>>> from io import StringIO
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cur = con.cursor()
>>> #
>>> # COPY from a stream to a table
>>> #
>>> stream_in = StringIO('1\telectron\n2\tmuon\n3\ttau\n')
>>> cur = con.cursor()
>>> cur.execute("create temporary table lepton (id serial, name text)")
>>> cur.execute("COPY lepton FROM stdin", stream=stream_in)
>>> #
>>> # Now COPY from a table to a stream
>>> #
>>> stream_out = StringIO()
>>> cur.execute("copy lepton to stdout", stream=stream_out)
>>> stream_out.getvalue()
'1\telectron\n2\tmuon\n3\ttau\n'
>>>
>>> con.close()
服务器端游标
您可以使用 SQL 命令 DECLARE、FETCH、MOVE 和 CLOSE 来操作服务器端游标。例如
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cur = con.cursor()
>>> cur.execute("START TRANSACTION")
>>> cur.execute(
... "DECLARE c SCROLL CURSOR FOR SELECT * FROM generate_series(1, 100)")
>>> cur.execute("FETCH FORWARD 5 FROM c")
>>> cur.fetchall()
([1], [2], [3], [4], [5])
>>> cur.execute("MOVE FORWARD 50 FROM c")
>>> cur.execute("FETCH BACKWARD 10 FROM c")
>>> cur.fetchall()
([54], [53], [52], [51], [50], [49], [48], [47], [46], [45])
>>> cur.execute("CLOSE c")
>>> cur.execute("ROLLBACK")
>>>
>>> con.close()
BLOB(二进制大对象)
有一组用于操作 BLOB 的 SQL 函数。以下是一个示例
>>> import pg8000.dbapi
>>>
>>> con = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cur = con.cursor()
>>>
>>> # Create a BLOB and get its oid
>>> data = b'hello'
>>> cur = con.cursor()
>>> cur.execute("SELECT lo_from_bytea(0, %s)", [data])
>>> oid = cur.fetchone()[0]
>>>
>>> # Create a table and store the oid of the BLOB
>>> cur.execute("CREATE TEMPORARY TABLE image (raster oid)")
>>> cur.execute("INSERT INTO image (raster) VALUES (%s)", [oid])
>>>
>>> # Retrieve the data using the oid
>>> cur.execute("SELECT lo_get(%s)", [oid])
>>> cur.fetchall()
([b'hello'],)
>>>
>>> # Add some data to the end of the BLOB
>>> more_data = b' all'
>>> offset = len(data)
>>> cur.execute("SELECT lo_put(%s, %s, %s)", [oid, offset, more_data])
>>> cur.execute("SELECT lo_get(%s)", [oid])
>>> cur.fetchall()
([b'hello all'],)
>>>
>>> # Download a part of the data
>>> cur.execute("SELECT lo_get(%s, 6, 3)", [oid])
>>> cur.fetchall()
([b'all'],)
>>>
>>> con.close()
参数限制
PostgreSQL 使用的协议限制了参数数量为 6,5535。以下将产生错误
>>> import pg8000.dbapi
>>>
>>> conn = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = conn.cursor()
>>> SIZE = 100000
>>> cursor.execute(
... f"SELECT 1 WHERE 1 IN ({','.join(['%s'] * SIZE)})",
... [1] * SIZE,
... )
Traceback (most recent call last):
struct.error: 'H' format requires 0 <= number <= 65535
解决此问题的一种方法是用 unnest 函数
>>> import pg8000.dbapi
>>>
>>> conn = pg8000.dbapi.connect(user="postgres", password="cpsnow")
>>> cursor = conn.cursor()
>>> SIZE = 100000
>>> cursor.execute(
... "SELECT 1 WHERE 1 IN (SELECT unnest(CAST(%s AS int[])))",
... [[1] * SIZE],
... )
>>> conn.close()
类型映射
以下表显示了 Python 类型与 PostgreSQL 类型之间的默认映射,反之亦然。
如果 pg8000 识别不到从 PostgreSQL 接收到的类型,它将以 str
类型返回它。这是 pg8000 处理 PostgreSQL enum
和 XML 类型的做法。可以使用适配器来更改默认映射(请参阅示例)。
Python 类型 | PostgreSQL 类型 | 注释 |
---|---|---|
bool | bool | |
int | int4 | |
str | text | |
float | float8 | |
decimal.Decimal | numeric | |
bytes | bytea | |
datetime.datetime(无 tzinfo) | timestamp without timezone | +/-infinity PostgreSQL 值表示为 Python str 值。如果 timestamp 对 datetime.datetime 来说太大,则使用 str 。 |
datetime.datetime(带 tzinfo) | timestamp with timezone | PostgreSQL中的+/-无穷大值表示为Python str 值。如果一个timestamptz 值太大,无法使用datetime.datetime 表示,则使用str 。 |
datetime.date | 日期 | PostgreSQL中的+/-无穷大值表示为Python str 值。如果一个date 值太大,无法使用datetime.date 表示,则使用str 。 |
datetime.time | 无时区的时间 | |
datetime.timedelta | 间隔 | 如果一个interval 值太大,无法使用datetime.timedelta 表示,则使用PGInterval 。 |
None | NULL | |
uuid.UUID | uuid | |
ipaddress.IPv4Address | inet | |
ipaddress.IPv6Address | inet | |
ipaddress.IPv4Network | inet | |
ipaddress.IPv6Network | inet | |
int | xid | |
整数列表 | INT4[] | |
浮点数列表 | FLOAT8[] | |
布尔值列表 | BOOL[] | |
字符串列表 | TEXT[] | |
int | int2vector | 仅从PostgreSQL到Python |
JSON | json, jsonb | Python JSON以Python序列化字符串的形式提供。返回的结果为反序列化的JSON。 |
pg8000.Range | 范围 | PostgreSQL多范围类型是 |
元组 | 复合类型 | 仅从Python到PostgreSQL |
操作理论
在微内核内容忍一个概念,只有当将其移出内核(即,允许竞争性实现)会阻止实现系统的所需功能时。
-- Jochen Liedtke,Liedtke的简约性原则
pg8000设计为每个连接使用一个线程。
pg8000使用PostgreSQL前端/后端协议 (FEBE)与数据库通信。如果没有参数的查询,pg8000使用'简单查询协议'。如果有参数的查询,pg8000使用带有未命名预编译语句的'扩展查询协议'。带有参数的查询步骤如下
-
查询进入。
-
向服务器发送一个PARSE消息以创建一个未命名的预编译语句。
-
向未命名的预编译语句发送一个BIND消息以运行,从而在服务器上产生一个未命名的端口。
-
发送一个EXECUTE消息以从端口中读取所有结果。
也可以使用命名预编译语句。在这种情况下,预编译语句在服务器上持久化,并在pg8000中使用PreparedStatement
对象表示。这意味着PARSE步骤只在前端执行一次,然后只重复BIND和EXECUTE步骤。
尽管有许多PostgreSQL数据类型,但Python中原始数据类型很少。默认情况下,pg8000不在PARSE步骤中发送PostgreSQL数据类型信息,在这种情况下,PostgreSQL假设SQL语句隐含的类型。在某些情况下,PostgreSQL无法确定参数类型,因此可以在SQL中使用显式转换 。
在FEBE协议中,每个查询参数可以根据格式代码以二进制或文本形式发送到服务器。在pg8000中,参数始终以文本形式发送。
偶尔,pg8000和服务器之间的网络连接可能会断开。如果pg8000遇到网络问题,它将引发带有消息network error
和原始异常作为原因 的InterfaceError
。
本地API文档
pg8000.native.Error
是其他错误异常的基类。
pg8000.native.InterfaceError
用于源于pg8000的错误。
pg8000.native.DatabaseError
用于源于服务器的错误。
pg8000.native.Connection(user, host='localhost', database=None, port=5432, password=None, source_address=None, unix_sock=None, ssl_context=None, timeout=None, tcp_keepalive=True, application_name=None, replication=None, sock=None)
创建到PostgreSQL数据库的连接。
- user - 连接到PostgreSQL服务器时使用的用户名。如果您的服务器字符编码不是
ascii
或utf8
,则需要将user
作为字节提供,例如:'my_name'.encode('EUC-JP')
。 - host - 要连接的PostgreSQL服务器的主机名。对于TCP/IP连接,提供此参数是必要的。必须提供
host
或unix_sock
中的一个。默认为localhost
。 - database - 要连接的数据库实例的名称。如果为
None
,则PostgreSQL服务器将假定数据库名称与用户名相同。如果您的服务器字符编码不是ascii
或utf8
,则需要将database
作为字节提供,例如:'my_db'.encode('EUC-JP')
。 - port - PostgreSQL服务器实例的TCP/IP端口。此参数默认为
5432
,这是PostgreSQL TCP/IP服务器的注册通用端口。 - password - 连接到服务器时使用的用户密码。此参数是可选的;如果省略并且数据库服务器请求基于密码的认证,则连接将无法打开。如果提供了此参数但服务器没有请求,则不会发生错误。如果您的服务器字符编码不是
ascii
或utf8
,则需要将password
作为字节提供,例如:'my_password'.encode('EUC-JP')
。 - source_address - 初始化到PostgreSQL服务器的连接的源IP地址。默认为
None
,表示操作系统将选择源地址。 - unix_sock - 访问数据库的UNIX套接字路径,例如:
'/tmp/.s.PGSQL.5432'
。必须提供host
或unix_sock
中的一个。 - ssl_context - 这控制TCP/IP套接字的SSL加密。它可以有四个值
None
,默认值,表示将尝试通过SSL连接,但如果被服务器拒绝,则pg8000将回退到使用普通套接字。True
,表示使用具有最少检查的ssl.SSLContext
进行SSL。False
,表示不尝试创建SSL套接字。- 用于创建SSL连接的
ssl.SSLContext
实例。
- timeout - 这是连接到服务器将超时的时间(以秒为单位)。默认为
None
,表示没有超时。 - tcp_keepalive - 如果为
True
,则使用TCP keepalive。默认为True
。 - application_name - 设置application_name。如果您的服务器字符编码不是
ascii
或utf8
,则需要将值作为字节提供,例如:'my_application_name'.encode('EUC-JP')
。默认为None
,表示服务器将设置应用程序名称。 - replication - 用于在流复制模式中运行。如果您的服务器字符编码不是
ascii
或utf8
,则需要将值作为字节提供,例如:'database'.encode('EUC-JP')
。 - sock - 用于连接的类似套接字的对象。例如,
sock
可以是普通的socket.socket
,也可以表示SSH隧道或可能是SSL代理的ssl.SSLSocket
。如果提供了ssl.SSLContext
,则它将用于尝试从提供的套接字创建SSL套接字。
pg8000.native.Connection.notifications
由数据库连接(通过LISTEN
/ NOTIFY
PostgreSQL命令)接收到的服务器端通知的双端队列。每个列表项是一个包含发出通知的PostgreSQL后端PID、通道和有效负载的三个元素的元组。
pg8000.native.Connection.notices
数据库连接收到的服务器通知的双端队列。
pg8000.native.Connection.parameter_statuses
数据库连接收到的服务器端参数状态的字典。
pg8000.native.Connection.run(sql, stream=None, types=None, **kwargs)
执行一个SQL语句,并返回结果列表。例如
con.run("SELECT * FROM cities where population > :pop", pop=10000)
- sql - 要执行的SQL语句。参数占位符以
:
后跟参数名称的形式出现。 - stream - 用于PostgreSQL的COPY命令。参数的性质取决于SQL命令是
COPY FROM
还是COPY TO
。COPY FROM
- stream参数必须是可读的文件样对象或可迭代对象。如果是可迭代对象,则项目可以是str
或二进制。COPY TO
- stream参数必须是可写的文件样对象。
- types - oids的字典。一个键对应一个参数。
- kwargs - SQL语句的参数。
pg8000.native.Connection.row_count
该只读属性包含最后一次run()
方法生成的行数(对于像SELECT
这样的查询语句)或影响的行数(对于像UPDATE
这样的修改语句)。
如果值为-1,则
- 尚未执行任何
run()
方法。 - 最后一次
run()
没有与行数相关联。
pg8000.native.Connection.columns
列元数据的列表。列表中的每个项目都是一个包含以下键的字典
- name
- table_oid
- column_attrnum
- type_oid
- type_size
- type_modifier
- format
pg8000.native.Connection.close()
关闭数据库连接。
pg8000.native.Connection.register_out_adapter(typ, out_func)
注册一个用于从pg8000到服务器的类型的类型适配器。
- typ - 适配器所针对的Python类。
- out_func - 一个函数,它接受Python对象并返回服务器所需的格式的字符串表示形式。
pg8000.native.Connection.register_in_adapter(oid, in_func)
注册一个用于从服务器到pg8000的类型的类型适配器。
- oid - 在pg_type系统目录中找到的PostgreSQL类型标识符。
- in_func - 一个函数,它接受PostgreSQL字符串表示形式并返回相应的Python对象。
pg8000.native.Connection.prepare(sql)
返回一个PreparedStatement
对象,该对象代表服务器上的预定义语句。它可以随后被反复执行。
- sql - 要准备的SQL语句。参数占位符以
:
后跟参数名称的形式出现。
pg8000.native.PreparedStatement
连接的pg8000.native.Connection.prepare()
方法返回一个预定义语句对象。它具有以下方法
pg8000.native.PreparedStatement.run(**kwargs)
执行预定义语句,并返回结果元组。
- kwargs - 预定义语句的参数。
pg8000.native.PreparedStatement.close()
关闭预定义语句,释放服务器上持有的预定义语句。
pg8000.native.identifier(ident)
正确地引号和转义字符串,以便用作SQL标识符。
- ident - 要用作SQL标识符的字符串。
pg8000.native.literal(value)
正确地引号和转义值,以便用作SQL文本。
- value - 要用作SQL文本的值。
DB-API 2 文档
属性
pg8000.dbapi.apilevel
支持的DBAPI级别,目前为"2.0"。
pg8000.dbapi.threadsafety
表示 DBAPI 接口支持的线程安全级别的整数常量。对于 pg8000,线程安全值为 1,表示线程可以共享模块但不能共享连接。
pg8000.dbapi.paramstyle
字符串属性,表示接口期望的参数标记格式化类型。默认值为 "format",其中参数按此格式标记:"WHERE name=%s"。
作为 DBAPI 规范的扩展,此值不是常量;它可以更改为以下任何值
- qmark - 问号样式,例如:
WHERE name=?
- numeric - 数字位置样式,例如:
WHERE name=:1
- named - 命名样式,例如:
WHERE name=:paramname
- format - printf 格式代码,例如:
WHERE name=%s
- pyformat - Python 格式代码,例如:
WHERE name=%(paramname)s
pg8000.dbapi.STRING
字符串类型 oid。
pg8000.dbapi.BINARY
pg8000.dbapi.NUMBER
数字类型 oid。
pg8000.dbapi.DATETIME
时间戳类型 oid
pg8000.dbapi.ROWID
ROWID 类型 oid
函数
pg8000.dbapi.connect(user, host='localhost', database=None, port=5432, password=None, source_address=None, unix_sock=None, ssl_context=None, timeout=None, tcp_keepalive=True, applicationa_name=None, replication=None, sock=None)
创建到PostgreSQL数据库的连接。
- user - 连接到PostgreSQL服务器时使用的用户名。如果您的服务器字符编码不是
ascii
或utf8
,则需要将user
作为字节提供,例如:'my_name'.encode('EUC-JP')
。 - host - 要连接的PostgreSQL服务器的主机名。对于TCP/IP连接,提供此参数是必要的。必须提供
host
或unix_sock
中的一个。默认为localhost
。 - database - 要连接的数据库实例的名称。如果为
None
,则PostgreSQL服务器将假定数据库名称与用户名相同。如果您的服务器字符编码不是ascii
或utf8
,则需要将database
作为字节提供,例如:'my_db'.encode('EUC-JP')
。 - port - PostgreSQL服务器实例的TCP/IP端口。此参数默认为
5432
,这是PostgreSQL TCP/IP服务器的注册通用端口。 - password - 连接到服务器时使用的用户密码。此参数是可选的;如果省略并且数据库服务器请求基于密码的认证,则连接将无法打开。如果提供了此参数但服务器没有请求,则不会发生错误。如果您的服务器字符编码不是
ascii
或utf8
,则需要将password
作为字节提供,例如:'my_password'.encode('EUC-JP')
。 - source_address - 初始化到PostgreSQL服务器的连接的源IP地址。默认为
None
,表示操作系统将选择源地址。 - unix_sock - 访问数据库的UNIX套接字路径,例如:
'/tmp/.s.PGSQL.5432'
。必须提供host
或unix_sock
中的一个。 - ssl_context - 这控制TCP/IP套接字的SSL加密。它可以有四个值
None
,默认值,表示将尝试通过SSL连接,但如果被服务器拒绝,则pg8000将回退到使用普通套接字。True
,表示使用具有最少检查的ssl.SSLContext
进行SSL。False
,表示不尝试创建SSL套接字。- 用于创建SSL连接的
ssl.SSLContext
实例。
- timeout - 这是连接到服务器将超时的时间(以秒为单位)。默认为
None
,表示没有超时。 - tcp_keepalive - 如果为
True
,则使用TCP keepalive。默认为True
。 - application_name - 设置application_name。如果您的服务器字符编码不是
ascii
或utf8
,则需要将值作为字节提供,例如:'my_application_name'.encode('EUC-JP')
。默认为None
,表示服务器将设置应用程序名称。 - replication - 用于在流复制模式中运行。如果您的服务器字符编码不是
ascii
或utf8
,则需要将值作为字节提供,例如:'database'.encode('EUC-JP')
。 - sock - 用于连接的类似套接字的对象。例如,
sock
可以是普通的socket.socket
,也可以表示SSH隧道或可能是SSL代理的ssl.SSLSocket
。如果提供了ssl.SSLContext
,则它将用于尝试从提供的套接字创建SSL套接字。
pg8000.dbapi.Date(year, month, day)
构造一个包含日期值的对象。
此属性是 DBAPI 2.0 规范 <https://pythonlang.cn/dev/peps/pep-0249/>
的一部分。
返回:datetime.date
pg8000.dbapi.Time(hour, minute, second)
构造一个包含时间值的对象。
返回:datetime.time
pg8000.dbapi.Timestamp(year, month, day, hour, minute, second)
构造一个包含时间戳值的对象。
返回:datetime.datetime
pg8000.dbapi.DateFromTicks(ticks)
从给定的 tick 值(自纪元以来的秒数)构造一个包含日期值的对象。
返回:datetime.datetime
pg8000.dbapi.TimeFromTicks(ticks)
从给定的 tick 值(自纪元以来的秒数)构造一个包含时间值的对象。
返回:datetime.time
pg8000.dbapi.TimestampFromTicks(ticks)
从给定的 tick 值(自纪元以来的秒数)构造一个包含时间戳值的对象。
返回:datetime.datetime
pg8000.dbapi.Binary(value)
构造一个包含二进制数据的对象。
返回:bytes
通用异常
Pg8000 使用标准 DBAPI 2.0 异常树作为 "通用" 异常。通常,会引发更具体的异常类型;这些特定异常类型是从通用异常派生出来的。
pg8000.dbapi.Warning
对于重要的数据库警告(如数据截断)引发的一般异常。此异常目前未由 pg8000 使用。
pg8000.dbapi.Error
是所有其他错误异常的基异常的一般异常。
pg8000.dbapi.InterfaceError
对于与数据库接口相关而非数据库本身的错误引发的一般异常。例如,如果接口尝试使用 SSL 连接但服务器拒绝,将引发 InterfaceError。
pg8000.dbapi.DatabaseError
对于与数据库相关的错误引发的一般异常。此异常目前从未由 pg8000 引发。
pg8000.dbapi.DataError
对于由于处理数据中的问题而引发的错误引发的一般异常。此异常目前未由 pg8000 引发。
pg8000.dbapi.OperationalError
对于与数据库操作相关而非程序员的控制相关的错误引发的一般异常。此异常目前从未由 pg8000 引发。
pg8000.dbapi.IntegrityError
当数据库的关联完整性受到影响时引发的一般异常。此异常目前未由 pg8000 引发。
pg8000.dbapi.InternalError
当数据库遇到内部错误时引发的一般异常。这仅在 pg8000 接口本身发生意外状态时引发,通常是接口错误的结果。
pg8000.dbapi.ProgrammingError
通用异常由编程错误引发。例如,如果查询字符串中的参数字段多于可用的参数,则会引发此异常。
pg8000.dbapi.NotSupportedError
如果使用了数据库不支持的方法或数据库API,则会引发通用异常。
类
pg8000.dbapi.Connection
pg8000.dbapi.connect()
函数返回一个连接对象。它表示与PostgreSQL数据库的单个物理连接。
pg8000.dbapi.Connection.autocommit
遵循DB-API规范,默认情况下自动提交是关闭的。可以通过将此特定于pg8000的布尔自动提交属性设置为True
来打开它。
pg8000.dbapi.Connection.close()
关闭数据库连接。
pg8000.dbapi.Connection.cursor()
创建一个与该连接绑定的 pg8000.dbapi.Cursor
对象。
pg8000.dbapi.Connection.rollback()
回滚当前数据库事务。
pg8000.dbapi.Connection.tpc_begin(xid)
使用给定的交易ID xid开始一个TPC事务。此方法应在事务之外调用(即自上次 commit()
或 rollback()
以来没有执行任何操作。此外,在TPC事务中调用 commit()
或 rollback()
是错误的。如果在活动的TPC事务期间应用程序调用 commit()
或 rollback()
,将引发 ProgrammingError
。
pg8000.dbapi.Connection.tpc_commit(xid=None)
当不带参数调用时,tpc_commit()
提交先前使用 tpc_prepare()
准备的TPC事务。如果在调用 tpc_prepare()
之前调用 tpc_commit()
,则执行单阶段提交。如果只有一个资源参与全局事务,事务管理器可能会选择这样做。
当使用交易ID xid
调用时,数据库提交给定的交易。如果提供了无效的交易ID,将引发 ProgrammingError
。应在外部调用此形式,并用于恢复。
返回时,TPC事务结束。
pg8000.dbapi.Connection.tpc_prepare()
执行使用 .tpc_begin()
开始的事务的第一阶段。如果在此方法调用之外调用此方法,将引发 ProgrammingError
。
在调用 tpc_prepare()
之后,在调用 tpc_commit()
或 tpc_rollback()
之前不能执行任何语句。
pg8000.dbapi.Connection.tpc_recover()
返回一个适合与 tpc_commit(xid)
或 tpc_rollback(xid)
一起使用的挂起交易ID列表。
pg8000.dbapi.Connection.tpc_rollback(xid=None)
当不带参数调用时,tpc_rollback()
回滚TPC事务。它可以在调用 tpc_prepare()
之前或之后调用。
当使用交易ID xid
调用时,它回滚给定的交易。如果提供了无效的交易ID,将引发 ProgrammingError
。应在外部调用此形式,并用于恢复。
返回时,TPC事务结束。
pg8000.dbapi.Connection.xid(format_id, global_transaction_id, branch_qualifier)
创建交易ID(在pg中仅使用全局交易ID),format_id 和 branch_qualifier 在postgres中未使用,global_transaction_id 可以是postgres支持的任何字符串标识符,返回一个元组 (format_id, global_transaction_id, branch_qualifier)
pg8000.dbapi.Cursor
连接的 pg8000.dbapi.Connection.cursor()
方法返回一个游标对象。它具有以下属性和方法
pg8000.dbapi.Cursor.arraysize
此读写属性指定使用 pg8000.dbapi.Cursor.fetchmany()
一次获取的行数。默认值为1。
pg8000.dbapi.Cursor.connection
此只读属性包含对创建游标的连接对象(pg8000.dbapi.Connection
实例)的引用。
pg8000.dbapi.Cursor.rowcount
此只读属性包含最后 execute()
或 executemany()
方法产生的行数(对于查询语句如 SELECT
)或影响的行数(对于修改语句如 UPDATE
。
如果值为-1,则
- 该游标尚未执行过任何
execute()
或executemany()
方法。 - 上一次
execute()
没有相关联的行数。 - 在
executemany()
执行的语句中,至少有一个没有行数相关联。
pg8000.dbapi.Cursor.description
这是一个只读属性,它是一个包含7个元素的序列。每个值包含描述一个结果列的信息。每个列返回的7个元素是(名称,类型代码,显示大小,内部大小,精度,缩放,null_ok)。只有前两个值由当前实现提供。
pg8000.dbapi.Cursor.close()
关闭游标。
pg8000.dbapi.Cursor.execute(operation, args=None, stream=None)
执行数据库操作。参数可以是序列,也可以是映射,具体取决于 pg8000.dbapi.paramstyle
的值。返回游标,可以迭代。
- operation - 要执行的SQL语句。
- args - 如果
pg8000.dbapi.paramstyle
是qmark
、numeric
或format
,则此参数应是一个参数数组,用于绑定到语句中。如果pg8000.dbapi.paramstyle
是named
,则参数应是一个映射参数的dict
。如果pg8000.dbapi.paramstyle
是pyformat
,则参数值可以是数组或映射。 - stream - 这是pg8000用于与PostgreSQL COPY 命令的扩展。对于
COPY FROM
,参数必须是一个可读的类似文件对象,对于COPY TO
,它必须是可写的。
pg8000.dbapi.Cursor.executemany(operation, param_sets)
准备一个数据库操作,然后对提供的所有参数序列或映射执行。
- operation - 要执行的SQL语句。
- parameter_sets - 执行语句的参数序列。序列中的值应该是参数序列或映射,与
pg8000.dbapi.Cursor.execute()
方法的 args 参数相同。
pg8000.dbapi.Cursor.callproc(procname, parameters=None)
使用给定的名称和可选参数调用存储的数据库过程。
- procname - 要调用的过程的名称。
- parameters - 参数列表。
pg8000.dbapi.Cursor.fetchall()
获取查询结果的剩余所有行。
返回:一个序列,每个条目都是一个由组成行的字段值序列。
pg8000.dbapi.Cursor.fetchmany(size=None)
获取查询结果的下一组行。
- size - 当调用时获取的行数。如果没有提供,则使用
pg8000.dbapi.Cursor.arraysize
属性值。
返回:一个序列,每个条目都是一个由组成行的字段值序列。如果没有更多的行可用,则返回一个空序列。
pg8000.dbapi.Cursor.fetchone()
获取查询结果集的下一行。
返回:一个行,作为字段值的序列,如果没有更多的行可用,则返回 None
。
pg8000.dbapi.Cursor.setinputsizes(*sizes)
用于设置下一个查询的参数类型。如果在参数本身中难以确定 pg8000 的类型时很有用(例如,对于类型为 None 的参数)。
- sizes - 位置参数,可以是发送的参数的 Python 类型,也可以是 PostgreSQL oid。常见的 oid 可以作为常数使用,如
pg8000.STRING
、pg8000.INTEGER
、pg8000.TIME
等。
pg8000.dbapi.Cursor.setoutputsize(size, column=None)
未实现。
pg8000.dbapi.Interval
Interval 表示时间的度量。在 PostgreSQL 中,间隔以月、日和微秒来定义;因此,pg8000 间隔类型表示相同的信息。
请注意,pg8000.dbapi.Interval.microseconds
、pg8000.dbapi.Interval.days
和 pg8000.dbapi.Interval.months
属性的值是独立测量的,不能相互转换。一个月可能是 28、29、30 或 31 天,而一天有时会因闰秒而略有增加。
设计决策
对于Range
类型,构造函数遵循PostgreSQL范围构造函数,这使得[闭开,开)表示起来最简单。
>>> from pg8000.types import Range
>>>
>>> pg_range = Range(2, 6)
测试
-
安装
tox
:pip install tox
-
通过运行SQL命令启用PostgreSQL hstore扩展:
create extension hstore;
-
在
pg_hba.conf
中添加一行以设置各种认证选项
host pg8000_md5 all 127.0.0.1/32 md5
host pg8000_gss all 127.0.0.1/32 gss
host pg8000_password all 127.0.0.1/32 password
host pg8000_scram_sha_256 all 127.0.0.1/32 scram-sha-256
host all all 127.0.0.1/32 trust
-
在
postgresql.conf
中将密码加密设置为scram-sha-256
:password_encryption = 'scram-sha-256'
-
为postgres用户设置密码:
ALTER USER postgresql WITH PASSWORD 'pw';
-
从
pg8000
目录运行tox
:tox
这将运行测试,针对虚拟环境中的Python版本、机器上的版本以及监听端口5432的安装的PostgreSQL版本,或如果设置了PGPORT
环境变量,则针对该变量。
基准测试作为测试套件的一部分在tests/test_benchmarks.py
中运行。
发布pg8000版本
运行tox
以确保所有测试通过,然后更新发行说明,然后执行
git tag -a x.y.z -m "version x.y.z"
rm -r dist
python -m build
twine upload dist/*
发行说明
版本1.31.2,2024-04-28
- 修复了
parameter_statuses
在非ascii编码时失败的bug。 - 添加了对Python 3.12的支持
版本1.31.1,2024-04-01
- 迁移到src样式布局,并且对于打包使用Hatch而不是setuptools。这意味着如果源分发版中添加了目录(如操作系统分发的打包所需),仍然可以构建包。
版本1.31.0,2024-03-31
- 现在
ssl_context
连接参数可以取四个值之一- None - 默认值,意味着它将尝试通过SSL连接,但如果不成功,则回退到普通套接字。
- True - 将尝试通过SSL连接,如果不成功,则失败。
- False - 它将不会尝试通过SSL连接。
- SSLContext对象 - 它将使用此对象通过SSL连接。
版本1.30.5,2024-02-22
- 修复了现在参数数量可以高达无符号16位整数所能达到的数量的bug。
版本1.30.4,2024-01-03
- 添加了对更多范围和多范围类型的支持。
- 将
Connection.parameter_statuses
属性从dequeue
更改为dict
。
版本1.30.3,2023-10-31
- 修复了PG日期溢出Python类型的bug。现在如果我们无法解析它,则返回从服务器获得的
str
。
版本1.30.2,2023-09-17
- 修复了不支持美元引号字符串常量的bug。
版本1.30.1,2023-07-29
- 由于README.rst的标记无效,之前版本(1.30.0)上传到PyPI时出现了问题。现在在自动化测试中增加了一个步骤来检查这一点。
版本1.30.0,2023-07-27
- 移除对Python 3.7的支持
- 添加一个
sock
关键字参数,用于从预配置的套接字创建连接。
版本1.29.8,2023-06-16
- 范围在旧版API中不起作用。
版本1.29.7,2023-06-16
- 添加了对PostgreSQL
range
和multirange
类型的支持。之前pg8000会将它们作为字符串返回,但现在它们作为Range
和Range
列表返回。 - PostgreSQL的
record
类型现在作为字符串的tuple
返回,而之前它作为单个字符串返回。
版本1.29.6,2023-05-29
- 修复了复合类型中的两个bug。空值应该表示为空字符串,在复合类型的数组中,元素应该被双引号包围。
版本1.29.5,2023-05-09
- 修复了pg8000未处理从套接字接收的字节数少于请求的情况的bug。这被解释为网络错误,但实际上我们只需要等待更多的字节数可用。
- 当使用
PGInterval
类型时,如果服务器响应中包含周期millennium
,则无法识别。这是由于我们使用了millenium
而不是正确的拼写millennium
。 - 增加了发送PostgreSQL组合类型的功能。如果值以
tuple
发送,pg8000将将其作为以(
分隔的组合字符串发送到服务器。
版本1.29.4,2022-12-14
- 修复了
pg8000.dbapi
中的setinputsizes()
方法中的bug,其中如果size
是可识别的Python类型,则方法会失败。
版本1.29.3,2022-10-26
- 将SCRAM库升级到版本1.4.3。这增加了对客户端支持通道绑定但服务器不支持的情况的支持。
版本1.29.2,2022-10-09
- 修复了在文字数组中,如
\n
和\r
等项在发送到服务器之前未正确转义的bug。 - 修复了如果PostgreSQL服务器设置了半小时时区,则
timestamp with time zone
类型的值失败的bug。通过使用dateutil
包的parse
函数(如果datetime
解析器失败)来修复。
版本1.29.1,2022-05-23
- 在尝试确定是否发生了失败的提交时,检查
ROLLBACK TO SAVEPOINT
。
版本1.29.0,2022-05-21
- 针对静默失败的提交bug实现了一个绕过方法。
- 之前如果发送空字符串作为查询会引发异常,但现在不再这样做。
版本1.28.3,2022-05-18
- 恢复了一些意外删除的
__version__
属性。
版本1.28.2,2022-05-17
- 使用符合PEP517的构建系统。
版本1.28.1,2022-05-17
- 如果在执行
COPY FROM
时,stream
参数是一个str
的迭代器,则pg8000曾经会静默地在末尾添加一个换行符。现在不再发生这种情况。
版本1.28.0,2022-05-17
- 当使用
COPY FROM
SQL语句时,允许stream
参数是一个可迭代对象。
版本1.27.1,2022-05-16
PGInterval
的seconds
属性现在是始终为float
,以处理分数秒。- 更新了
iso_8601
和sql_standard
的interval
解析器,以考虑分数秒。
版本1.27.0,2022-05-16
- 以前,如果pg8000从服务器接收一个超出
datetime.timedelta
容量的interval
类型,则会引发异常。现在如果间隔太大,无法放入datetime.timedelta
,则返回一个PGInterval
。 - pg8000现在支持所有
interval
的输出格式(postgres
、postgres_verbose
、iso_8601
和sql_standard
)。
版本1.26.1,2022-04-23
- 确保在提交时通过GitHub Actions测试运行所有测试。
- 移除对Python 3.6的支持
- 移除对PostgreSQL 9.6的支持
版本1.26.0,2022-04-18
- 在连接时,引发
InterfaceError('network error')
而不是让底层的struct.error
浮出。 - 将许可文本与OSI使用的许可文本保持一致。之前,许可文本与BSD 3 Clause许可有轻微差异。这意味着自动化工具没有将其识别为开源。相信这些更改根本不会改变许可证的含义。
版本1.25.0,2022-04-17
- 修复了更多由于未关闭的套接字而引发
ResourceWarning
的情况。 - 我们现在有一个单个的
InterfaceError
,错误信息为 '网络错误',所有网络错误都使用这个错误,其底层异常保存在异常的cause
中。
版本 1.24.2,2022-04-15
- 为了防止
ResourceWarning
,如果无法建立连接,则关闭套接字。
版本 1.24.1,2022-03-02
- 将 pg +/-infinity 日期作为
str
返回。之前,当返回 +/-infinity pg 值时会导致错误,但现在我们以字符串形式返回 +/-infinity。
版本 1.24.0,2022-02-06
- 将 SQL 转义函数 identifier() 和 literal() 添加到本地 API 中。当查询无法参数化并且必须使用不受信任的值创建 SQL 字符串时使用。
版本 1.23.0,2021-11-13
- 如果查询没有参数,则将不再解析查询。尽管这样做有性能优势,但主要原因是为了避免查询重写,这可能会引入错误。
版本 1.22.1,2021-11-10
- 修复了 PGInterval 类型中的一个错误,在该错误中, millennia 值的
str()
调用失败。
版本 1.22.0,2021-10-13
- 与在 Postgres 协议的
Parse
步骤中指定 oids 相比,pg8000 现在省略了它们,因此 Postgres 将使用从查询中确定的 oids。这使得 pg8000 代码更简单,而且这也应该使类型匹配的细微差别更加直接。
项目详情
下载文件
下载您平台上的文件。如果您不确定选择哪个,请了解有关 安装包 的更多信息。
源代码分发
构建分发
pg8000-1.31.2.tar.gz的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1ea46cf09d8eca07fe7eaadefd7951e37bee7fabe675df164f1a572ffb300876 |
|
MD5 | d133eebc067dbe2318a861f1589cafee |
|
BLAKE2b-256 | 0fd70554640cbe3e193184796bedb6de23f797c03958425176faf0e694c06eb0 |
pg8000-1.31.2-py3-none-any.whl的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 436c771ede71af4d4c22ba867a30add0bc5c942d7ab27fadbb6934a487ecc8f6 |
|
MD5 | 6785e25d1cf8df0fb15a5451290c2561 |
|
BLAKE2b-256 | 09a02b30d52017c4ced8fc107386666ea7573954eb708bf66121f0229df05d41 |