简单的广播频道。
项目描述
广播器
广播器通过在多个不同的后端服务上提供一个简单的广播API,帮助您开发实时流功能。
目前支持 Redis PUB/SUB、Redis Streams、Apache Kafka 和 Postgres LISTEN/NOTIFY,以及一个简单的内存后端,您可以用它进行本地开发或在测试期间使用。
以下是简单websocket聊天应用的后端代码的完整示例
app.py
# Requires: `starlette`, `uvicorn`, `jinja2`
# Run with `uvicorn example:app`
import anyio
from broadcaster import Broadcast
from starlette.applications import Starlette
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates
broadcast = Broadcast("redis://localhost:6379")
templates = Jinja2Templates("templates")
async def homepage(request):
template = "index.html"
context = {"request": request}
return templates.TemplateResponse(template, context)
async def chatroom_ws(websocket):
await websocket.accept()
async with anyio.create_task_group() as task_group:
# run until first is complete
async def run_chatroom_ws_receiver() -> None:
await chatroom_ws_receiver(websocket=websocket)
task_group.cancel_scope.cancel()
task_group.start_soon(run_chatroom_ws_receiver)
await chatroom_ws_sender(websocket)
async def chatroom_ws_receiver(websocket):
async for message in websocket.iter_text():
await broadcast.publish(channel="chatroom", message=message)
async def chatroom_ws_sender(websocket):
async with broadcast.subscribe(channel="chatroom") as subscriber:
async for event in subscriber:
await websocket.send_text(event.message)
routes = [
Route("/", homepage),
WebSocketRoute("/", chatroom_ws, name='chatroom_ws'),
]
app = Starlette(
routes=routes, on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect],
)
前端HTML模板 在此处提供,并改编自 Pieter Noordhuis的PUB/SUB演示。
需求
Python 3.8+
安装
pip install broadcaster
pip install broadcaster[redis]
pip install broadcaster[postgres]
pip install broadcaster[kafka]
可用的后端
Broadcast('memory://')
Broadcast("redis://localhost:6379")
Broadcast("redis-stream://localhost:6379")
Broadcast("postgres://localhost:5432/broadcaster")
Broadcast("kafka://localhost:9092")
使用自定义后端
您可以创建自己的后端并将其与broadcaster
一起使用。为此,您需要创建一个继承自BroadcastBackend
的类,并将其通过backend
参数传递给broadcaster
。
from broadcaster import Broadcaster, BroadcastBackend
class MyBackend(BroadcastBackend):
broadcaster = Broadcaster(backend=MyBackend())
下一步是什么?
目前,broadcaster
处于Alpha版本,应被视为工作设计文档。
API可能会发生变化。如果您确实想使用当前状态的Broadcaster,请确保将您的需求严格锁定到broadcaster==0.3.0
。
为了提高能力,我们真的希望添加一些额外的后端,提供API支持以从持久存储中读取最近的事件历史,并提供序列化/反序列化API...
- 支持广播结构化数据的序列化/反序列化。
- 支持RabbitMQ的后端。
- 为支持持久化的后端添加对
subscribe('chatroom', history=100)
的支持。(Redis Streams,Apache Kafka)这将允许应用程序订阅频道更新,同时也能看到最近的事件窗口。我们可能还想支持一些基本的分页操作,以便应用程序可以扫描事件历史。 - 支持后端支持的模式订阅。
第三方包
MQTT后端
将MQTT与广播器集成
项目详情
下载文件
下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。
源代码分发
broadcaster-0.3.1.tar.gz (6.9 kB 查看哈希)
构建分发
broadcaster-0.3.1-py3-none-any.whl (9.5 kB 查看哈希)
关闭
broadcaster-0.3.1.tar.gz 哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 35d1e174a98346d184cc6558979548340e877ae841c65c7e354153a84fe675d6 |
|
MD5 | 4069d669af0c76d36fa7bcd3ff3a2146 |
|
BLAKE2b-256 | 0f9a937530b4b1339f96423ef314b832aa585250c99de465a6ec1a12da03bbda |
关闭
broadcaster-0.3.1-py3-none-any.whl 哈希
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 433023ab6b6b4a8da9cbba95910eff52b1e767141419659be287cfd49f2a3ecb |
|
MD5 | 7880619b13c1f6da6b42977cc5e13576 |
|
BLAKE2b-256 | 17db6789b924f1349739ea3a4eff0ad6877d1c4a2f844e8a15db49fd6335a532 |