
Spark Structured Streaming Multithreading in IPython Notebooks


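Multithreading for Spark in IPython notebooks.

Running Spark Structured Streaming in a Jupyter notebook is now simple: streaming queries run in their own threads and can be controlled with notebook buttons.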

Installation

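pip install nbthread_spark --process-dependency-links

The --process-dependency-links option was removed in pip 19.0, so on newer pip versions this command fails with an unknown-option error unless the flag is dropped.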

Usage

Show Spark buttons for stopping the session and opening the Spark UI

from nbthread_spark.spark import SparkRunner

spark = SparkRunner.builder.getOrCreate() # same as original SparkSession

## you will see buttons ;)

Given a socket stream:

TCP_IP = "localhost"
TCP_PORT = 9005

from pyspark.sql.functions import from_json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType

schema = StructType([
    StructField("bip", IntegerType(), True),
    StructField("is_on", IntegerType(), True)
])

spark = SparkSession \
    .builder \
    .appName("IOTStreamApp") \
    .getOrCreate()

iot_stream = spark \
    .readStream \
    .format("socket") \
    .option("host", TCP_IP) \
    .option("port", TCP_PORT) \
    .load()

iot_expanded = iot_stream.withColumn('value_json',
                                    from_json('value', schema)
                                    ).drop('value').select('value_json.*')

query = iot_expanded \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("iot_table") \
    .start()
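The query above only produces rows when something is writing newline-delimited JSON to localhost:9005. For local testing, a feeder like the following can play that role. It is a minimal sketch using only the Python standard library, not part of nbthread_spark; the helper names `make_record` and `serve` are hypothetical, and the random values are placeholders that merely match the example schema (`bip`, `is_on`).

```python
import json
import random
import socket
import time

# Assumed to match TCP_IP / TCP_PORT in the streaming example above.
TCP_IP = "localhost"
TCP_PORT = 9005


def make_record(rng: random.Random) -> str:
    """Hypothetical helper: build one JSON line matching the schema (bip, is_on)."""
    record = {"bip": rng.randint(0, 100), "is_on": rng.randint(0, 1)}
    return json.dumps(record) + "\n"


def serve(host: str = TCP_IP, port: int = TCP_PORT,
          n_records: int = 100, interval: float = 0.5, seed: int = 0) -> None:
    """Hypothetical helper: accept one client (e.g. Spark's socket source)
    and send it n_records JSON lines, one every `interval` seconds."""
    rng = random.Random(seed)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _addr = server.accept()  # blocks until a client connects
        with conn:
            for _ in range(n_records):
                conn.sendall(make_record(rng).encode("utf-8"))
                time.sleep(interval)
        # leaving the `with conn` block closes the connection


if __name__ == "__main__":
    serve()
```

Start the feeder in a terminal before calling `.start()` on the query, because Spark's socket source connects as a client and expects the server to be listening already.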

You can then run the query with StreamRunner:

from nbthread_spark.stream import StreamRunner

runner = StreamRunner(query)

runner.controls()
## you will see buttons ;)

runner.start() # start without controls

runner.status() # show stream status

runner.stop() # stop streaming and thread
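The start/status/stop calls above follow a common pattern: a loop runs in a daemon thread so the notebook cell returns immediately, and a `threading.Event` tells the loop to exit. The sketch below illustrates that pattern only. It is not nbthread_spark's implementation, and the class name `BackgroundLoop` and its `step`/`interval` parameters are hypothetical.

```python
import threading
import time
from typing import Callable, Optional


class BackgroundLoop:
    """Hypothetical illustration: call `step` repeatedly in a daemon thread.

    Not nbthread_spark's code; it only shows a start/status/stop pattern.
    """

    def __init__(self, step: Callable[[], None], interval: float = 1.0) -> None:
        self._step = step
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running, do nothing
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()  # returns at once, so the notebook cell finishes

    def _run(self) -> None:
        while not self._stop_event.is_set():
            self._step()
            # wait() sleeps for `interval` but wakes early once stop() sets the event
            self._stop_event.wait(self._interval)

    def status(self) -> str:
        alive = self._thread is not None and self._thread.is_alive()
        return "running" if alive else "stopped"

    def stop(self, timeout: float = 5.0) -> None:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)  # wait for the loop to exit
```

In a notebook, `step` could be something like `lambda: print(query.status)`, using PySpark's `StreamingQuery.status` property. Stopping the Spark query itself would still require `query.stop()`.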

With the stream manager, you can easily control many streams at once:

from nbthread_spark.manager import StreamManager

sm = StreamManager()

sm.append(runner)
sm.append(runner1)  # runner1 and runner2: further StreamRunner instances
sm.append(runner2)

sm.all_controls()
## you will see all buttons from streams ;)

sm.start_all() # start all streams

sm.stop_all() # stop all streams
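A manager like this essentially fans the same call out to every registered runner. The following is a hypothetical sketch of that idea, not nbthread_spark's `StreamManager`; the names `Runner` and `SimpleStreamManager` are illustrative. One design choice worth noting is that `stop_all` keeps stopping the remaining runners even if one of them raises, so a single failing stream does not leave the others running.

```python
from typing import List, Optional, Protocol


class Runner(Protocol):
    """Anything with start() and stop(), e.g. a StreamRunner-like object."""

    def start(self) -> None: ...

    def stop(self) -> None: ...


class SimpleStreamManager:
    """Hypothetical sketch: forward start/stop to every registered runner."""

    def __init__(self) -> None:
        self._runners: List[Runner] = []

    def append(self, runner: Runner) -> None:
        self._runners.append(runner)

    def start_all(self) -> None:
        for runner in self._runners:
            runner.start()

    def stop_all(self) -> None:
        # Stop every runner even if one raises, then re-raise the first error.
        first_error: Optional[Exception] = None
        for runner in self._runners:
            try:
                runner.stop()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
```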

Special thanks

Here is the list of students who contributed to this module.




Source distribution

nbthread_spark-0.0.6.tar.gz (3.0 kB)

