跳转到主要内容

openai API的官方Python库

项目描述

OpenAI Python API库

PyPI version

OpenAI Python库提供了从任何Python 3.7+应用程序访问OpenAI REST API的便捷方式。该库包括所有请求参数和响应字段的类型定义,并提供了由httpx支持的同步和异步客户端。

它是通过我们的OpenAPI规范Stainless生成的。

文档

REST API文档可在platform.openai.com找到。该库的完整API可在api.md中找到。

安装

[!IMPORTANT] SDK在v1中重写,并于2023年11月6日发布。请参阅v1迁移指南,其中包含自动更新您代码的脚本。

# install from PyPI
pip install openai

用法

该库的完整API可在api.md中找到。

import os
from openai import OpenAI

client = OpenAI(
    # This is the default and can be omitted
    api_key=os.environ.get("OPENAI_API_KEY"),
)

chat_completion = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Say this is a test",
        }
    ],
    model="gpt-3.5-turbo",
)

虽然您可以提供一个api_key关键字参数,但我们建议使用python-dotenvOPENAI_API_KEY="My API Key"添加到您的.env文件中,以便您的API密钥不会存储在源代码控制中。

视觉

使用托管图像

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {"url": f"{img_url}"},
                },
            ],
        }
    ],
)

使用作为base64编码字符串的图像

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{img_type};base64,{img_b64_str}"},
                },
            ],
        }
    ],
)

轮询助手

与API交互时,一些操作(如启动运行和向矢量存储添加文件)是异步的,需要时间才能完成。SDK包含辅助函数,这些函数会轮询状态,直到达到最终状态,然后返回结果对象。如果API方法导致可能受益于轮询的操作,将有一个以'_and_poll'结尾的方法版本。

例如,要创建一个运行实例并轮询其达到最终状态,可以运行

run = client.beta.threads.runs.create_and_poll(
    thread_id=thread.id,
    assistant_id=assistant.id,
)

有关运行的生存周期的更多信息,请参阅运行生命周期文档

批量上传辅助工具

在创建和与矢量存储交互时,您可以使用轮询辅助工具来监视操作状态。为了方便起见,我们还提供批量上传辅助工具,允许您一次上传多个文件。

sample_files = [Path("sample-paper.pdf"), ...]

batch = await client.vector_stores.file_batches.upload_and_poll(
    store.id,
    files=sample_files,
)

流辅助工具

SDK还包括处理流和处理传入事件的辅助工具。

with client.beta.threads.runs.stream(
    thread_id=thread.id,
    assistant_id=assistant.id,
    instructions="Please address the user as Jane Doe. The user has a premium account.",
) as stream:
    for event in stream:
        # Print the text from text delta events
        if event.type == "thread.message.delta" and event.data.delta.content:
            print(event.data.delta.content[0].text)

有关流辅助工具的更多信息,请参阅专门的文档:helpers.md

异步使用

只需导入AsyncOpenAI而不是OpenAI,然后在每个API调用中使用await

import os
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    # This is the default and can be omitted
    api_key=os.environ.get("OPENAI_API_KEY"),
)


async def main() -> None:
    chat_completion = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": "Say this is a test",
            }
        ],
        model="gpt-3.5-turbo",
    )


asyncio.run(main())

同步和异步客户端之间的功能是相同的。

流响应

我们支持使用服务器端事件(SSE)进行流式响应。

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Say this is a test"}],
    stream=True,
)
for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="")

异步客户端使用完全相同的接口。

from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main():
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Say this is a test"}],
        stream=True,
    )
    async for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="")


asyncio.run(main())

模块级客户端

[!重要]我们强烈建议实例化客户端实例,而不是依赖于全局客户端。

我们还公开了一个全局客户端实例,其访问方式类似于v1之前的版本。

import openai

# optional; defaults to `os.environ['OPENAI_API_KEY']`
openai.api_key = '...'

# all client options can be configured just like the `OpenAI` instantiation counterpart
openai.base_url = "https://..."
openai.default_headers = {"x-foo": "true"}

completion = openai.chat.completions.create(
    model="gpt-4",
    messages=[
        {
            "role": "user",
            "content": "How do I output all files in a directory using Python?",
        },
    ],
)
print(completion.choices[0].message.content)

API与标准基于实例的客户端API完全相同。

此功能旨在用于REPL或笔记本中,以实现更快的迭代,而不是在应用程序代码中使用。

我们建议您始终在应用程序代码中实例化客户端(例如,使用client = OpenAI()),因为

  • 很难推断客户端选项的配置位置
  • 更改某些客户端选项可能会导致竞态条件
  • 更难用于测试目的的模拟
  • 无法控制网络连接的清理

使用类型

嵌套请求参数是TypedDicts。响应是Pydantic models,还提供了诸如

  • 将序列化回JSON,model.to_json()
  • 转换为字典,model.to_dict()

类型请求和响应在您的编辑器中提供自动完成和文档。如果要在VS Code中设置类型错误以帮助早期捕获错误,请将python.analysis.typeCheckingMode设置为basic

分页

OpenAI API中的列表方法是分页的。

此库在每个列表响应中提供自动分页迭代器,因此您无需手动请求后续页面

from openai import OpenAI

client = OpenAI()

all_jobs = []
# Automatically fetches more pages as needed.
for job in client.fine_tuning.jobs.list(
    limit=20,
):
    # Do something with job here
    all_jobs.append(job)
print(all_jobs)

或者,异步地

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()


async def main() -> None:
    all_jobs = []
    # Iterate through items across all pages, issuing requests as needed.
    async for job in client.fine_tuning.jobs.list(
        limit=20,
    ):
        all_jobs.append(job)
    print(all_jobs)


asyncio.run(main())

或者,您可以使用.has_next_page().next_page_info().get_next_page()方法对页面进行更细粒度的控制

first_page = await client.fine_tuning.jobs.list(
    limit=20,
)
if first_page.has_next_page():
    print(f"will fetch next page using these details: {first_page.next_page_info()}")
    next_page = await first_page.get_next_page()
    print(f"number of items we just fetched: {len(next_page.data)}")

# Remove `await` for non-async usage.

或者直接处理返回的数据

first_page = await client.fine_tuning.jobs.list(
    limit=20,
)

print(f"next page cursor: {first_page.after}")  # => "next page cursor: ..."
for job in first_page.data:
    print(job.id)

# Remove `await` for non-async usage.

嵌套参数

嵌套参数是使用TypedDict类型化的字典,例如

from openai import OpenAI

client = OpenAI()

completion = client.chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "Can you generate an example json object describing a fruit?",
        }
    ],
    model="gpt-3.5-turbo-1106",
    response_format={"type": "json_object"},
)

文件上传

与文件上传对应的请求参数可以传递为bytes、一个PathLike实例或一个(filename, contents, media type)元组。

from pathlib import Path
from openai import OpenAI

client = OpenAI()

client.files.create(
    file=Path("input.jsonl"),
    purpose="fine-tune",
)

异步客户端使用完全相同的接口。如果您传递一个PathLike实例,文件内容将自动异步读取。

处理错误

当库无法连接到API(例如,由于网络连接问题或超时)时,将引发openai.APIConnectionError的子类。

当API返回非成功状态码(即4xx或5xx响应)时,会抛出openai.APIStatusError的子类,包含status_coderesponse属性。

所有错误都继承自openai.APIError

import openai
from openai import OpenAI

client = OpenAI()

try:
    client.fine_tuning.jobs.create(
        model="gpt-3.5-turbo",
        training_file="file-abc123",
    )
except openai.APIConnectionError as e:
    print("The server could not be reached")
    print(e.__cause__)  # an underlying Exception, likely raised within httpx.
except openai.RateLimitError as e:
    print("A 429 status code was received; we should back off a bit.")
except openai.APIStatusError as e:
    print("Another non-200-range status code was received")
    print(e.status_code)
    print(e.response)

错误代码如下

状态码 错误类型
400 BadRequestError
401 AuthenticationError
403 PermissionDeniedError
404 NotFoundError
422 UnprocessableEntityError
429 RateLimitError
>=500 InternalServerError
不适用 APIConnectionError

请求ID

有关调试请求的更多信息,请参阅这些文档

SDK中的所有对象响应都提供了一个_request_id属性,该属性是从响应头中的x-request-id添加的,以便您快速记录失败的请求并将其报告给OpenAI。

completion = await client.chat.completions.create(
    messages=[{"role": "user", "content": "Say this is a test"}], model="gpt-4"
)
print(completion._request_id)  # req_123

请注意,与使用_前缀的其他属性不同,_request_id属性是公开的。除非文档另有说明,否则所有其他使用_前缀的属性、方法和模块都是私有的。

重试

某些错误默认自动重试2次,使用短指数退避。默认情况下,连接错误(例如,由于网络连接问题)、408请求超时、409冲突、429速率限制和>=500内部错误都会进行重试。

您可以使用max_retries选项配置或禁用重试设置

from openai import OpenAI

# Configure the default for all requests:
client = OpenAI(
    # default is 2
    max_retries=0,
)

# Or, configure per-request:
client.with_options(max_retries=5).chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "How can I get the name of the current day in Node.js?",
        }
    ],
    model="gpt-3.5-turbo",
)

超时

默认情况下,请求在10分钟后超时。您可以使用timeout选项配置此设置,该选项接受一个浮点数或一个httpx.Timeout对象

from openai import OpenAI

# Configure the default for all requests:
client = OpenAI(
    # 20 seconds (default is 10 minutes)
    timeout=20.0,
)

# More granular control:
client = OpenAI(
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)

# Override per-request:
client.with_options(timeout=5.0).chat.completions.create(
    messages=[
        {
            "role": "user",
            "content": "How can I list all files in a directory using Python?",
        }
    ],
    model="gpt-3.5-turbo",
)

超时时会抛出APITimeoutError

请注意,默认情况下,超时的请求会自动重试两次

高级

日志记录

我们使用标准库logging模块。

您可以通过将环境变量OPENAI_LOG设置为debug来启用日志记录。

$ export OPENAI_LOG=debug

如何判断None代表null还是缺失

在API响应中,一个字段可能被显式设置为null,或者完全缺失;在两种情况下,该字段在这个库中的值都是None。您可以使用.model_fields_set区分这两种情况。

if response.my_field is None:
  if 'my_field' not in response.model_fields_set:
    print('Got json like {}, without a "my_field" key present at all.')
  else:
    print('Got json like {"my_field": null}.')

访问原始响应数据(例如,头部信息)

可以通过在任何HTTP方法调用前加上.with_raw_response.来访问“原始”响应对象,例如

from openai import OpenAI

client = OpenAI()
response = client.chat.completions.with_raw_response.create(
    messages=[{
        "role": "user",
        "content": "Say this is a test",
    }],
    model="gpt-3.5-turbo",
)
print(response.headers.get('X-My-Header'))

completion = response.parse()  # get the object that `chat.completions.create()` would have returned
print(completion)

这些方法返回一个LegacyAPIResponse对象。这是一个旧类,因为我们将在下一个主要版本中对其进行轻微修改。

对于同步客户端,这基本上与以下内容相同,除了contenttext将是方法而不是属性。在异步客户端中,所有方法都将异步执行。

将提供迁移脚本,并且迁移应该总体上很顺利。

.with_streaming_response

上述接口在您发出请求时会积极读取完整的响应体,这可能并不总是您想要的。

要流式传输响应体,请使用.with_streaming_response,它需要一个上下文管理器,并且只在您调用.read().text().json().iter_bytes().iter_text().iter_lines().parse()时读取响应体。在异步客户端中,这些是异步方法。

因此,.with_streaming_response方法返回不同的APIResponse对象,而异步客户端返回一个AsyncAPIResponse对象。

with client.chat.completions.with_streaming_response.create(
    messages=[
        {
            "role": "user",
            "content": "Say this is a test",
        }
    ],
    model="gpt-3.5-turbo",
) as response:
    print(response.headers.get("X-My-Header"))

    for line in response.iter_lines():
        print(line)

需要上下文管理器,以确保可靠地关闭响应。

制作自定义/未记录的请求

此库针对方便访问已记录的API进行了类型化。

如果您需要访问未记录的端点、参数或响应属性,该库仍然可以使用。

未记录的端点

要向未记录的端点发送请求,您可以使用 client.getclient.post 和其他 HTTP 动词进行请求。在执行此请求时,客户端选项(如重试)将被尊重。

import httpx

response = client.post(
    "/foo",
    cast_to=httpx.Response,
    body={"my_param": True},
)

print(response.headers.get("x-foo"))

未记录的请求参数

如果您想显式发送额外的参数,可以使用 extra_queryextra_bodyextra_headers 请求选项来实现。

未记录的响应属性

要访问未记录的响应属性,您可以访问像 response.unknown_prop 这样的额外字段。您还可以将 Pydantic 模型上的所有额外字段作为字典使用 response.model_extra 获取。

配置 HTTP 客户端

您可以直接覆盖 httpx 客户端 以定制它以适应您的用例,包括

  • 代理支持
  • 自定义传输
  • 额外的 高级 功能
from openai import OpenAI, DefaultHttpxClient

client = OpenAI(
    # Or use the `OPENAI_BASE_URL` env var
    base_url="http://my.test.server.example.com:8083/v1",
    http_client=DefaultHttpxClient(
        proxies="http://my.test.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)

您还可以通过使用 with_options() 在每次请求的基础上定制客户端。

client.with_options(http_client=DefaultHttpxClient(...))

管理 HTTP 资源

默认情况下,当客户端被 垃圾回收 时,库会关闭底层 HTTP 连接。如果需要,您可以使用 .close() 方法手动关闭客户端,或者使用在退出时关闭的上下文管理器。

Microsoft Azure OpenAI

要使用此库与 Azure OpenAI,请使用 AzureOpenAI 类而不是 OpenAI 类。

[!IMPORTANT] Azure API 的形状与核心 API 的形状不同,这意味着响应/参数的静态类型不一定总是正确的。

from openai import AzureOpenAI

# gets the API Key from environment variable AZURE_OPENAI_API_KEY
client = AzureOpenAI(
    # https://learn.microsoft.com/azure/ai-services/openai/reference#rest-api-versioning
    api_version="2023-07-01-preview",
    # https://learn.microsoft.com/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
    azure_endpoint="https://example-endpoint.openai.azure.com",
)

completion = client.chat.completions.create(
    model="deployment-name",  # e.g. gpt-35-instant
    messages=[
        {
            "role": "user",
            "content": "How do I output all files in a directory using Python?",
        },
    ],
)
print(completion.to_json())

除了基本 OpenAI 客户端提供的选项之外,还提供了以下选项

  • azure_endpoint(或 AZURE_OPENAI_ENDPOINT 环境变量)
  • azure_deployment
  • api_version(或 OPENAI_API_VERSION 环境变量)
  • azure_ad_token(或 AZURE_OPENAI_AD_TOKEN 环境变量)
  • azure_ad_token_provider

有关使用 Microsoft Entra ID(以前称为 Azure Active Directory)使用客户端的示例,请参阅 此处

版本管理

此软件包通常遵循 SemVer 规范,尽管某些不兼容的更改可能作为次要版本发布。

  1. 仅影响静态类型而不会破坏运行时行为的更改。
  2. 对库内部进行更改,技术上公开但未打算或文档化供外部使用。(如果您依赖此类内部,请打开 GitHub 上的问题)
  3. 我们预计不会影响大多数用户的实际实践的更改。

我们非常重视向后兼容性,并努力确保您可以依赖平滑的升级体验。

我们渴望您的反馈;请就问题、错误或建议打开 问题

确定安装的版本

如果您已升级到最新版本但未看到您期望的新功能,那么您的 Python 环境可能仍在使用较旧版本。

您可以使用以下方法确定运行时使用的版本

import openai
print(openai.__version__)

需求

Python 3.7 或更高版本。

发布历史 发布通知 | RSS订阅

下载文件

下载适合您平台文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

openai-1.51.0.tar.gz (306.9 kB 查看哈希值)

上传 源代码

构建分发

openai-1.51.0-py3-none-any.whl (383.5 kB 查看哈希值)

上传 Python 3

由以下支持