跳转到主要内容

基于Nats的事件驱动后台任务和微服务框架。

项目描述

walnats

Nats-基于事件驱动的后台任务和微服务框架。

功能

  • 事件驱动。
  • 100%类型安全。
  • 不可变且易于测试。
  • 显式API,无魔法。
  • 发布者和订阅者之间有严格的分离。
  • 基于Asyncio。
  • Nats驱动。
  • 精确一次投递。
  • 智能且可配置的重试。
  • 许多集成。
  • 请求/回复。
  • 定期任务。
  • 内置检查器。
  • 生成AsyncAPI规范和EventSourcing图表的生成器。
  • 支持CloudEvents。
  • 详细的文档。
  • 兼容。您可以使用walnats为非walnats服务发出事件或消费由非walnats服务发出的事件。该工具足够灵活,可以适应您使用的任何消息格式。

阅读 文档 以开始使用。

与其他工具相比

与其他大型Python后台作业框架(如celerydramatiqrqhuey等)相比,从实现角度来看,walnats较新,因此从一开始就有机会围绕现代技术进行设计。具体来说,由mypy驱动的类型安全、由async/await驱动的并发,以及由nats驱动的持久性和分布式。

与所有其他Python后台作业框架(包括基于async/await的新框架,如arqpytask-ioaiotasks)相比,主要区别在于walnats是事件驱动的。在这些框架中,作业调度在概念上是一个网络上的函数调用,而在walnats中,发布者会发出事件,任何订阅者都可以在任何时候订阅这些事件。这种方法被称为"告诉,不要问"。

例如,当您的网店向客户发送包裹时,您不需要像使用Celery那样直接调用send_emailsend_sms等actor,而是使用walnats发布者发出一个parcel-sent事件,该事件将由walnats传递给所有感兴趣的actor。这为您带来了一些好处

  1. 发布者只进行一次网络请求。
  2. 当您添加新的actor时,您不需要修改发布者。这对于微服务架构来说特别酷,因为发布者和actor可以是不同团队拥有的不同服务。
  3. 更容易推理。当您开发微服务时,您只需要知道有哪些事件可以订阅和发出您自己的事件,而不必过多考虑系统中的其他服务如何处理这些事件。
  4. 更容易观察。walnats直接将事件转换为Nats主题,将actor转换为Nats JetStream消费者。因此,任何Nats可观察性工具都将为您揭示系统中的情况。

如果您有一个大型分布式系统,walnats是您的选择。如果您只想从Django单体或小型爱好项目中发送后台邮件,您可能会发现其他框架更适合。

最后,与您直接使用nats.py从头开始编写服务相比,walnats在处理故障、负载峰值和边缘情况方面做得更好。walnats是"为失败而设计"的。分布式系统很困难,您不应该独自踏上这段旅程。

安装

python3 -m pip install walnats

walnats 30秒内完成

创建一个包含事件的模块,它应该在服务之间共享

import walnats

COUNT = walnats.Event('counts', int)
#                 name ⤴  type ⤴

创建发布者(一个生成事件的服务的服务)

import asyncio
import walnats
from .events import COUNT

async def run() -> None:
    events = walnats.Events(COUNT)
    async with events.connect() as conn:
        await conn.register()
        #     ↑ create Nats JetStream streams
        for value in range(1000):
            await conn.emit(COUNT,  value)
            #         event ⤴  payload ⤴
            print(f'emitted {value}')
            await asyncio.sleep(1)

asyncio.run(run())

创建订阅者(一个监听事件的服务的服务)

import asyncio
import walnats
from .events import COUNT

async def run() -> None:
    registry = walnats.Actors(
        walnats.Actor('print', COUNT,    print),
        #         name ⤴  event ⤴  handler ⤴
    )
    async with registry.connect() as conn:
        await conn.register()
        #     ↑ create Nats JetStream consumers
        await conn.listen()
        #     ↑ start all actors

asyncio.run(run())

这就对了!现在您可以运行这些服务了

  1. 运行发布者:python3 -m pub
  2. 在另一个终端窗口中运行订阅者:python3 -m sub
  3. 运行另一个订阅者以查看工作如何在多个实例之间分配:python3 -m sub
  4. 停止一个订阅者(ctrl+c)以查看没有消息丢失。

此代码可在examples/readme_demo中找到。

阅读文档以了解更多信息。

项目详情


下载文件

下载适合您平台的文件。如果您不确定选择哪个,请了解更多关于 安装包 的信息。

源代码分发

walnats-1.2.6.tar.gz (614.7 kB 查看哈希值)

上传时间 源代码

构建分发

walnats-1.2.6-py3-none-any.whl (46.9 kB 查看哈希值)

上传时间 Python 3

由...