openai API的官方Python库
项目描述
OpenAI Python API库
OpenAI Python库提供了从任何Python 3.7+应用程序访问OpenAI REST API的便捷方式。该库包括所有请求参数和响应字段的类型定义,并提供了由httpx支持的同步和异步客户端。
它是通过我们的OpenAPI规范和Stainless生成的。
文档
REST API文档可在platform.openai.com找到。该库的完整API可在api.md中找到。
安装
[!IMPORTANT] SDK在v1中重写,并于2023年11月6日发布。请参阅v1迁移指南,其中包含自动更新您代码的脚本。
# install from PyPI
pip install openai
用法
该库的完整API可在api.md中找到。
import os
from openai import OpenAI
client = OpenAI(
# This is the default and can be omitted
api_key=os.environ.get("OPENAI_API_KEY"),
)
chat_completion = client.chat.completions.create(
messages=[
{
"role": "user",
"content": "Say this is a test",
}
],
model="gpt-3.5-turbo",
)
虽然您可以提供一个api_key
关键字参数,但我们建议使用python-dotenv将OPENAI_API_KEY="My API Key"
添加到您的.env
文件中,以便您的API密钥不会存储在源代码控制中。
视觉
使用托管图像
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": f"{img_url}"},
},
],
}
],
)
使用作为base64编码字符串的图像
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": f"data:{img_type};base64,{img_b64_str}"},
},
],
}
],
)
轮询助手
与API交互时,一些操作(如启动运行和向矢量存储添加文件)是异步的,需要时间才能完成。SDK包含辅助函数,这些函数会轮询状态,直到达到最终状态,然后返回结果对象。如果API方法导致可能受益于轮询的操作,将有一个以'_and_poll'结尾的方法版本。
例如,要创建一个运行实例并轮询其达到最终状态,可以运行
run = client.beta.threads.runs.create_and_poll(
thread_id=thread.id,
assistant_id=assistant.id,
)
有关运行的生存周期的更多信息,请参阅运行生命周期文档
批量上传辅助工具
在创建和与矢量存储交互时,您可以使用轮询辅助工具来监视操作状态。为了方便起见,我们还提供批量上传辅助工具,允许您一次上传多个文件。
sample_files = [Path("sample-paper.pdf"), ...]
batch = await client.vector_stores.file_batches.upload_and_poll(
store.id,
files=sample_files,
)
流辅助工具
SDK还包括处理流和处理传入事件的辅助工具。
with client.beta.threads.runs.stream(
thread_id=thread.id,
assistant_id=assistant.id,
instructions="Please address the user as Jane Doe. The user has a premium account.",
) as stream:
for event in stream:
# Print the text from text delta events
if event.type == "thread.message.delta" and event.data.delta.content:
print(event.data.delta.content[0].text)
有关流辅助工具的更多信息,请参阅专门的文档:helpers.md
异步使用
只需导入AsyncOpenAI
而不是OpenAI
,然后在每个API调用中使用await
import os
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
# This is the default and can be omitted
api_key=os.environ.get("OPENAI_API_KEY"),
)
async def main() -> None:
chat_completion = await client.chat.completions.create(
messages=[
{
"role": "user",
"content": "Say this is a test",
}
],
model="gpt-3.5-turbo",
)
asyncio.run(main())
同步和异步客户端之间的功能是相同的。
流响应
我们支持使用服务器端事件(SSE)进行流式响应。
from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Say this is a test"}],
stream=True,
)
for chunk in stream:
print(chunk.choices[0].delta.content or "", end="")
异步客户端使用完全相同的接口。
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def main():
stream = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Say this is a test"}],
stream=True,
)
async for chunk in stream:
print(chunk.choices[0].delta.content or "", end="")
asyncio.run(main())
模块级客户端
[!重要]我们强烈建议实例化客户端实例,而不是依赖于全局客户端。
我们还公开了一个全局客户端实例,其访问方式类似于v1之前的版本。
import openai
# optional; defaults to `os.environ['OPENAI_API_KEY']`
openai.api_key = '...'
# all client options can be configured just like the `OpenAI` instantiation counterpart
openai.base_url = "https://..."
openai.default_headers = {"x-foo": "true"}
completion = openai.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "user",
"content": "How do I output all files in a directory using Python?",
},
],
)
print(completion.choices[0].message.content)
API与标准基于实例的客户端API完全相同。
此功能旨在用于REPL或笔记本中,以实现更快的迭代,而不是在应用程序代码中使用。
我们建议您始终在应用程序代码中实例化客户端(例如,使用client = OpenAI()
),因为
- 很难推断客户端选项的配置位置
- 更改某些客户端选项可能会导致竞态条件
- 更难用于测试目的的模拟
- 无法控制网络连接的清理
使用类型
嵌套请求参数是TypedDicts。响应是Pydantic models,还提供了诸如
- 将序列化回JSON,
model.to_json()
- 转换为字典,
model.to_dict()
类型请求和响应在您的编辑器中提供自动完成和文档。如果要在VS Code中设置类型错误以帮助早期捕获错误,请将python.analysis.typeCheckingMode
设置为basic
。
分页
OpenAI API中的列表方法是分页的。
此库在每个列表响应中提供自动分页迭代器,因此您无需手动请求后续页面
from openai import OpenAI
client = OpenAI()
all_jobs = []
# Automatically fetches more pages as needed.
for job in client.fine_tuning.jobs.list(
limit=20,
):
# Do something with job here
all_jobs.append(job)
print(all_jobs)
或者,异步地
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def main() -> None:
all_jobs = []
# Iterate through items across all pages, issuing requests as needed.
async for job in client.fine_tuning.jobs.list(
limit=20,
):
all_jobs.append(job)
print(all_jobs)
asyncio.run(main())
或者,您可以使用.has_next_page()
、.next_page_info()
或.get_next_page()
方法对页面进行更细粒度的控制
first_page = await client.fine_tuning.jobs.list(
limit=20,
)
if first_page.has_next_page():
print(f"will fetch next page using these details: {first_page.next_page_info()}")
next_page = await first_page.get_next_page()
print(f"number of items we just fetched: {len(next_page.data)}")
# Remove `await` for non-async usage.
或者直接处理返回的数据
first_page = await client.fine_tuning.jobs.list(
limit=20,
)
print(f"next page cursor: {first_page.after}") # => "next page cursor: ..."
for job in first_page.data:
print(job.id)
# Remove `await` for non-async usage.
嵌套参数
嵌套参数是使用TypedDict
类型化的字典,例如
from openai import OpenAI
client = OpenAI()
completion = client.chat.completions.create(
messages=[
{
"role": "user",
"content": "Can you generate an example json object describing a fruit?",
}
],
model="gpt-3.5-turbo-1106",
response_format={"type": "json_object"},
)
文件上传
与文件上传对应的请求参数可以传递为bytes
、一个PathLike
实例或一个(filename, contents, media type)
元组。
from pathlib import Path
from openai import OpenAI
client = OpenAI()
client.files.create(
file=Path("input.jsonl"),
purpose="fine-tune",
)
异步客户端使用完全相同的接口。如果您传递一个PathLike
实例,文件内容将自动异步读取。
处理错误
当库无法连接到API(例如,由于网络连接问题或超时)时,将引发openai.APIConnectionError
的子类。
当API返回非成功状态码(即4xx或5xx响应)时,会抛出openai.APIStatusError
的子类,包含status_code
和response
属性。
所有错误都继承自openai.APIError
。
import openai
from openai import OpenAI
client = OpenAI()
try:
client.fine_tuning.jobs.create(
model="gpt-3.5-turbo",
training_file="file-abc123",
)
except openai.APIConnectionError as e:
print("The server could not be reached")
print(e.__cause__) # an underlying Exception, likely raised within httpx.
except openai.RateLimitError as e:
print("A 429 status code was received; we should back off a bit.")
except openai.APIStatusError as e:
print("Another non-200-range status code was received")
print(e.status_code)
print(e.response)
错误代码如下
状态码 | 错误类型 |
---|---|
400 | BadRequestError |
401 | AuthenticationError |
403 | PermissionDeniedError |
404 | NotFoundError |
422 | UnprocessableEntityError |
429 | RateLimitError |
>=500 | InternalServerError |
不适用 | APIConnectionError |
请求ID
有关调试请求的更多信息,请参阅这些文档
SDK中的所有对象响应都提供了一个_request_id
属性,该属性是从响应头中的x-request-id
添加的,以便您快速记录失败的请求并将其报告给OpenAI。
completion = await client.chat.completions.create(
messages=[{"role": "user", "content": "Say this is a test"}], model="gpt-4"
)
print(completion._request_id) # req_123
请注意,与使用_
前缀的其他属性不同,_request_id
属性是公开的。除非文档另有说明,否则所有其他使用_
前缀的属性、方法和模块都是私有的。
重试
某些错误默认自动重试2次,使用短指数退避。默认情况下,连接错误(例如,由于网络连接问题)、408请求超时、409冲突、429速率限制和>=500内部错误都会进行重试。
您可以使用max_retries
选项配置或禁用重试设置
from openai import OpenAI
# Configure the default for all requests:
client = OpenAI(
# default is 2
max_retries=0,
)
# Or, configure per-request:
client.with_options(max_retries=5).chat.completions.create(
messages=[
{
"role": "user",
"content": "How can I get the name of the current day in Node.js?",
}
],
model="gpt-3.5-turbo",
)
超时
默认情况下,请求在10分钟后超时。您可以使用timeout
选项配置此设置,该选项接受一个浮点数或一个httpx.Timeout
对象
from openai import OpenAI
# Configure the default for all requests:
client = OpenAI(
# 20 seconds (default is 10 minutes)
timeout=20.0,
)
# More granular control:
client = OpenAI(
timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)
# Override per-request:
client.with_options(timeout=5.0).chat.completions.create(
messages=[
{
"role": "user",
"content": "How can I list all files in a directory using Python?",
}
],
model="gpt-3.5-turbo",
)
超时时会抛出APITimeoutError
。
请注意,默认情况下,超时的请求会自动重试两次。
高级
日志记录
我们使用标准库logging
模块。
您可以通过将环境变量OPENAI_LOG
设置为debug
来启用日志记录。
$ export OPENAI_LOG=debug
如何判断None
代表null
还是缺失
在API响应中,一个字段可能被显式设置为null
,或者完全缺失;在两种情况下,该字段在这个库中的值都是None
。您可以使用.model_fields_set
区分这两种情况。
if response.my_field is None:
if 'my_field' not in response.model_fields_set:
print('Got json like {}, without a "my_field" key present at all.')
else:
print('Got json like {"my_field": null}.')
访问原始响应数据(例如,头部信息)
可以通过在任何HTTP方法调用前加上.with_raw_response.
来访问“原始”响应对象,例如
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.with_raw_response.create(
messages=[{
"role": "user",
"content": "Say this is a test",
}],
model="gpt-3.5-turbo",
)
print(response.headers.get('X-My-Header'))
completion = response.parse() # get the object that `chat.completions.create()` would have returned
print(completion)
这些方法返回一个LegacyAPIResponse
对象。这是一个旧类,因为我们将在下一个主要版本中对其进行轻微修改。
对于同步客户端,这基本上与以下内容相同,除了content
和text
将是方法而不是属性。在异步客户端中,所有方法都将异步执行。
将提供迁移脚本,并且迁移应该总体上很顺利。
.with_streaming_response
上述接口在您发出请求时会积极读取完整的响应体,这可能并不总是您想要的。
要流式传输响应体,请使用.with_streaming_response
,它需要一个上下文管理器,并且只在您调用.read()
、.text()
、.json()
、.iter_bytes()
、.iter_text()
、.iter_lines()
或.parse()
时读取响应体。在异步客户端中,这些是异步方法。
因此,.with_streaming_response
方法返回不同的APIResponse
对象,而异步客户端返回一个AsyncAPIResponse
对象。
with client.chat.completions.with_streaming_response.create(
messages=[
{
"role": "user",
"content": "Say this is a test",
}
],
model="gpt-3.5-turbo",
) as response:
print(response.headers.get("X-My-Header"))
for line in response.iter_lines():
print(line)
需要上下文管理器,以确保可靠地关闭响应。
制作自定义/未记录的请求
此库针对方便访问已记录的API进行了类型化。
如果您需要访问未记录的端点、参数或响应属性,该库仍然可以使用。
未记录的端点
要向未记录的端点发送请求,您可以使用 client.get
、client.post
和其他 HTTP 动词进行请求。在执行此请求时,客户端选项(如重试)将被尊重。
import httpx
response = client.post(
"/foo",
cast_to=httpx.Response,
body={"my_param": True},
)
print(response.headers.get("x-foo"))
未记录的请求参数
如果您想显式发送额外的参数,可以使用 extra_query
、extra_body
和 extra_headers
请求选项来实现。
未记录的响应属性
要访问未记录的响应属性,您可以访问像 response.unknown_prop
这样的额外字段。您还可以将 Pydantic 模型上的所有额外字段作为字典使用 response.model_extra
获取。
配置 HTTP 客户端
您可以直接覆盖 httpx 客户端 以定制它以适应您的用例,包括
- 代理支持
- 自定义传输
- 额外的 高级 功能
from openai import OpenAI, DefaultHttpxClient
client = OpenAI(
# Or use the `OPENAI_BASE_URL` env var
base_url="http://my.test.server.example.com:8083/v1",
http_client=DefaultHttpxClient(
proxies="http://my.test.proxy.example.com",
transport=httpx.HTTPTransport(local_address="0.0.0.0"),
),
)
您还可以通过使用 with_options()
在每次请求的基础上定制客户端。
client.with_options(http_client=DefaultHttpxClient(...))
管理 HTTP 资源
默认情况下,当客户端被 垃圾回收 时,库会关闭底层 HTTP 连接。如果需要,您可以使用 .close()
方法手动关闭客户端,或者使用在退出时关闭的上下文管理器。
Microsoft Azure OpenAI
要使用此库与 Azure OpenAI,请使用 AzureOpenAI
类而不是 OpenAI
类。
[!IMPORTANT] Azure API 的形状与核心 API 的形状不同,这意味着响应/参数的静态类型不一定总是正确的。
from openai import AzureOpenAI
# gets the API Key from environment variable AZURE_OPENAI_API_KEY
client = AzureOpenAI(
# https://learn.microsoft.com/azure/ai-services/openai/reference#rest-api-versioning
api_version="2023-07-01-preview",
# https://learn.microsoft.com/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
azure_endpoint="https://example-endpoint.openai.azure.com",
)
completion = client.chat.completions.create(
model="deployment-name", # e.g. gpt-35-instant
messages=[
{
"role": "user",
"content": "How do I output all files in a directory using Python?",
},
],
)
print(completion.to_json())
除了基本 OpenAI
客户端提供的选项之外,还提供了以下选项
azure_endpoint
(或AZURE_OPENAI_ENDPOINT
环境变量)azure_deployment
api_version
(或OPENAI_API_VERSION
环境变量)azure_ad_token
(或AZURE_OPENAI_AD_TOKEN
环境变量)azure_ad_token_provider
有关使用 Microsoft Entra ID(以前称为 Azure Active Directory)使用客户端的示例,请参阅 此处。
版本管理
此软件包通常遵循 SemVer 规范,尽管某些不兼容的更改可能作为次要版本发布。
- 仅影响静态类型而不会破坏运行时行为的更改。
- 对库内部进行更改,技术上公开但未打算或文档化供外部使用。(如果您依赖此类内部,请打开 GitHub 上的问题)。
- 我们预计不会影响大多数用户的实际实践的更改。
我们非常重视向后兼容性,并努力确保您可以依赖平滑的升级体验。
我们渴望您的反馈;请就问题、错误或建议打开 问题。
确定安装的版本
如果您已升级到最新版本但未看到您期望的新功能,那么您的 Python 环境可能仍在使用较旧版本。
您可以使用以下方法确定运行时使用的版本
import openai
print(openai.__version__)
需求
Python 3.7 或更高版本。
项目细节
下载文件
下载适合您平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。