资讯详情

Spark(四)— Spark Streaming

Spark(四)— Spark Streaming

      • 一.概述
      • 二.使用
        • 2.1 基础环境 (wordcount测试)
        • 2.2 DStream输出
        • 2.3 自定义收集器 — 对接Kafka
        • 2.4 DStream - 有状态转化
        • 2.5 DStream - 无状态操作 Transform
        • 2.6 DStream - 无状态操作 join
        • 2.7 滑动窗常用函数
          • 2.7.1 window
          • 2.7.2 countByWindow
          • 2.7.3 reduceByWindow
          • 2.7.4 reduceByKeyAndWindow - 1
          • 2.7.5 reduceByKeyAndWindow - 2
        • 2.8 优雅关闭DStream

一.概述

流式数据处理:逐个处理 批量数据处理:一起处理多个数据

实时处理数据:数据处理的延迟为毫秒级 离线数据处理:数据处理的延迟是小时甚至每天

Spark Streaming处理流式数据。Spark Streaming有许多数据输入源支持,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等。可在数据输入后使用 Spark 的高度抽象原语如:map、reduce、join、window 等待操作。结果可以保存在很多地方,比如 HDFS,数据库等

在这里插入图片描述 和Spark 基于 RDD 概念非常相似,Spark Streaming使用离散流(discretized stream) 称为抽象表示DStream。DStream是随时间而收到的数据的序列。在内部,每个时间间隔收到的数据都被用作RDD 存在,而 DStream是由这些RDD 组成的序列(因此得名为离散化)。所以简单来说,

Spark Streaming是一个准实时的数据处理框架(数据处理延迟在秒或分钟),微批次(数据处理几秒钟)

Spark 1.如果用户想限制之前版本的5,Receiver设置静态准备参数的数据接收率"spark.streaming.receiver.maxRate"为了实现这一这一举措可以通过限制接收率来适应当前的处理能力,防止内存溢出,但也会引入其他问题。比如:producer数据生产高于 maxRate,目前的集群处理能力也高于目前的集群处理能力maxRate,这将导致资源利用率下降等问题

为了更好的协调数据接收速率与资源处理能力,1.5 版本开始Spark Streaming集群数据处理能力可以动态控制数据接收率。背压机制(即Spark Streaming Backpressure):

通过属性"spark.streaming.backpressure.enabled"控制是否启用backpressure机制,默认值false,即不启用

二.使用

2.1 基础环境 (wordcount测试)

添加依赖

<dependency>  <groupId>org.apache.spark</groupId>  <artifactId>spark-streaming_2.12</artifactId>  <version>3.0.0</version> </dependency> 

因为我们是流式处理,,我们监控了当地的9999端口,并通过netcat发送数据,模拟流式数据

object WcDemo { 
           def main(args: Array[String]): Unit = { 
             //1.初始化 Spark 配置信息     val sparkCof = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext 采集周期为3秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 业务处理.....
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val words: DStream[(String, Int)] = line.flatMap(_.split("")).map((_, 1))

    val value: DStream[(String, Int)] = words.reduceByKey(_ + _)

    value.print()



    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }
}

对数据的操作也是按照RDD 为单位来进行的

2.2 DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD 中的惰性求值类似,

  • print():在运行流程序的驱动结点上打印DStream 中每一批次数据的。这用于开发和调试

  • saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个DStream的内容

  • saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为SequenceFiles

  • saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files

  • :这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个RDD中数据推送到外部系统,如将RDD 存入文件或者通过网络将其写入数据库 通用的输出操作foreachRDD(),它用来对DStream 中的 RDD 运行任意计算。这和 transform() 有些类似,都可以让我们访问任意RDD。在 foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中

2.3 自定义采集器 — 对接Kafka

使用 API 指定主题后和属性后,为了后续方便,我们会将接收到的数据封装到样例类

,这个策略会将分区(kafka的分区)分布到所有可获得的executor上。如果你的executor和kafkabroker在同一主机上的话,可以使用PreferBrokers,这样kafka leader会为此分区进行调度。最后,如果你加载数据有倾斜的话可以使用PreferFixed,这将允许你制定一个分区和主机的映射(没有指定的分区将使用PreferConsistent 策略)

,你可以使用它去订阅一个topics集合

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Stream")
      val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

      // kafka消费者配置
      val kafkaParam = Map(
        "bootstrap.servers" -> "hadoop102:9092" ,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "1000",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (true: java.lang.Boolean)
      )
      // 泛型是接收的kafka数据K-V
      val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("topicx"), kafkaParam))
      // 封装点击数据到样例类
      val clickData: DStream[AdClickData] = kafkaDataDS.map(data => { 
        
        val datas: Array[String] = data.value().split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      })

      

      ssc.start()
      ssc.awaitTermination()
    }

    case class AdClickData(ts:String,area:String,city:String,user:String,ad:String)
  }
}

2.4 DStream - 有状态转化

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window 相关的原语

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加import StreamingContext._才能在 Scala中使用

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的

例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据

虽然都是hello,但是是无状态的,也是说,这两个统计是没有关系的,,下图,第一个三秒告诉我们hello为1,第二个3秒告诉我们hello为2,各个周期数据独立

需要DS的,注意原语只是个名称,类似RDD的算子,实际上就是方法而已

原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,

updateStateByKey() 的结果会是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的,

updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

1.定义状态,状态可以是一个任意的数据类型 2.定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新

以下程序中,由于ByKey操作已经帮我们区分好了key,所以键就是一个seq集合来记录每个单词的count,缓冲区就是一个记录总和的变量

object WcDemo { 
        
  def main(args: Array[String]): Unit = { 
        
    //1.初始化 Spark 配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext 采集周期为3秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")


    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val words: DStream[(String, Int)] = line.flatMap(_.split("")).map((_, 1))

    val value: DStream[(String, Int)] = words.updateStateByKey((seq: Seq[Int], opt: Option[Int]) => { 
        
      val newcount = opt.getOrElse(0) + seq.sum
      Option(newcount)
    })

    value.print()

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }
}

2.5 DStream - 无状态操作 Transform

先看代码,我们监视9999端口一行输入一个word,进行wordcount测试,有两种方法

val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// 方法一 耳详能熟

// ** Driver代码
line.map(str =>{ 
        
	// 这里的代码只可以运行在Executor
  (str,1)
}).reduceByKey(_+_)

// 方法二 使用Transform原语 

// ** Driver代码 
val res: DStream[(String, Int)] = line.transform(rdd => { 
        
  println("我是Driver端的代码")  // 这里可以运行Driver端代码
  rdd.map(str => { 
        
  // 这里的代码只可以运行在Executor
    (str, 1)
  })
}).reduceByKey(_ + _)

也就是说,

2.6 DStream - 无状态操作 join

一句话:两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相通,类似MYSQL的内连接

object JoinTest { 
        
  def main(args: Array[String]): Unit = { 
        
    //1.初始化 Spark 配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext 采集周期为3秒
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(3))

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val line1: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = line.map((_, 1))

    val map8888: DStream[(String, Int)] = line1.map((_, 1))

    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }
}

2.7 滑动窗口常用函数

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态,

  • 窗口时长:计算内容的时间范围

  • 滑动步长:隔多久触发一次计算

2.7.1 window

示例:window(Seconds(10),Seconds(5)) 5秒处理一次过去10秒的数据

注意:这两者都必须为采集周期大小的整数倍

object Window { 
        
  def main(args: Array[String]): Unit = { 
        
    //1.初始化 Spark 配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

    //2.初始化 SparkStreamingContext 采集周期为5秒
    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val windowData: DStream[String] = line.window(Seconds(10),Seconds(5))

    val wordToOne: DStream[(String, Int)] = windowData.map((_, 1))
    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

    wordToCount.print()

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()
  }

}

当窗口时长大于滑动步长时,

当窗口时长等于滑动步长时,

2.7.2 countByWindow

返回一个滑动窗口计数流中的元素个数

示例:countByWindow(Seconds(10), Seconds(5)) 每5秒计算过去10秒内的元素个数

2.7.3 reduceByWindow

reduceByWindow(func, windowLength, slideInterval) 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流

val count: DStream[String] = line.reduceByWindow(_ + _, Seconds(5), Seconds(5))

每5秒对5秒内的元素进行拼接

2.7.4 reduceByKeyAndWindow - 1

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值,这里的函数必须写完整

2.7.5 reduceByKeyAndWindow - 2

这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于可逆的 reduce 函数,也就是这些 reduce函数有相应的函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置

重点 根据图片看这两种原语的区别

假设我们就输入这些数据

第一次计算 10秒内数据是 (a,1) (b,1) (c,1) (d,1)

第二次计算 (c,1) (e,1) (d,1) (a,1)

第三次计算 啥都没有 …

第一次计算 10秒内数据是 (a,1) (b,1) (c,1) (d,1)

第二次计算 (c,1) (e,1) (d,1) (a,1)

第三次计算 (c,0) (b,0) (e,0) (d,0) (a,0)

2.8 优雅关闭DStream

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了 使用外部文件系统来控制内部程序关闭

这里第二个参数的意思是,处理完当前已经接收的数据后再关闭

ssc.stop(true,true)

但是我们应该注意到

 // 启动采集器
ssc.start()
// 等待采集器的关闭
ssc.awaitTermination()  // 阻塞main线程

ssc.stop(true,true)

ssc.awaitTermination() 阻塞当前线程,也就是说stop根本执行不到

最好的办法是创建一个线程,根据外部文件的标志位来结束

class StopM (ssc: StreamingContext) extends Runnable{ 
        
  override def run(): Unit = { 
        

    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "gzhu")

    while (true) { 
         
       try { 
        
         Thread.sleep(5000)
       }catch { 
        
         case e: InterruptedException => e.printStackTrace()
       }
      
      val state: StreamingContextState = ssc.getState
      val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark")) 
      if (bool) { 
        
        if (state == StreamingContextState.ACTIVE) { 
         ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

object Resume { 
        
  def main(args: Array[String]): Unit = { 
        
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("cp", () => { 
        
      //1.初始化 Spark 配置信息
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

      //2.初始化 SparkStreamingContext 采集周期为3秒
      val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

      ssc
    })

    ssc.checkpoint("cp")

    // 启动采集器
    ssc.start()
    // 等待采集器的关闭
    ssc.awaitTermination()

    new Thread(new StopM(ssc)).start()
  }
}

标签: 肖特基二极管ssc53l

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台