资讯详情

【Spark】(task7)PySpark Streaming入门

文章目录

  • 一、Spark Streaming入门
  • 二、Streaming 和 Structured Streaming区别
    • 2.1 流计算(Streaming)和批计算(Batch)
    • 2.2 Spark Streaming 和 Spark Structured Streaming
  • 三、基于Spark Streaming统计文本数栗子
  • 四、代码实践
  • Reference

一、Spark Streaming入门

在这里插入图片描述 spark streaming实时输入数据流(如上图所示kafka、HDFS、TCP socket的数据流等)的高吞吐量、容错性良好的数据处理,然后将处理完的数据推送到HDFS文件系统、数据库或dashboards上。在spark streaming机器学习或图处理算法可算法。spark streaming一般处理过程如下: 主流流流量计算工具有三种:

  • Storm延迟最低,一般为几毫秒到几十毫秒,但数据吞吐量较低,每秒可处理的事件在几十万左右,工成本较高。

  • Flink是目前国内互联网厂商主要使用的流量计算工具,延迟一般在几十到几百毫秒,数据吞吐量很高,每秒可以处理几百万事件,施工成本低。

  • Spark通过【Spark Streaming】或【Spark Structured Streaming】支持流量计算。

    • 但Spark流量计算是根据时间将流量数据分成小批次(mini-batch)延迟一般在1秒左右。吞吐量和Flink相当。
    • Spark Structured Streaming 现在也支持了Continous Streaming 模式,即数据到达时计算,但仍处于测试阶段,不是特别成熟。

二、Streaming 和 Structured Streaming区别

2.1 流计算(Streaming)和批计算(Batch)

批量计算或批量处理是处理离线数据。单处理数据量大,处理速度慢。

流量计算是处理在线实时生成的数据。单次处理的数据量较小,但处理速度较快。

2.2 Spark Streaming 和 Spark Structured Streaming

  • Spark在2.主要用于0之前Spark Streaming支持流量计算的数据结构模型是DStream,事实上,它是由小批量数据组成的RDD队列。

  • 目前,Spark主要推荐的流量计算模块是Structured Streaming,其数据结构模型为Unbounded DataFrame,即没有边界的数据表。

    • 相比于 Spark Streaming 建立在 RDD以上数据结构,Structured Streaming 是建立在 SparkSQL基础上,DataFrame的绝大部分API也能够用在流计算上,实现了流计算和批处理的一体化,并且由于SparkSQL性能更好,容错性更好。

三、基于Spark Streaming统计文本数栗子

场景数据服务器的场景:tcp socket统计接收数据中的文本数。

(1)创建StreamingContext。

from pyspark import SparkContext from pyspark.streaming import StreamingContext  # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, 1) 

(2)创建DStream,指定localhost的ip和port(即确定socket)。

# Create a DStream that will connect to hostname:port, like localhost:9999 lines = ssc.socketTextStream("localhost", 9999) 

DStream每个记录都是一行文本,现在需要根据句子中每个单词之间的空间分成单词:

# Count each word in each batch pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x   y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

以上只是transformation操作,接下来是action部分:

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

完整代码如下:

r""" Counts words in UTF8 encoded, '\n' delimited text received from the network every second. Usage: network_wordcount.py <hostname> <port> <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. To run this on your local machine, you need to first run a Netcat server `$ nc -lk 9999` and then run the example `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999` """
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # 创建StreamingContext
    ssc = StreamingContext(sc, 1)
    
    # 创建DStream,确定localhost和port
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    
    # action部分
    ssc.start()
    ssc.awaitTermination()

PS:跑上面代码前需要运行Netcat数据服务器,可以通过yum install -y nc下载Netcat(一个强大的网络工具);可以通过nc -lk 9999 测试使用,然后新开一个terminal,nc ip:9999

四、代码实践

  • 读取文件https://cdn.coggle.club/Pokemon.csv为textFileStream
  • 使用filter筛选行不包含Grass的文本
  • 使用flatmap对文本行进行拆分

Reference

[1] https://spark.apache.org/docs/latest/streaming-programming-guide.html

标签: 肖特基二极管ssc53l

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台