跳转到主要内容

一个流式中心。有点像。

项目描述

https://coveralls.io/repos/github/betodealmeida/senor-octopus/badge.svg?branch=main Cirrus CI - Base Branch Build Status https://badge.fury.io/py/senor-octopus.svg PyPI - Python Version

他们说只有两种工作:你要么把信息从一个地方移动到另一个地方,要么把物质从一个地方移动到另一个地方。

Señor Octopus是一个移动数据的应用程序。它读取一个YAML配置文件,该文件描述了如何连接节点。例如,你可能希望每小时测量你的互联网速度并将其存储在数据库中

speedtest:
  plugin: source.speedtest
  flow: -> db
  schedule: @hourly

db:
  plugin: sink.db.postgresql
  flow: speedtest ->
  user: alice
  password: XXX
  host: localhost
  port: 5432
  dbname: default

节点通过flow属性连接。speedtest节点连接到db节点,因为它指向它

speedtest:
  flow: -> db

另一方面,db节点监听来自speedtest节点的事件

db:
  flow: speedtest ->

如果我们想使节点连接到所有其他节点,可以使用*作为通配符,或者指定节点列表

speedtest:
  flow: -> db, log

db:
  flow: "* ->"

请注意,在YAML中,我们需要引号来引用以星号开头的属性。

运行Señor Octopus

您可以将上述配置保存到名为speedtest.yaml的文件中,并运行

$ pip install senor-octopus
$ srocto speedtest.yaml

每小时speedtest源节点将运行,并将结果发送到db目标节点,该节点将它们写入Postgres数据库。

这些结果看起来像什么?

事件

Señor Octopus使用一个非常简单但灵活的数据模型来移动数据。我们有名为源节点的节点,它们创建一个事件流,每个事件都像这样

class Event(TypedDict):
    timestamp: datetime
    name: str
    value: Any

事件与其关联的时间戳、名称和值。请注意,值可以是任何内容!

一个会产生一系列事件。在上面的示例中,每小时一次,speedtest 源会产生如下事件

[
    {
        'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812083, tzinfo=datetime.timezone.utc),
        'name': 'hub.speedtest.download',
        'value': 16568200.018792046,
    },
    {
        'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812966, tzinfo=datetime.timezone.utc),
        'name': 'hub.speedtest.upload',
        'value': 5449607.159468643,
    },
    {
        'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 820369, tzinfo=datetime.timezone.utc),
        'name': 'hub.speedtest.client',
        'value': {
            'ip': '173.211.12.32',
            'lat': '37.751',
            'lon': '-97.822',
            'isp': 'Colocation America Corporation',
            'isprating': '3.7',
            'rating': '0',
            'ispdlavg': '0',
            'ispulavg': '0',
            'loggedin': '0',
            'country': 'US',
        }
    },
    ...
]

事件会被发送到接收器,接收器会消费这些流。在这个例子中,db 接收器将接收事件并将它们存储在Postgres数据库中。

事件驱动源

在上面的例子中,我们配置了 speedtest 源每小时运行一次。不过,并不是所有源都需要调度。我们可以有一个源,它监听 MQTT 中的一个特定主题,例如

mqtt:
  plugin: source.mqtt
  flow: -> db
  topics:
    - "srocto/feeds/#"
  host: localhost
  port: 1883
  username: bob
  password: XXX
  message_is_json: true

上面的源会在主题通配符 srocto/feeds/# 中出现新消息时立即向 db 节点发送事件,因此可以将消息写入数据库——这是一个将消息队列持久化到磁盘的超级简单方法!

批量事件

上面的例子不是很高效,因为每次事件到达时都会写入数据库。相反,我们可以轻松地将事件进行批量处理,这样它们就可以在队列中累积,例如每5分钟处理一次。

db:
  plugin: sink.db.postgresl
  flow: speedtest, mqtt ->
  batch: 5 minutes
  user: alice
  password: XXX
  host: localhost
  port: 5432
  dbname: default

使用 batch 参数,任何传入的事件都会在配置的时间内存储在队列中,并由接收器一起处理。如果 srocto 优雅地终止(例如,使用 ctrl+C),队列中的任何挂起事件仍将被处理。

过滤事件

Señor Octopus 的许多灵活性来自于第三种类型的节点,即 过滤器。过滤器不仅可以用来过滤数据,还可以用来格式化数据。例如,假设我们想在日落时打开一些灯光。每当日落或日出发生时,sun 源会发送具有“sunset”或“sunrise”值的事件

{
    'timestamp': ...,
    'name': 'hub.sun',
    'value': 'sunset',
}

tuya 接收器可以用来控制智能开关,但为了打开它,它期望一个类似于以下的事件

{
    'timestamp': ...,
    'name': ...,
    'value': 'on',
}

我们可以使用 jinja 过滤器忽略“sunrise”事件,并将“sunset”值转换为“on”

sun:
  plugin: source.sun
  flow: -> sunset
  latitude: 38.3
  longitude: -123.0

sunset:
  plugin: filter.jinja
  flow: sun -> lights
  template: >
    {% if event['value'] == 'sunset' %}
      on
    {% endif %}

lights:
  plugin: sink.tuya
  flow: sunset ->
  device: "Porch lights"
  email: charlie@example.com
  password: XXX
  country: "1"
  application: smart_life

使用这种配置,sunset 过滤器将丢弃任何没有“sunset”值的任何事件。对于那些具有“sunset”值的事件,其值将被替换为字符串“on”,以便在 lights 节点中激活灯光。

限制事件

有时我们想限制接收器消耗的事件数量。例如,假设我们想使用 Señor Octopus 通过 Awair Element 监测空气质量,当得分低于某个阈值时,发送给我们短信。我们希望短信最多每30分钟发送一次,并且只在早上8点到晚上10点之间发送。

以下是实现方法

awair:
  plugin: source.awair
  flow: -> bad_air
  schedule: 0/10 * * * *
  access_token: XXX
  device_type: awair-element
  device_id: 12345

bad_air:
  plugin: filter.jinja
  flow: awair -> sms
  template: >
    {% if
       event['timestamp'].astimezone().hour >= 8 and
       event['timestamp'].astimezone().hour <= 21 and
       event['name'] == 'hub.awair.score' and
       event['value'] < 80
    %}
      Air quality score is low: {{ event['value'] }}
    {% endif %}

sms:
  plugin: sink.sms
  flow: bad_air ->
  throttle: 30 minutes
  account_sid: XXX
  auth_token: XXX
  from: "+18002738255"
  to: "+15558675309"

在上面的例子中,awair 源会每10分钟获取空气质量数据,并将其发送到 bad_air。过滤器会检查小时,以防止晚上10点到早上8点发送短信,并检查空气质量得分——如果得分低于80,它将重新格式化事件的值,例如

“空气质量得分低:70”

然后,它被发送到 sms 接收器,该接收器有一个30分钟的 throttle 设置。该节流配置将防止接收器每30分钟运行一次,以避免在得分保持低的情况下频繁发送消息。

插件

Señor Octopus 支持越来越多的插件,并且添加新的插件很简单。每个插件都是一个简单的函数,用于产生、处理或消费流。

以下是 random 源,它产生随机数

async def rand(events: int = 10, prefix: str = "hub.random") -> Stream:
    for _ in range(events):
        yield {
            "timestamp": datetime.now(timezone.utc),
            "name": prefix,
            "value": random.random(),
        }

这是 jinja 过滤器的完整源代码。

async def jinja(stream: Stream, template: str) -> Stream:
    _logger.debug("Applying template to events")
    tmpl = Template(template)
    async for event in stream:
        value = tmpl.render(event=event)
        if value:
            yield {
                "timestamp": event["timestamp"],
                "name": event["name"],
                "value": value,
            }

这是 sms 沉淀。

async def sms(
    stream: Stream, account_sid: str, auth_token: str, to: str, **kwargs: str
) -> None:
    from_ = kwargs["from"]
    client = Client(account_sid, auth_token)
    async for event in stream:
        _logger.debug(event)
        _logger.info("Sending SMS")
        client.messages.create(body=str(event["value"]).strip(), from_=from_, to=to)

如您所见,源是一个异步生成器,它产生事件。过滤器接收带有额外配置参数的流,并返回一个流。而沉淀接收一个带有额外参数的流,不返回任何内容。

当前源插件包括:

过滤器

现有的过滤器非常相似,主要区别在于它们的配置方式。

沉淀

以下是当前沉淀:

  • sink.log:将事件发送到记录器。

  • sink.mqtt:将事件作为消息发送到 MQTT 主题。

  • sink.pushover:将事件发送到 Pushover 移动应用。

  • sink.slack:将消息发送到 Slack 频道。

  • sink.sms:通过 Twilio 发送短信。

  • sink.tuya:向 Tuya/Smart Life 设备发送命令。

项目详情


下载文件

下载适合您平台的文件。如果您不确定要选择哪个,请了解更多关于 安装包 的信息。

源分布

senor-octopus-0.2.0.tar.gz (65.3 kB 查看哈希值)

上传时间 源代码

构建版本

senor_octopus-0.2.0-py2.py3-none-any.whl (37.3 kB 查看哈希值)

上传时间 Python 2 Python 3

由以下支持