跳转到主要内容

简单的广播频道。

项目描述

广播器

广播器通过在多个不同的后端服务上提供一个简单的广播API,帮助您开发实时流功能。

目前支持 Redis PUB/SUBRedis StreamsApache KafkaPostgres LISTEN/NOTIFY,以及一个简单的内存后端,您可以用它进行本地开发或在测试期间使用。

WebSockets Demo

以下是简单websocket聊天应用的后端代码的完整示例

app.py

# Requires: `starlette`, `uvicorn`, `jinja2`
# Run with `uvicorn example:app`
import anyio
from broadcaster import Broadcast
from starlette.applications import Starlette
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates


broadcast = Broadcast("redis://localhost:6379")
templates = Jinja2Templates("templates")


async def homepage(request):
    template = "index.html"
    context = {"request": request}
    return templates.TemplateResponse(template, context)


async def chatroom_ws(websocket):
    await websocket.accept()

    async with anyio.create_task_group() as task_group:
        # run until first is complete
        async def run_chatroom_ws_receiver() -> None:
            await chatroom_ws_receiver(websocket=websocket)
            task_group.cancel_scope.cancel()

        task_group.start_soon(run_chatroom_ws_receiver)
        await chatroom_ws_sender(websocket)


async def chatroom_ws_receiver(websocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="chatroom", message=message)


async def chatroom_ws_sender(websocket):
    async with broadcast.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


routes = [
    Route("/", homepage),
    WebSocketRoute("/", chatroom_ws, name='chatroom_ws'),
]


app = Starlette(
    routes=routes, on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect],
)

前端HTML模板 在此处提供,并改编自 Pieter Noordhuis的PUB/SUB演示

需求

Python 3.8+

安装

  • pip install broadcaster
  • pip install broadcaster[redis]
  • pip install broadcaster[postgres]
  • pip install broadcaster[kafka]

可用的后端

  • Broadcast('memory://')
  • Broadcast("redis://localhost:6379")
  • Broadcast("redis-stream://localhost:6379")
  • Broadcast("postgres://localhost:5432/broadcaster")
  • Broadcast("kafka://localhost:9092")

使用自定义后端

您可以创建自己的后端并将其与broadcaster一起使用。为此,您需要创建一个继承自BroadcastBackend的类,并将其通过backend参数传递给broadcaster

from broadcaster import Broadcaster, BroadcastBackend

class MyBackend(BroadcastBackend):

broadcaster = Broadcaster(backend=MyBackend())

下一步是什么?

目前,broadcaster处于Alpha版本,应被视为工作设计文档。

API可能会发生变化。如果您确实想使用当前状态的Broadcaster,请确保将您的需求严格锁定到broadcaster==0.3.0

为了提高能力,我们真的希望添加一些额外的后端,提供API支持以从持久存储中读取最近的事件历史,并提供序列化/反序列化API...

  • 支持广播结构化数据的序列化/反序列化。
  • 支持RabbitMQ的后端。
  • 为支持持久化的后端添加对subscribe('chatroom', history=100)的支持。(Redis Streams,Apache Kafka)这将允许应用程序订阅频道更新,同时也能看到最近的事件窗口。我们可能还想支持一些基本的分页操作,以便应用程序可以扫描事件历史。
  • 支持后端支持的模式订阅。

第三方包

MQTT后端

概要

将MQTT与广播器集成

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解有关安装包的更多信息。

源代码分发

broadcaster-0.3.1.tar.gz (6.9 kB 查看哈希

上传时间 源代码

构建分发

broadcaster-0.3.1-py3-none-any.whl (9.5 kB 查看哈希

上传时间 Python 3

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面