使用HTTPX消费服务器发送事件(SSE)消息。
项目描述
httpx-sse
目录
安装
注意: 这是测试版软件。请确保固定您的依赖项。
pip install httpx-sse=="0.4.*"
快速入门
httpx-sse
提供了 connect_sse
和 aconnect_sse
辅助函数,用于连接到SSE端点。结果 EventSource
对象公开了 .iter_sse()
和 .aiter_sse()
方法,用于迭代服务器发送的事件。
示例用法
import httpx
from httpx_sse import connect_sse
with httpx.Client() as client:
with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
for sse in event_source.iter_sse():
print(sse.event, sse.data, sse.id, sse.retry)
您可以尝试针对此示例Starlette服务器(来源)
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse
async def numbers(minimum, maximum):
for i in range(minimum, maximum + 1):
await asyncio.sleep(0.9)
yield {"data": i}
async def sse(request):
generator = numbers(1, 5)
return EventSourceResponse(generator)
routes = [
Route("/sse", endpoint=sse)
]
app = Starlette(routes=routes)
if __name__ == "__main__":
uvicorn.run(app)
如何使用
调用Python Web应用
您可以使用HTTPX和httpx-sse
调用Python Web应用以测试SSE端点。
以下是一个调用Starlette ASGI应用的示例...
import asyncio
import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
async def auth_events(request):
async def events():
yield {
"event": "login",
"data": '{"user_id": "4135"}',
}
return EventSourceResponse(events())
app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])
async def main():
async with httpx.AsyncClient(app=app) as client:
async with aconnect_sse(
client, "GET", "http://localhost:8000/sse/auth/"
) as event_source:
events = [sse async for sse in event_source.aiter_sse()]
(sse,) = events
assert sse.event == "login"
assert sse.json() == {"user_id": "4135"}
asyncio.run(main())
处理重连
(高级)
SSETransport
和 AsyncSSETransport
没有内置重连功能。这是因为重试的方式通常取决于您的用例。因此,如果在尝试从服务器读取时连接中断,您将从 iter_sse()
(或 aiter_sse()
)获得一个 httpx.ReadError
。
然而,httpx-sse
允许通过使用 Last-Event-ID
和重新连接时间(以毫秒为单位)来实现重新连接,分别通过 sse.id
和 sse.retry
暴露。
以下是使用 stamina
实现此功能的示例...
import time
from typing import Iterator
import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry
def iter_sse_retrying(client, method, url):
last_event_id = ""
reconnection_delay = 0.0
# `stamina` will apply jitter and exponential backoff on top of
# the `retry` reconnection delay sent by the server.
@retry(on=httpx.ReadError)
def _iter_sse():
nonlocal last_event_id, reconnection_delay
time.sleep(reconnection_delay)
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
with connect_sse(client, method, url, headers=headers) as event_source:
for sse in event_source.iter_sse():
last_event_id = sse.id
if sse.retry is not None:
reconnection_delay = sse.retry / 1000
yield sse
return _iter_sse()
用法
with httpx.Client() as client:
for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
print(sse.event, sse.data)
API参考
connect_sse
def connect_sse(
client: httpx.Client,
method: str,
url: Union[str, httpx.URL],
**kwargs,
) -> ContextManager[EventSource]
连接到 SSE 端点并返回一个 EventSource
上下文管理器。
这将在请求上设置 Cache-Control: no-store
,根据 SSE 规范,以及 Accept: text/event-stream
。
如果响应的 Content-Type
不是 text/event-stream
,则会引发一个 SSEError
。
aconnect_sse
async def aconnect_sse(
client: httpx.AsyncClient,
method: str,
url: Union[str, httpx.URL],
**kwargs,
) -> AsyncContextManager[EventSource]
connect_sse
的异步等效函数。
EventSource
def __init__(response: httpx.Response)
用于处理 SSE 响助的辅助函数。
response
底层的 httpx.Response
。
iter_sse
def iter_sse() -> Iterator[ServerSentEvent]
解码响应内容并生成相应的 ServerSentEvent
。
示例用法
for sse in event_source.iter_sse():
...
aiter_sse
async def iter_sse() -> AsyncIterator[ServerSentEvent]
iter_sse
的异步等效函数。
ServerSentEvent
表示服务器发送的事件。
event: str
- 默认为"message"
。data: str
- 默认为""
。id: str
- 默认为""
。retry: str | None
- 默认为None
。
方法
json() -> Any
- 将sse.data
解码为 JSON。
SSEError
在请求 SSE 端点时发生的错误。
父类
httpx.TransportError
许可证
MIT
变更日志
此项目的所有重大更改都将记录在此文件中。
格式基于 Keep a Changelog。
0.4.0 - 2023-12-22
移除
- 删除了对 Python 3.7 的支持,因为它已经达到生命周期的结束。(拉取请求 #21)
添加
- 添加了对 Python 3.12 的官方支持。(拉取请求 #21)
修复
- 允许包含但不限于
text/event-stream
的Content-Type
。(拉取请求 #22 由 @dbuades 提出) - 改进了缺少
Content-Type
时的错误消息。(拉取请求 #20 由 @jamesbraza 提出)
0.3.1 - 2023-06-01
添加
- 为
ServerSentEvent
模型添加了__repr__()
,这可能有助于调试和其他任务。(拉取请求 #16)
0.3.0 - 2023-04-27
更改
- 现在在执行
iter_sse()
/aiter_sse()
时执行将响应内容类型不是text/event-stream
的SSEError
抛出,而不是在connect_sse()
/aconnect_sse()
中执行。这允许在迭代服务器发送事件之前检查响应,例如检查错误响应。(拉取请求 #12)
0.2.0 - 2023-03-27
更改
connect_sse()
和aconnect_sse()
现在需要method
参数:connect_sse(client, "GET", "https://example.org")
。这为除了GET
之外的其他 HTTP 动词的 SSE 请求提供了支持。(拉取请求 #7)
0.1.0 - 2023-02-05
初始发布
添加
- 添加
connect_sse
、aconnect_sse()
、ServerSentEvent
和SSEError
。
项目详情
下载文件
下载您平台的文件。如果您不确定选择哪个,请了解更多关于 安装软件包 的信息。
源代码分发
构建分发
httpx-sse-0.4.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1e81a3a3070ce322add1d3529ed42eb5f70817f45ed6ec915ab753f961139721 |
|
MD5 | 609d7a2dedb0b6592f4977d0a593268a |
|
BLAKE2b-256 | 4c608f4281fa9bbf3c8034fd54c0e7412e66edbab6bc74c4996bd616f8d0406e |
httpx_sse-0.4.0-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f329af6eae57eaa2bdfd962b42524764af68075ea87370a2de920af5341e318f |
|
MD5 | 40ba2ec6c1889e47a55182e55805369e |
|
BLAKE2b-256 | e19ba181f281f65d776426002f330c31849b86b31fc9d848db62e16f03ff739f |