一个流式中心。有点像。
项目描述
他们说只有两种工作:你要么把信息从一个地方移动到另一个地方,要么把物质从一个地方移动到另一个地方。
Señor Octopus是一个移动数据的应用程序。它读取一个YAML配置文件,该文件描述了如何连接节点。例如,你可能希望每小时测量你的互联网速度并将其存储在数据库中
speedtest:
plugin: source.speedtest
flow: -> db
schedule: @hourly
db:
plugin: sink.db.postgresql
flow: speedtest ->
user: alice
password: XXX
host: localhost
port: 5432
dbname: default
节点通过flow属性连接。speedtest节点连接到db节点,因为它指向它
speedtest:
flow: -> db
另一方面,db节点监听来自speedtest节点的事件
db:
flow: speedtest ->
如果我们想使节点连接到所有其他节点,可以使用*作为通配符,或者指定节点列表
speedtest:
flow: -> db, log
db:
flow: "* ->"
请注意,在YAML中,我们需要引号来引用以星号开头的属性。
运行Señor Octopus
您可以将上述配置保存到名为speedtest.yaml的文件中,并运行
$ pip install senor-octopus
$ srocto speedtest.yaml
每小时speedtest源节点将运行,并将结果发送到db目标节点,该节点将它们写入Postgres数据库。
这些结果看起来像什么?
事件
Señor Octopus使用一个非常简单但灵活的数据模型来移动数据。我们有名为源节点的节点,它们创建一个事件流,每个事件都像这样
class Event(TypedDict):
timestamp: datetime
name: str
value: Any
事件与其关联的时间戳、名称和值。请注意,值可以是任何内容!
一个源会产生一系列事件。在上面的示例中,每小时一次,speedtest 源会产生如下事件
[
{
'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812083, tzinfo=datetime.timezone.utc),
'name': 'hub.speedtest.download',
'value': 16568200.018792046,
},
{
'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812966, tzinfo=datetime.timezone.utc),
'name': 'hub.speedtest.upload',
'value': 5449607.159468643,
},
{
'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 820369, tzinfo=datetime.timezone.utc),
'name': 'hub.speedtest.client',
'value': {
'ip': '173.211.12.32',
'lat': '37.751',
'lon': '-97.822',
'isp': 'Colocation America Corporation',
'isprating': '3.7',
'rating': '0',
'ispdlavg': '0',
'ispulavg': '0',
'loggedin': '0',
'country': 'US',
}
},
...
]
事件会被发送到接收器,接收器会消费这些流。在这个例子中,db 接收器将接收事件并将它们存储在Postgres数据库中。
事件驱动源
在上面的例子中,我们配置了 speedtest 源每小时运行一次。不过,并不是所有源都需要调度。我们可以有一个源,它监听 MQTT 中的一个特定主题,例如
mqtt:
plugin: source.mqtt
flow: -> db
topics:
- "srocto/feeds/#"
host: localhost
port: 1883
username: bob
password: XXX
message_is_json: true
上面的源会在主题通配符 srocto/feeds/# 中出现新消息时立即向 db 节点发送事件,因此可以将消息写入数据库——这是一个将消息队列持久化到磁盘的超级简单方法!
批量事件
上面的例子不是很高效,因为每次事件到达时都会写入数据库。相反,我们可以轻松地将事件进行批量处理,这样它们就可以在队列中累积,例如每5分钟处理一次。
db:
plugin: sink.db.postgresl
flow: speedtest, mqtt ->
batch: 5 minutes
user: alice
password: XXX
host: localhost
port: 5432
dbname: default
使用 batch 参数,任何传入的事件都会在配置的时间内存储在队列中,并由接收器一起处理。如果 srocto 优雅地终止(例如,使用 ctrl+C),队列中的任何挂起事件仍将被处理。
过滤事件
Señor Octopus 的许多灵活性来自于第三种类型的节点,即 过滤器。过滤器不仅可以用来过滤数据,还可以用来格式化数据。例如,假设我们想在日落时打开一些灯光。每当日落或日出发生时,sun 源会发送具有“sunset”或“sunrise”值的事件
{
'timestamp': ...,
'name': 'hub.sun',
'value': 'sunset',
}
tuya 接收器可以用来控制智能开关,但为了打开它,它期望一个类似于以下的事件
{
'timestamp': ...,
'name': ...,
'value': 'on',
}
我们可以使用 jinja 过滤器忽略“sunrise”事件,并将“sunset”值转换为“on”
sun:
plugin: source.sun
flow: -> sunset
latitude: 38.3
longitude: -123.0
sunset:
plugin: filter.jinja
flow: sun -> lights
template: >
{% if event['value'] == 'sunset' %}
on
{% endif %}
lights:
plugin: sink.tuya
flow: sunset ->
device: "Porch lights"
email: charlie@example.com
password: XXX
country: "1"
application: smart_life
使用这种配置,sunset 过滤器将丢弃任何没有“sunset”值的任何事件。对于那些具有“sunset”值的事件,其值将被替换为字符串“on”,以便在 lights 节点中激活灯光。
限制事件
有时我们想限制接收器消耗的事件数量。例如,假设我们想使用 Señor Octopus 通过 Awair Element 监测空气质量,当得分低于某个阈值时,发送给我们短信。我们希望短信最多每30分钟发送一次,并且只在早上8点到晚上10点之间发送。
以下是实现方法
awair:
plugin: source.awair
flow: -> bad_air
schedule: 0/10 * * * *
access_token: XXX
device_type: awair-element
device_id: 12345
bad_air:
plugin: filter.jinja
flow: awair -> sms
template: >
{% if
event['timestamp'].astimezone().hour >= 8 and
event['timestamp'].astimezone().hour <= 21 and
event['name'] == 'hub.awair.score' and
event['value'] < 80
%}
Air quality score is low: {{ event['value'] }}
{% endif %}
sms:
plugin: sink.sms
flow: bad_air ->
throttle: 30 minutes
account_sid: XXX
auth_token: XXX
from: "+18002738255"
to: "+15558675309"
在上面的例子中,awair 源会每10分钟获取空气质量数据,并将其发送到 bad_air。过滤器会检查小时,以防止晚上10点到早上8点发送短信,并检查空气质量得分——如果得分低于80,它将重新格式化事件的值,例如
“空气质量得分低:70”
然后,它被发送到 sms 接收器,该接收器有一个30分钟的 throttle 设置。该节流配置将防止接收器每30分钟运行一次,以避免在得分保持低的情况下频繁发送消息。
插件
Señor Octopus 支持越来越多的插件,并且添加新的插件很简单。每个插件都是一个简单的函数,用于产生、处理或消费流。
以下是 random 源,它产生随机数
async def rand(events: int = 10, prefix: str = "hub.random") -> Stream:
for _ in range(events):
yield {
"timestamp": datetime.now(timezone.utc),
"name": prefix,
"value": random.random(),
}
这是 jinja 过滤器的完整源代码。
async def jinja(stream: Stream, template: str) -> Stream:
_logger.debug("Applying template to events")
tmpl = Template(template)
async for event in stream:
value = tmpl.render(event=event)
if value:
yield {
"timestamp": event["timestamp"],
"name": event["name"],
"value": value,
}
这是 sms 沉淀。
async def sms(
stream: Stream, account_sid: str, auth_token: str, to: str, **kwargs: str
) -> None:
from_ = kwargs["from"]
client = Client(account_sid, auth_token)
async for event in stream:
_logger.debug(event)
_logger.info("Sending SMS")
client.messages.create(body=str(event["value"]).strip(), from_=from_, to=to)
如您所见,源是一个异步生成器,它产生事件。过滤器接收带有额外配置参数的流,并返回一个流。而沉淀接收一个带有额外参数的流,不返回任何内容。
源
当前源插件包括:
source.awair:从 Awair Element 监测器获取空气质量数据。
source.crypto:从 cryptocompare.com 获取加密货币价格。
source.mqtt:订阅一个或多个 MQTT 主题的消息。
source.rand:生成介于 0 和 1 之间的随机数。
source.speed:测量互联网速度。
source.sqla:从数据库读取数据。
source.static:生成静态事件。
source.stock:从 Yahoo! Finance 获取股票价格。
source.sun:在日出和日落时发送事件。
source.udp:监听指定端口的 UDP 消息。
source.weatherapi:从 weatherapi.com 获取天气预报数据。
source.whistle:获取 Whistle 宠物追踪器的设备信息和位置。
过滤器
现有的过滤器非常相似,主要区别在于它们的配置方式。
filter.format:使用 Python 字符串格式化格式化事件流。
filter.jinja:将 Jinja2 模板应用于事件。
filter.jsonpath:根据 JSON 路径过滤事件流。
沉淀
以下是当前沉淀:
sink.log:将事件发送到记录器。
sink.mqtt:将事件作为消息发送到 MQTT 主题。
sink.pushover:将事件发送到 Pushover 移动应用。
sink.slack:将消息发送到 Slack 频道。
sink.sms:通过 Twilio 发送短信。
sink.tuya:向 Tuya/Smart Life 设备发送命令。
项目详情
senor-octopus-0.2.0.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | 1d4648d138cb2939b21886b57eb5ee7472a1a908dba99fc618fdbac429b851bf |
|
MD5 | eff474b6bdfbef39ca9ac98dc1ea5ad1 |
|
BLAKE2b-256 | bb86b824939331a03634dddf003ebf83f5ec1ad2107187defb885426f479628e |
senor_octopus-0.2.0-py2.py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | c2595ae467e14dd34fc9e4b5cb253a1e5651a89ad1492488cba76ea19301957f |
|
MD5 | e4f185ff955a1211652e0204a74bf48e |
|
BLAKE2b-256 | cda5831ee65bc32006a894ea72bef94a1fae38d73dd010814da6aafaba486cdc |