跳转到主要内容

WebSocket消息路由器

项目描述

https://circleci.com/gh/closeio/socketshark/tree/master.svg?style=svg

SocketShark 是一个基于Python/Redis/asyncio的WebSocket消息路由器。

(对类似项目感兴趣? Close 正在寻找 优秀的工程师 加入我们的团队)

摘要

SocketShark可以让您轻松构建基于WebSocket的服务,而无需这些服务知道WebSocket。相反,服务通过HTTP端点接收WebSocket客户端的消息,并通过Redis向WebSocket客户端发布消息,而SocketShark则负责处理长期运行的WebSocket连接以及在客户端和服务之间传递消息。

功能

  • 发布-订阅消息

    SocketShark允许构建依赖于发布-订阅模式的应用程序,而无需了解长期运行的WebSocket连接。WebSocket客户端的订阅和发布消息将通过HTTP转发到应用程序。消息可以通过将它们推送到Redis来同步地从应用程序推送到WebSocket客户端。

  • 灵活的WebSocket后端

    SocketShark附带Python 3的Websockets后端(websockets),但可以轻松地适配其他与asyncio兼容的框架。

  • 多个服务

    通过配置文件,SocketShark可以与任意数量的服务一起工作。

  • 过滤顺序错误的消息

    如果需要,可以从服务提供带有消息的内部顺序,SocketShark将自动过滤掉顺序错误的消息。

  • 消息节流

    如果需要,SocketShark可以对服务消息进行节流。

  • 身份验证

    SocketShark内置了票证认证功能。为了认证WebSocket连接,客户端从应用服务器请求一个临时令牌并在登录时提交该令牌。然后,该令牌被兑换为一个会话/用户ID,该ID可以由SocketShark支持的服务用于通过认证用户。

  • 授权

    可以使用自定义的HTTP端点授权发布-订阅订阅。SocketShark可以定期重新授权订阅,以确保未授权的订阅者取消订阅。

  • 自定义字段

    SocketShark支持自定义应用程序特定字段,用于认证和授权目的。

  • 指标

    SocketShark跟踪并报告指标,例如连接数和成功或失败的命令执行,并内置Prometheus和日志后端。

  • 连接维护

    SocketShark支持保持WebSocket连接活跃,并通过自动服务器端ping和客户端ping的处理程序自动发现其关闭。

快速入门

有关示例配置文件,请参阅example_config.py

要开始,请安装SocketShark(python setup.py install),创建自己的配置文件,并按照以下方式运行SocketShark:

PYTHONPATH=. socketshark -c my_config

客户端协议

SocketShark使用WebSocket作为客户端的传输协议。本节描述了Web客户端和SocketShark之间的协议结构。

客户端和服务器交换JSON消息。每个消息都是一个包含一个event字段的JSON字典,该字段指定了事件类型。SocketShark支持以下事件:

  • auth:认证

  • subscribe:订阅主题

  • message:向主题发送消息

  • unsubscribe:从主题取消订阅

  • ping:监控连接性

响应通常包含一个status字段,可以是okerror。在出错的情况下,会提供一个error字段,其中包含错误描述的字符串。

身份验证

WebSocket客户端可以使用auth事件通过票证认证进行认证。有关票证认证的更多信息,请参阅基于票证的会话应用程序认证部分。

认证事件(auth)可以接收一个可选的method参数(目前仅支持默认的ticket认证方式),以及一个包含登录票据的ticket参数。

示例客户端请求

{"event": "auth", "method": "ticket", "ticket": "SECRET_AUTH_TICKET"}

示例服务器响应(成功和失败的情况)

{"event": "auth", "status": "ok"}
{"event": "auth", "status": "error", "error": "Authentication failed."}

订阅

WebSocket客户端可以订阅任意数量的主题。在订阅主题的同时,客户端可以随时将消息传递给服务器,服务器也可以将消息推送给客户端。例如,客户端可以订阅一个对象ID,服务器在对象更新时可以发送消息。在订阅或取消订阅时,服务器可能包含额外的数据。例如,服务器可能在订阅时发送对象的当前状态。

主题是唯一的,客户端最多只能订阅每个主题一次。可以与订阅关联额外的字段,这些字段会随着所有订阅命令一起传递。例如,客户端可能需要指定特定对象订阅的组织ID,以便服务正确授权和处理消息。

订阅

subscribe事件订阅由subscription参数指定的主题,该参数由服务名称和主题组成,由点号分隔。服务可以定义额外的字段,并在订阅消息中直接指定。

示例客户端请求

{"event": "subscribe", "subscription": "books.book_1"}

示例服务器响应(成功和失败的情况)

{"event": "subscribe", "subscription": "books.book_1", "status": "ok"}
{
  "event": "subscribe",
  "subscription": "books.book_1",
  "status": "error",
  "error": "Book does not exist."
}

包含额外数据的示例服务器响应

{
  "event": "subscribe",
  "subscription": "books.book_1",
  "status": "ok",
  "data": {
    "title": "Everyone poops"
  }
}

包含额外字段的示例客户端请求

{"event": "subscribe", "subscription": "books.book_1", "author_id": "author_1"}

包含额外字段的示例成功服务器响应

{
  "event": "subscribe",
  "subscription": "books.book_1",
  "author_id": "author_1",
  "status": "ok"
}

请注意,订阅名称对于订阅是唯一的。当订阅books.book_1时,即使author_id不同,也不能订阅具有相同名称的另一个订阅。然而,服务器可以使用author_id来确保书籍与给定的作者ID匹配。

消息

一旦订阅,可以使用message事件传递消息。消息数据包含在data字段中,应为字典类型。数据结构由应用协议决定,服务决定消息是否被确认(成功或失败)。

示例消息(客户端到服务器或服务器到客户端)

{
  "event": "message",
  "subscription": "books.book_1",
  "data": {
     "action": "update",
     "title": "New book title"
  }
}

示例(可选)服务器对成功消息的额外数据确认

{
  "event": "message",
  "subscription": "books.book_1",
  "status": "ok",
  "data": {"status": "Book was updated."}
}

示例(可选)服务器对失败消息的确认

{
  "event": "message",
  "subscription": "books.book_1",
  "status": "error",
  "error": "Book could not be updated."
}

如果订阅时传递了额外字段,它们将包含在所有message事件中。

请注意,服务可能仅发送特定认证字段的消息(例如,限制为特定用户ID),因此订阅同一主题的多个会话不一定接收到相同的消息。

取消订阅

客户端可以使用unsubscribe事件取消对主题的订阅。

示例客户端请求

{"event": "unsubscribe", "subscription": "books.book_1"}

示例服务器响应(成功和失败的情况)

{"event": "unsubscribe", "subscription": "books.book_1", "status": "ok"}
{
  "event": "unsubscribe",
  "subscription": "books.book_1",
  "status": "error",
  "error": "Subscription does not exist."
}

ping

客户端可以发送ping消息,Socketshark会立即发送pong响应,而不联系任何服务。客户端可以选择发送ping并监控pong响应,例如检测失败的WebSocket连接、显示延迟指标等。此外,ping消息可以包含一些data数据,pong消息应将其重复回传。

示例客户端请求

{"event": "ping", "data": "foobar"}

示例服务器响应

{"event": "pong", "data": "foobar"}

服务协议

SocketShark使用HTTP向服务发送事件,并使用Redis PUBSUB从发布到订阅客户端的服务接收消息。本节描述了服务与SocketShark之间的协议结构。

HTTP回调

可以配置一个可选的HTTP端点来验证WebSocket会话。认证端点可以返回可以配置的认证相关字段(例如,用户ID和/或会话ID)。

以下可选的HTTP端点可以为每个SocketShark服务配置

  • 授权器:调用以授权新订阅的URL。

  • 在订阅之前:客户端尝试订阅时调用的URL。

  • 在订阅后:客户端订阅主题后调用的URL。

  • 在发送消息时:客户端向主题发送消息时调用的URL。

  • 在取消订阅之前:客户端尝试取消订阅时调用的URL。

  • 在取消订阅后:客户端从主题取消订阅后调用的URL。

  • 在授权变更后:如果在定期授权过程中任何授权器字段发生变化,则调用此URL。

每个HTTP端点通过包含JSON主体的POST请求访问。

特定于服务的端点接收为特定服务配置的任何客户端提供的额外字段以及由认证端点返回的认证相关字段。

HTTP端点应返回一个包含具有值okerrorstatus字段的JSON字典。如果发生错误,可以在error字段中指定错误文本。

身份验证

认证URL接收包含在ticket字段中提供的客户端票据的JSON字典。只有成功的响应才能验证用户。

示例请求体

{"ticket": "SECRET_AUTH_TICKET"}

示例服务器响应(成功且包含认证字段,不成功)

{"status": "ok", "user_id": "user_1", "session_id": "session_1"}
{"status": "error", "error": "Authentication failed."}

授权

如果为服务提供了授权器 URL,则每次用户尝试订阅主题时都会调用它。只有成功的响应才能授权订阅,触发在订阅之前回调(如果指定)。

如果没有为服务提供授权器,则所有主题都获得授权。

示例请求体(对于带有认证字段以及额外客户端字段的认证会话)

{
  "subscription": "books.book_1",
  "user_id": "user_1",
  "session_id": "session_1",
  "author_id": "author_1"
}

示例服务器响应(成功和失败的情况)

{"status": "ok"}
{"status": "error", "error": "Author ID does not match book ID."}

在活跃订阅期间,如果将authorization_renewal_period设置为秒数,SocketShark将定期查询授权器端点。如果授权不再有效,SocketShark将取消订阅用户,并将取消订阅消息发送到客户端,例如。

{
  "event": "unsubscribe",
  "subscription": "books.book_1",
  "error": "Unauthorized."
}

在订阅之前

订阅授权后,使用与授权器相同的参数调用在订阅之前回调。只有成功的响应确认订阅,触发在订阅后回调(如果指定)。

可以使用data字段返回额外数据,该字段将转发到客户端。如果返回,则data字段应是一个字典。

在订阅时

确认订阅后,使用与授权器相同的参数调用在订阅后回调。不成功的响应不会影响客户端的订阅。

在收到消息时

当客户端向服务发送消息时,将使用与授权器相同的参数以及data字段中的消息数据调用在发送消息时回调。

带有data字段的成功响应或不成功的响应会向客户端发送确认。

示例请求体(对于带有认证字段以及订阅期间提供的额外客户端字段的认证会话)

{
  "subscription": "books.book_1",
  "user_id": "user_1",
  "session_id": "session_1",
  "author_id": "author_1",
  "data": {
    "action": "update",
    "title": "New book title"
  }
}

示例服务器响应(成功,不触发响应)

{"status": "ok"}

示例服务器响应(成功,触发响应)

{"status": "ok", "data": {"status": "Book was updated."}

示例服务器响应(不成功,触发响应)

{"status": "error", "error": "Book could not be updated."}

在取消订阅之前

当客户端发出取消订阅事件时,使用与授权器相同的参数调用在取消订阅之前回调。只有成功的响应确认取消订阅,触发在取消订阅后回调(如果指定)。

可以使用data字段返回额外数据,该字段将转发到客户端。如果返回,则data字段应是一个字典。

在取消订阅时

在确认取消订阅后,on_unsubscribe 回调函数会以授权者相同的参数被调用。不成功的响应不会影响客户端的取消订阅。

向客户端发布消息

要发布一条消息,服务需要将一条 Redis 消息发布到相应的订阅。消息必须是 JSON 格式,并包含 subscription 字段、自由格式的 data 字典以及任何可选的过滤器(如果服务已配置过滤器字段)。通道名称对应订阅(service.topic),但可以可选地配置 Redis 通道前缀。

当指定过滤器字段时,消息只会被发布到与过滤器匹配的会话。例如,一条消息可能只能发送给匹配特定用户 ID 的会话。

Redis PUBLISH 命令示例

PUBLISH books.book_1 {
  "subscription": "books.book_1",
  "data": {
    "action": "update",
    "title": "New title"
  }
}

过滤顺序错误的消息

由于服务发布的消息可能不一定按预期顺序到达,SocketShark 支持消息过滤。例如,您可能正在将版本化对象的更新发布到 Redis,但由于网络延迟,它们可能到达顺序不正确。消息可以标记为顺序,并且 SocketShark 会在新消息首先到达时过滤掉旧消息。可以在 before_subscribe 回调函数的返回值和任何发布消息中使用 options 字典中的 order 选项提供浮点顺序。具有较低或等于最后收到的最高顺序的顺序的传入消息将被过滤掉。可以使用可选的 order_key 选项指定多个独立的顺序。

在以下示例中,“initiating”和“completed”消息,以及“h”和“hello”消息将被投递给订阅者。

PUBLISH calls.call_1 {
  "subscription": "calls.call_1",
  "options": {
      "order": 1,
      "order_key": "call_1.status",
  },
  "data": {
    "status": "initiating",
  }
}

PUBLISH calls.call_1 {
  "subscription": "calls.call_1",
  "options": {
      "order": 3,
      "order_key": "call_1.status",
  },
  "data": {
    "status": "completed",
  }
}

PUBLISH calls.call_1 {
  "subscription": "calls.call_1",
  "options": {
      "order": 2,
      "order_key": "call_1.status",
  },
  "data": {
    "status": "ringing",
  }
}

PUBLISH calls.call_1 {
  "subscription": "calls.call_1",
  "options": {
      "order": 1,
      "order_key": "call_1.note",
  },
  "data": {
    "note": "h",
  }
}

PUBLISH calls.call_1 {
  "subscription": "calls.call_1",
  "options": {
      "order": 3,
      "order_key": "call_1.note",
  },
  "data": {
    "note": "hello",
  }
}

PUBLISH calls.call_1 {
  "subscription": "calls.call_1",
  "options": {
      "order": 2,
      "order_key": "call_1.note",
  },
  "data": {
    "note": "hell",
  }
}

消息节流

可以在发布消息的 options 字典中使用 throttle 选项指定秒数来限制服务发布消息的速度。

对于短于限制周期的恒定消息流,客户端将立即接收到第一条消息,然后每隔限制周期接收一条消息,直到流结束,然后在另一个限制周期结束后发送最后一条消息。

可以使用可选的 throttle_key 选项指定多个独立的限制。限制是按订阅和会话进行的。

在以下示例中,如果三条消息同时发布,第一条将立即投递给订阅者,第二条将被忽略,第三条消息将在 100 毫秒后投递给订阅者。

PUBLISH calls.stats {
  "subscription": "calls.stats",
  "options": {
      "throttle" 0.1,
  },
  "data": {
    "n_calls": 1,
  }
}

PUBLISH calls.stats {
  "subscription": "calls.stats",
  "options": {
      "throttle" 0.1,
  },
  "data": {
    "n_calls": 2,
  }
}

PUBLISH calls.stats {
  "subscription": "calls.stats",
  "options": {
      "throttle" 0.1,
  },
  "data": {
    "n_calls": 3,
  }
}

使用模式

本节说明了如何使用 SocketShark 构建服务时的常见使用模式。

基于票据的身份验证用于基于会话的应用程序

大多数网络应用程序使用只包含 HTTP 的 cookie 存储用于认证的会话 ID。由于 WebSocket 连接是通过 JavaScript 初始化的,因此无法通过 cookie 访问会话 ID。为了便于 WebSocket 连接的认证,应使用一次性票据进行认证。

  • 在您的应用程序中实现一个公开的“ticket”端点。端点应验证用户的会话,并返回与用户会话 ID 关联的随机生成的短期票据。例如,可以计算 UUID4 票据并将其存储在 Redis 中,使用 SETEX 命令设置 30 秒的过期时间,其中键名对应于票据(UUID4),键值是用户的会话 ID。

  • 在您的应用程序中实现内部票据验证。端点应配置为 SocketShark 中的认证端点。它应检索并返回用户的会话 ID,同时使票据失效。还可以返回其他用户信息(例如,用户 ID)。应使用 Redis 管道检索和删除票据。

  • 当 JavaScript 代码连接到 SocketShark 时,它应首先通过公共票据端点请求票据,然后连接到 SocketShark 并使用获得的票据发出认证事件。

与微服务进行授权

假设用户可以从一组授权的组织ID访问产品。认证服务存储用户及其可访问的组织ID列表。产品服务存储具有相应组织ID的产品列表,但不知道用户是否有权访问特定组织(因此产品)。订阅是按产品进行的,并且只有在用户可以访问产品的组织时才应授权。为了在不要求服务直接通信的情况下解决这个问题,可以在SocketShark中使用额外字段。

  • 在SocketShark的认证配置中的 auth_fields 下添加用户ID,在产品服务的 extra_fields 下添加组织ID。

  • 在认证服务的授权端点返回用户ID。SocketShark将在所有后续请求中向服务端点提供它。

  • 当客户端订阅产品服务时(订阅示例:product.PROD_ID),它还必须作为额外字段提供产品的组织ID。

  • 为产品服务设置一个指向认证服务的 authorizer URL。如果给定用户有权访问给定组织,认证服务应授权订阅。由于授权器无法访问产品数据库,它不验证产品ID。

  • 为产品服务设置一个指向产品服务的 before_subscribe URL。如果订阅的产品ID与给定组织ID匹配,产品服务应允许订阅。由于组织ID已经通过授权器验证,因此不需要进一步验证。

项目详情


下载文件

下载适合您的平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源分布

socketshark-0.5.0.tar.gz (38.1 kB 查看哈希值)

上传时间

由以下机构支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面