跳转到主要内容

一个可重用的Django应用程序,提供将后台处理器中的消息实时存储和显示到Web视图中的工具包,处理器工作时即可实现。

项目描述

Pluto-RT:长运行进程的实时Web结果

每当您需要从Web视图触发一个长运行进程时,都会遇到Python的阻塞性质 - 除非进程完成,否则不会显示任何内容,如果超出您的Web应用程序进程的默认超时时间,您会触发超时。

为了解决长运行方面的问题,我们转向后台工作者,如django-q或django-celery。但是,用户无法了解后台工作者在做什么。

Pluto-RT通过使用Redis作为消息队列服务并具有FIFO(先进先出)队列来解决这个问题。后台工作者可以将消息放入队列,当视图检索它们时,它会首先返回最旧的那些。它可以通过轮询WSGI服务器或使用ASGI的服务器发送事件(SSE)来工作。

Animated gif showing pluto-rt demo functionality

整体策略如下

  1. 创建一个唯一的“队列名称”,可以发送到工作队列并传递到“结果”页面。
  2. 使用该队列名称调用您的后台处理器(工作者)。工作者在执行任务时将消息放置到队列中。
  3. 显示结果模板,传递队列名称、项目模板名称和div目标。模板生成htmxC,检索与该队列相关的消息。
  4. 服务器从队列中删除最旧的消息并将它们发送到客户端。

演示

演示目录中提供了一个演示。最快的方式是使用docker尝试。

docker compose --project-directory demo up

然后在浏览器中打开http://localhost:8000/来查看各种演示。


或者使用venv运行。

python3 -m venv .venv
. .venv/bin/activate
.venv/bin/pip install -r demo/requirements.txt
PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PYTHONPATH=.:src DJANGO_SETTINGS_MODULE=demo.settings .venv/bin/celery -A demo.tasks worker --loglevel=INFO

在新的终端中运行:

PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 PYTHONPATH=.:src .venv/bin/granian --reload --interface asginl --port 8000 demo.asgi:application

然后在浏览器中打开http://localhost:8000/来查看各种演示。

先决条件

我们假设您已经安装并配置好了以下内容

  • 一个配置了Redis的运行中的Django项目
  • 一个运行中的后台处理器,如celery,并定义了长时间运行的任务

安装

  • pip install pluto-rt
  • pluto_rt添加到项目设置中已安装的应用列表。
  • 在项目的urls.py文件中包含pluto_rt.urls(如果需要,包含login_required等)。选择您想要的任何前缀。

使用方法

您需要控制两个视图:启动流程并将任务传递给后台工作者的视图,以及消费结果的视图。

在完成上述安装步骤后...

在启动视图中

  1. 创建队列名称。它应该是相当独特的,这样就不会因为页面重新加载而创建相同的值。uuid4可以提供这样的功能,同时基本上为内容提供授权(URL将无法猜测)
    queue_name = f"listqueue_{uuid.uuid4()}"
    
  2. 启动长时间运行的过程,并传递队列名称
    sample_ops_function.delay(queue_name)
    
  3. 将队列名称传递给下一个视图(例如,通过页面显示或重定向)
    return render(request, "demo/demo_list.html", {"queue_name": queue_name})
    

在长时间运行的任务中

  1. 使用pluto_rt.ops.get_rt_queue_handle(queue_name)获取队列句柄。将消息项放入队列中。消息必须是可pickle的。
    mqueue = get_rt_queue_handle(queue_name)
    mqueue.push({"status": "info", "msg": "Demo starting"})
    
  2. 当您完成任务(成功或失败)时,在队列句柄上调用complete()
    mqueue.complete()
    

创建一个包含以下内容的模板结果

  1. 加载htmxx和htmxx-sse的脚本元素
  2. 一个带有id的目标div,它将包含表示每个消息的格式化DOM元素。
  3. 使用include过滤器加载pluto_rt/sse.html(用于ASGI服务器),或pluto_rt/polling.html(用于WSGI)。传入必要的item_templatetarget参数,以及可选的modereversereplace

例如

<ul id="results" class="list-group"></ul>
{% include "pluto_rt/sse.html" with item_template="myapp/pluto_rt_item.html" target="#results" %}

<script src="/js/htmx.min.js"></script>
<script src="/js/htmx-sse.min.js"></script>

在模板目录中创建一个"item"模板

  1. item_template的路径(目录或文件名)必须包含pluto_rt。模板的格式由您自己决定!传递到模板中的对象命名为"item",并将包含长时间运行过程中添加的内容。它是一个普通的Django模板项,但将在发送到客户端之前转换为HTML片段。它可以像以下这样简单,也可以像以下这样复杂,只要它可以作为HTML/text MIME类型传递。

一个非常简单的例子

<div>{{ item }}</div>

更实用的例子(基于上面添加的条目)

<li class="list-group-item list-group-item-{{ item.status }}">{{ item.msg }}</li>

我们要求路径中包含"pluto_rt"作为一个安全措施。坦白说,通过模板暴露数据的可能性很小(上下文中只包含item变量),但这可以使安全性更加整洁。您可以使用名为"pluto_rt"的路径,或将它包含在模板文件名中。

停止轮询

如果您在队列上调用complete(),视图将返回一个消息,告诉htmxx停止轮询。因此,在您的处理函数中,确保在任务完成时调用该函数。

默认情况下,调用complete()和清除队列之间的延迟为5秒。这可以通过传递一个整数参数(秒数)给complete()来覆盖,或者通过可选的Django设置PLUTO_RT_CLEAR_DELAY。为了测试,此设置应为0,以防止缓慢的pytests。

分发和许可

首先将其作为一个私有的ES GitHub仓库创建,以提高可重用性,并打算将来开源。

当通过pip安装时,它将安装wheel,这意味着在做出更改后需要重新编译wheel。

仅此一次

pip install build
pip install twine

在最终提交后,更改pyproject.toml中的版本,然后运行

python3 -m build

然后提交更改并使用以下命令将更新发布到PyPI

twine upload dist/pluto_rt-0.1.2.tar.gz

(替换实际构建号)。

版本

0.1.0 初始版本

0.2.0 2023年9月1日:用我们自己的排队代码替换了已废弃的QR3库(使pluto自给自足)。引入了对包含模板部分而不是整个页面的支持。

0.3.0 2023年12月14日:添加了反向选项、complete()函数和演示应用程序。

0.4.0 2024年2月21日:服务器端事件,更灵活的模板

0.5.0 2024年3月3日:改进模板包含,添加演示

0.6.0 2024年3月9日:使用模式,添加替换

0.7.0 2024年4月9日:支持多个消费者

0.7.1 2024年4月19日:每10秒保持连接

0.7.2 2024年4月23日:使用设置来避免睡眠(针对pytest)

项目详情


下载文件

下载您平台上的文件。如果您不确定选择哪个,请了解更多关于安装包的信息。

源分发

pluto_rt-0.7.2.tar.gz (1.8 MB 查看散列值)

上传时间

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误日志 StatusPage StatusPage 状态页面