跳转到主要内容

使用HTTPX消费服务器发送事件(SSE)消息。

项目描述

httpx-sse

Build Status Coverage Package version

使用服务器发送事件(SSE)HTTPX

目录

安装

注意: 这是测试版软件。请确保固定您的依赖项。

pip install httpx-sse=="0.4.*"

快速入门

httpx-sse 提供了 connect_sseaconnect_sse 辅助函数,用于连接到SSE端点。结果 EventSource 对象公开了 .iter_sse().aiter_sse() 方法,用于迭代服务器发送的事件。

示例用法

import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
    with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)

您可以尝试针对此示例Starlette服务器(来源

# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)

如何使用

调用Python Web应用

您可以使用HTTPX和httpx-sse调用Python Web应用以测试SSE端点。

以下是一个调用Starlette ASGI应用的示例...

import asyncio

import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())

app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])

async def main():
    async with httpx.AsyncClient(app=app) as client:
        async with aconnect_sse(
            client, "GET", "http://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}

asyncio.run(main())

处理重连

(高级)

SSETransportAsyncSSETransport 没有内置重连功能。这是因为重试的方式通常取决于您的用例。因此,如果在尝试从服务器读取时连接中断,您将从 iter_sse()(或 aiter_sse())获得一个 httpx.ReadError

然而,httpx-sse 允许通过使用 Last-Event-ID 和重新连接时间(以毫秒为单位)来实现重新连接,分别通过 sse.idsse.retry 暴露。

以下是使用 stamina 实现此功能的示例...

import time
from typing import Iterator

import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry

def iter_sse_retrying(client, method, url):
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay

        time.sleep(reconnection_delay)

        headers = {"Accept": "text/event-stream"}

        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id

                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000

                yield sse

    return _iter_sse()

用法

with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
        print(sse.event, sse.data)

API参考

connect_sse

def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]

连接到 SSE 端点并返回一个 EventSource 上下文管理器。

这将在请求上设置 Cache-Control: no-store,根据 SSE 规范,以及 Accept: text/event-stream

如果响应的 Content-Type 不是 text/event-stream,则会引发一个 SSEError

aconnect_sse

async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]

connect_sse 的异步等效函数。

EventSource

def __init__(response: httpx.Response)

用于处理 SSE 响助的辅助函数。

response

底层的 httpx.Response

iter_sse

def iter_sse() -> Iterator[ServerSentEvent]

解码响应内容并生成相应的 ServerSentEvent

示例用法

for sse in event_source.iter_sse():
    ...

aiter_sse

async def iter_sse() -> AsyncIterator[ServerSentEvent]

iter_sse 的异步等效函数。

ServerSentEvent

表示服务器发送的事件。

  • event: str - 默认为 "message"
  • data: str - 默认为 ""
  • id: str - 默认为 ""
  • retry: str | None - 默认为 None

方法

  • json() -> Any - 将 sse.data 解码为 JSON。

SSEError

在请求 SSE 端点时发生的错误。

父类

  • httpx.TransportError

许可证

MIT

变更日志

此项目的所有重大更改都将记录在此文件中。

格式基于 Keep a Changelog

0.4.0 - 2023-12-22

移除

  • 删除了对 Python 3.7 的支持,因为它已经达到生命周期的结束。(拉取请求 #21)

添加

  • 添加了对 Python 3.12 的官方支持。(拉取请求 #21)

修复

  • 允许包含但不限于 text/event-streamContent-Type。(拉取请求 #22 由 @dbuades 提出)
  • 改进了缺少 Content-Type 时的错误消息。(拉取请求 #20 由 @jamesbraza 提出)

0.3.1 - 2023-06-01

添加

  • ServerSentEvent 模型添加了 __repr__(),这可能有助于调试和其他任务。(拉取请求 #16)

0.3.0 - 2023-04-27

更改

  • 现在在执行 iter_sse() / aiter_sse() 时执行将响应内容类型不是 text/event-streamSSEError 抛出,而不是在 connect_sse() / aconnect_sse() 中执行。这允许在迭代服务器发送事件之前检查响应,例如检查错误响应。(拉取请求 #12)

0.2.0 - 2023-03-27

更改

  • connect_sse()aconnect_sse() 现在需要 method 参数: connect_sse(client, "GET", "https://example.org")。这为除了 GET 之外的其他 HTTP 动词的 SSE 请求提供了支持。(拉取请求 #7)

0.1.0 - 2023-02-05

初始发布

添加

  • 添加 connect_sseaconnect_sse()ServerSentEventSSEError

项目详情


下载文件

下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。

源代码分发

httpx-sse-0.4.0.tar.gz (12.6 kB 查看哈希值

源代码

构建分发

httpx_sse-0.4.0-py3-none-any.whl (7.8 kB 查看哈希值)

上传时间 Python 3

由以下提供支持