跳转到主要内容

响应式数据流图

项目描述

Reactive-Dataflow

Python的响应式处理图。

入门指南

安装

pip install reactivedataflow

此项目的关键依赖包括 rxnetworkx。这些在 pyproject.toml 依赖项部分中概述。

用法

import reactivex as rx
from reactivedataflow import (
	GraphAssembler, 
	GraphModel, 
	VerbNodeModel, 
	InputNodeModel, 
	InputModel,
	verb,
	InputPort,
	ConfigPort
)
#
# Define a processing verb
#
@verb(
	name="print",
	ports=[
		InputPort(name="values", required=True, parameter="val"),
		ConfigPort(name="prefix", required=False, parameter="prefix"),
	]
)
def print_verb(val, prefix=""):
	return f"{prefix}{val}"

#
# Define a simple graph
#
assembler = GraphAssembler()
assembler.load(
	GraphModel(
		inputs=[
			# This is an input stream of values we'll define on build
			InputNodeModel(id="input_values")
		],
		nodes=[
			# Here we define the processing nodes
			VerbNodeModel(
				id="verb1",
				verb="print",
				config={"prefix": "emitted: "},
				input={
					"values": InputModel(node="input_values")
				}
			),
		],
	),
)

#
# Build the graph and bind input streams
#
graph = assembler.build(
	inputs={
		"input_values": rx.of([1, 2, 3])
	}
)
#
# Watch graph outputs
#
graph.output("verb1").subscribe(print)
# Output:
# emitted: 1
# emitted: 2
# emitted: 3

开发

此项目使用 poetry 进行依赖项管理。您应该在系统上安装最新的Python版本(例如3.10+)和Poetry 1.8+。

# Install dependencies
poetry install

# Run tests
poetry run poe test

# Run static checks
poetry run poe check

项目详情


下载文件

下载适合您平台的文件。如果您不确定该选择哪个,请了解更多关于安装包的信息。

源代码分发

reactivedataflow-0.1.17.tar.gz (20.5 kB 查看哈希值)

上传时间 源代码

构建分发

reactivedataflow-0.1.17-py3-none-any.whl (32.3 kB 查看哈希值)

上传时间 Python 3

由以下支持

AWS AWS 云计算和安全赞助商 Datadog Datadog 监控 Fastly Fastly CDN Google Google 下载分析 Microsoft Microsoft PSF 赞助商 Pingdom Pingdom 监控 Sentry Sentry 错误记录 StatusPage StatusPage 状态页面