跳转到主要内容

Django channels的消息处理

项目描述

信封

注意:虽然其中一些可能是正在进行的工作,但我们会确保在更改可能破坏内容的任何内容时增加主版本号。

我们没有为大多数事物编写文档,但源代码应该足够清晰。

简介

虽然channels处理异步域内消息的路由,但在传输内容方面非常开放。

信封基本上是一个用于结构化和处理不同消息的意义和它们所做的事情的系统。

一些核心原则

  • 可读性高于速度 消息被序列化和反序列化为包含有关其来源的元数据的真实对象,并且它们上有方法。如果您需要每秒处理1M消息,那么您根本不应该使用channels。
  • 尽可能将异步和同步世界分开 如果某物需要与数据库一起工作,请将其推迟到队列中。混合这些会使测试非常困难,并导致许多意外的错误。
  • 可插拔 允许其他应用程序在此基础上构建并注入功能。
  • 可预测性和类型检查 我们使用Pydantic和Python的类型注解。所有有效的消息类型必须在每个特定的通信渠道内注册为有效才能有效。
  • 在多个步骤中进行消息验证 首先验证基本消息结构,然后是有效载荷本身以及任何昂贵的验证(如数据库查询),这些验证不在异步消费者外部进行。
  • 无惊喜 如果有效负载不符合通过该通道应该通信的实际消息,它将被丢弃。

先决条件

  • 基本的Django知识。
  • 了解Django Channels及其功能 - 尤其是消费者。

依赖项 - 以及原因

  • RQdjango_rq处理队列和延迟操作。
  • async-signals按照该包所说的功能。从Django 5开始,它可以在Django默认的信号中工作,我们可能会稍后重构。
  • pydantic用于所有架构,但通过一点重构,可以使用任何序列化器。

核心概念

信封

跟踪要接受的消息类型以及如何处理它们。执行消息有效负载的序列化和反序列化以及基本验证。注册的封装总是通过单个传输类型进行单向通信,例如传入的WebSocket消息或传出的WebSocket消息。

封装有简短的键,其中只有t是必需的。

封装架构 - 键

  • t 消息类型。必须在消息注册表中作为键存在。
  • i 消息跟踪ID。由该消息引起的任何错误或响应都会返回相同的跟踪ID。将此跟踪ID传递给该消息将引起的行为是个好主意。(例如,一个进程队列)
  • p 负载。可以是空的,通常是字典,但封装架构定义了这一点。
  • s 状态 - 由另一个消息引起的通常带有状态的出站消息,以指示操作的最终结果。通过使用状态和i,可以构建请求/响应功能。

默认消息状态

  • a 已确认 - 消息已接收并通过至少基本验证。
  • q 已排队 - 在进程队列中等待。
  • r 运行中 - 处理工作进程开始处理此任务。(如果这是一个长期任务,发送带有此标记的定期更新可能是个好主意)
  • s 成功。任何类型的点赞回复,不必比ping/pong回复更夸张。
  • f 失败(通过异常或其他原因)

消息

一个知道如何处理特定消息的类。它可能在收到时执行其他操作,并且可以为其有效负载定义一个Pydantic架构。

反序列化的消息也有元数据,用于跟踪其来源和可能的跟踪ID。

消息注册表

它只是一个字典,其键是与消息类型对应的字符串,值是一个消息类。

注册表具有与它们通信通道对应的名称。它们总是单向的,但消息可以添加到不同的注册表中。

使用解释方向的名称,例如'websocket_incoming'。

连接信号

异步信号consumer_connectedconsumer_closed在连接被接受或关闭时立即触发。然后它们将使用同步信号connection_createdconnection_closed排队一个作业。这样,同步代码就不会阻塞连接。

>>> from envelope.async_signals import consumer_connected, consumer_closed
>>> from envelope.signals import connection_created, connection_closed

消息信号

任何消息的接收者都会生成一个事件,其他应用程序部分可以响应。这样,可以将功能添加到消息中。例如,任何继承自envelope.core.message.AsyncRunnable的消息都将调用run方法。

每个消息方向都有自己的异步信号。

>>> from envelope.async_signals import incoming_websocket_message
>>> from envelope.async_signals import outgoing_websocket_message
>>> from envelope.async_signals import outgoing_websocket_error

设置

大多数必需的设置将由django_rq和channels处理。但是,可以调整封装的一些方面。

请记住,消息注册表将默认包含一些消息 - 如果您不希望如此,请确保将其清除。

ENVELOPE_CONNECTIONS_QUEUE - 默认值:None

: 使用RQ队列创建连接对象的作业名称。None禁用功能。

ENVELOPE_TIMESTAMP_QUEUE - 默认:无

: 用于连接对象时间戳更新的 RQ 队列名称。使用 None 禁用功能。

ENVELOPE_CONNECTION_UPDATE_INTERVAL - 单位:秒,默认:180

: 多久应该排队一个时间戳任务?使用 None 禁用功能。

ENVELOPE_BATCH_MESSAGE - 默认:envelope.messages.common.BatchMessage

: 用于批处理消息的类。

ENVELOPE_SENDER_UTIL - 默认:envelope.utils.SenderUtil

: 用于发送工具的类。

ENVELOPE_ALLOW_UNAUTHENTICATED - 默认:False

: 实验

使用示例

在内容更改时发送消息

在这个示例中,我们将向用户自己的频道发出信号,当用户对象发生变化时。一个连接到同一服务器的多个标签页的用户将立即在所有标签页中看到更改。

确保 envelope.app.user_channel 已添加到 INSTALLED_APPS,以便此示例正常工作。

>>> from pydantic import BaseModel
>>> from django.contrib.auth import get_user_model
>>> from django.dispatch import receiver
>>> from django.db.models.signals import post_save
>>> from envelope.core.message import Message
>>> from envelope.app.user_channel.channel import UserChannel
>>> from envelope import WS_OUTGOING
>>> from envelope.decorators import add_message
    
>>> User = get_user_model()
    
>>> class UserSchema(BaseModel):
...     username: str
...     first_name: str = ""
...     last_name: str = ""
...     email: str = ""
    
    
>>> @add_message(WS_OUTGOING)
... class UserDetails(Message):
...     name = "user.details"
...     schema = UserSchema
    
    
>>> @receiver(post_save, sender=User)
... def send_user_details_on_change(instance: User, **kwargs):
...     data = {k: getattr(instance, k) for k in UserSchema.schema()['properties']}
...     msg = UserDetails(**data)
...     channel = UserChannel.from_instance(instance)
...     channel.sync_publish(msg)
    
# We'll mock the channels layer to catch the message
# By default, sync messages will only be sent when the transaction commits
# to avoid sending messages for things that may not happen.

>>> from json import loads
>>> from unittest.mock import patch
>>> from channels.layers import get_channel_layer
>>> layer = get_channel_layer()

# Test here is Djangos instance of TestCase
>>> with patch.object(layer, 'group_send') as mock_send:
...     with test.captureOnCommitCallbacks(execute=True):
...         new_user = User.objects.create(username="jane", first_name="Jane", last_name="Doe")
...         first = mock_send.called  # No message yet since db hasn't committed!
...     second = mock_send.called


>>> first
False

>>> second
True

>>> mock_send.mock_calls[0].args[0] == f"user_{new_user.pk}"
True

>>> data = loads(mock_send.mock_calls[0].args[1]['text_data'])
>>> data['p']
{'username': 'jane', 'first_name': 'Jane', 'last_name': 'Doe', 'email': ''}

项目详情


下载文件

下载适合您平台的应用程序。如果您不确定选择哪一个,请了解更多关于 安装包 的信息。

源分发

channels-envelope-1.0.3.tar.gz (33.8 kB 查看哈希值)

上传时间:

构建分发

channels_envelope-1.0.3-py3-none-any.whl (47.5 kB 查看哈希值)

上传时间: Python 3

由以下机构支持