Spark Structured Streaming multithreading in IPython notebooks
Multithreaded Spark in IPython notebooks.
Running Spark Structured Streaming in a Jupyter notebook is now easy.
Installation
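pip install nbthread_spark --process-dependency-links
Note: pip removed the --process-dependency-links option in version 19.0, so this command fails on current pip. Use pip install nbthread_spark instead; any dependencies that were declared only as dependency links may then need to be installed separately.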
Usage
Show Spark buttons for stopping the session and opening the Spark UI:
from nbthread_spark.spark import SparkRunner
spark = SparkRunner.builder.getOrCreate() # same as original SparkSession
## you will see buttons ;)
Given a socket stream:
TCP_IP = "localhost"
TCP_PORT = 9005
from pyspark.sql.functions import from_json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType
schema = StructType([
    StructField("bip", IntegerType(), True),
    StructField("is_on", IntegerType(), True)
])
spark = SparkSession \
    .builder \
    .appName("IOTStreamApp") \
    .getOrCreate()
iot_stream = spark \
    .readStream \
    .format("socket") \
    .option("host", TCP_IP) \
    .option("port", TCP_PORT) \
    .load()
iot_expanded = iot_stream \
    .withColumn('value_json', from_json('value', schema)) \
    .drop('value') \
    .select('value_json.*')
query = iot_expanded \
    .writeStream \
    .outputMode("update") \
    .format("memory") \
    .queryName("iot_table") \
    .start()
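The socket source reads newline-delimited text from a TCP server, so something has to be listening on TCP_IP:TCP_PORT before the query is started (the Spark programming guide uses nc -lk 9999 for this). The following is a minimal test producer, assuming each line carries one JSON object with the bip and is_on fields from the schema above. make_reading and serve_readings are hypothetical helpers for illustration, not part of nbthread_spark.

```python
import json
import random
import socket
import time

# Hypothetical test producer for the socket stream: Spark's socket source
# connects as a client, so this process listens and writes JSON lines.
TCP_IP = "localhost"
TCP_PORT = 9005


def make_reading(bip: int, is_on: int) -> str:
    """Encode one reading as a JSON line whose keys match the StructType schema."""
    return json.dumps({"bip": bip, "is_on": is_on}) + "\n"


def serve_readings(host: str, port: int, count: int, interval: float = 1.0) -> None:
    """Accept a single client (e.g. the Spark socket source) and send `count` readings."""
    with socket.create_server((host, port)) as server:
        conn, _addr = server.accept()
        with conn:
            for i in range(count):
                # Random on/off flag; bip is just a running counter here.
                line = make_reading(bip=i, is_on=random.randint(0, 1))
                conn.sendall(line.encode("utf-8"))
                time.sleep(interval)
        # Leaving the `with` blocks closes the connection, ending the stream.
```

For example, run serve_readings(TCP_IP, TCP_PORT, count=100) in a separate terminal or process, then start the query. Because the query writes to the memory sink under queryName("iot_table"), the received rows can be inspected from the notebook with spark.sql("SELECT * FROM iot_table").show().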
You can run the query with StreamRunner:
from nbthread_spark.stream import StreamRunner
runner = StreamRunner(query)
runner.controls()
## you will see buttons ;)
runner.start() # start without controls
runner.status() # show stream status
runner.stop() # stop streaming and thread
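How StreamRunner works internally is not described here. As a rough illustration of the general idea (keep the notebook responsive while a stream runs), the sketch below watches a query from a daemon thread. It assumes only that the query exposes a status attribute and a stop() method, as PySpark's StreamingQuery does. BackgroundStreamRunner is a hypothetical name, and this is not nbthread_spark's implementation.

```python
import threading
from collections import deque
from typing import Any, Callable, Deque, Optional


class BackgroundStreamRunner:
    """Hypothetical sketch: poll a streaming query's status from a daemon thread.

    `query` is assumed to have a `status` attribute and a `stop()` method,
    like pyspark's StreamingQuery.
    """

    def __init__(self, query: Any, poll_seconds: float = 1.0,
                 on_status: Optional[Callable[[Any], None]] = None) -> None:
        self.query = query
        self.poll_seconds = poll_seconds
        self.on_status = on_status or (lambda status: None)
        # Bounded history so a long-running stream does not grow memory forever.
        self.history: Deque[Any] = deque(maxlen=100)
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def _loop(self) -> None:
        # Event.wait acts as an interruptible sleep: stop() wakes it at once.
        while not self._stop_event.is_set():
            status = self.query.status
            self.history.append(status)
            self.on_status(status)
            self._stop_event.wait(self.poll_seconds)

    def start(self) -> None:
        """Start polling in the background; the calling cell returns immediately."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def status(self) -> Any:
        """Return the query's current status."""
        return self.query.status

    def stop(self) -> None:
        """End the polling thread first, then stop the streaming query."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
        self.query.stop()
```

For example, BackgroundStreamRunner(query, poll_seconds=5).start() returns right away, and stop() shuts down the watcher thread before stopping the query so the thread never polls a query that is being torn down.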
With the stream manager, you can easily control many streams at once:
from nbthread_spark.manager import StreamManager
sm = StreamManager()
# runner1 and runner2 stand for further StreamRunner objects created like runner
sm.append(runner)
sm.append(runner1)
sm.append(runner2)
sm.all_controls()
## you will see all buttons from streams ;)
sm.start_all() # start all streams
sm.stop_all() # stop all streams
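A manager of this kind can be little more than a list of runners with fan-out methods. The sketch below is hypothetical (SimpleStreamManager is not the library's StreamManager) and assumes each runner provides start() and stop() methods, like the StreamRunner calls shown above.

```python
from typing import List, Protocol


class Runner(Protocol):
    """Anything with start()/stop(), e.g. a StreamRunner-like object (assumed)."""

    def start(self) -> None: ...

    def stop(self) -> None: ...


class SimpleStreamManager:
    """Hypothetical sketch: keep runners in a list and fan out start/stop calls."""

    def __init__(self) -> None:
        self.runners: List[Runner] = []

    def append(self, runner: Runner) -> None:
        self.runners.append(runner)

    def start_all(self) -> None:
        for runner in self.runners:
            runner.start()

    def stop_all(self) -> List[Exception]:
        # Attempt every runner even if one fails, so one broken stream does not
        # leave the others running; failures are returned for inspection.
        errors: List[Exception] = []
        for runner in self.runners:
            try:
                runner.stop()
            except Exception as exc:
                errors.append(exc)
        return errors
```

The design choice worth noting is in stop_all: collecting exceptions instead of raising on the first one means a single failing stream cannot block shutdown of the rest.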
Special thanks
Here is the list of students who contributed to this module.
Hashes for nbthread_spark-0.0.6.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | ce72ffb1492317162ebbaaeecc0a2140f111b108a33d9c000109e57a252d4e16 |
| MD5 | d459619eadea51877c05ab2efd1b9fda |
| BLAKE2b-256 | f97f14eb10642c82974337987ae0fbb93d1e56d6b160b83605102a1208c6a23a |