
Spark Streaming Key-Value Operations

While the department went off sightseeing for team building, I stayed home with a book, grinding through Spark Streaming. The comfort in front of you is short-lived; improving your skills is the right way to spend the time. It is like gaming: some people make money from it, while others merely pass the time~

groupByKey: group the values of each key in the DStream to count how many comments each user posted, then sort the authors by their comment count~

    val topAuthors = comments.map(rec => ((parse(rec) \ "author").values.toString, 1))
      .groupByKey()
      .map(r => (r._2.sum, r._1))
      .transform(rdd => rdd.sortByKey(ascending = false))
    topAuthors.print(3)

The result: [deleted] posted 2571 comments, lupin_sansei posted 154, and mnemonicsloth posted 104~

    (2571,[deleted])
    (154,lupin_sansei)
    (104,mnemonicsloth)
    ...
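
All three pipelines rely on the same json4s extraction, (parse(rec) \ "author").values.toString, so it is worth seeing in isolation. Below is a minimal standalone sketch of what it does; the JSON record is made up for illustration and is not from the original dataset:

    import org.json4s._
    import org.json4s.native.JsonMethods._

    // A made-up Reddit-style comment record, for illustration only
    val rec = """{"author":"lupin_sansei","body":"hello spark streaming"}"""

    // parse() builds a JValue; \ selects a field; .values unwraps it to a plain Scala value
    val author = (parse(rec) \ "author").values.toString                    // "lupin_sansei"
    val wordCount = (parse(rec) \ "body").values.toString.split(" ").length // 3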

reduceByKey: as the name suggests, it reduces the values of each key. This function is used more often and can be thought of as a groupByKey followed by an aggregation; again the authors are sorted by their comment count~

    val topAuthors2 = comments.map(rec => ((parse(rec) \ "author").values.toString, 1))
      .reduceByKey(_ + _)
      .map(r => (r._2, r._1))
      .transform(rdd => rdd.sortByKey(ascending = false))
    topAuthors2.print(3)

The result is exactly the same as with the groupByKey version above~

    (2571,[deleted])
    (154,lupin_sansei)
    (104,mnemonicsloth)
    ...
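
The reason reduceByKey is generally preferred is that it pre-aggregates values on the map side before the shuffle, whereas groupByKey ships every individual value across the network. The minimal sketch below shows the equivalence on a plain RDD; sc stands for an existing SparkContext (in the full listing it would be ssc.sparkContext) and the data is made up:

    // Hypothetical tiny dataset, for illustration only
    val pairs = sc.parallelize(Seq(("alice", 1), ("bob", 1), ("alice", 1)))

    // groupByKey shuffles every (key, 1) pair, then sums on the reduce side
    val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }

    // reduceByKey pre-aggregates within each partition before the shuffle
    val viaReduce = pairs.reduceByKey(_ + _)

    // Both yield: (alice,2), (bob,1)
    viaGroup.collect().foreach(println)
    viaReduce.collect().foreach(println)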

combineByKey: combines the values according to their key. It takes four arguments:

  1. createCombiner: the function that creates a combiner from the first value of a key
  2. mergeValue: the function that accumulates further values of a key within each partition
  3. mergeCombiner: the function that merges two accumulators across partitions
  4. partitioner: the partitioner to use
  • Whenever a new key is encountered, createCombiner is called to produce a fresh combiner instance for it
  • On each subsequent occurrence of that key, mergeValue is called to fold the value into the accumulator
  • Once all keys have been processed within the partitions, mergeCombiner is called to merge the accumulators across partitions (a standalone sketch follows this list)
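
To see these three functions in isolation before the streaming version, here is a minimal sketch that computes a per-key average on a plain RDD; sc stands for an existing SparkContext and the data is made up for illustration:

    val scores = sc.parallelize(Seq(("alice", 10), ("alice", 20), ("bob", 6)))

    val avgByKey = scores.combineByKey(
      // createCombiner: the first value of a key becomes a (sum, count) accumulator
      (v: Int) => (v, 1),
      // mergeValue: fold further values of the same key into the accumulator
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
      // mergeCombiners: merge accumulators produced in different partitions
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count.toFloat }

    // Expected output: (alice,15.0), (bob,6.0)
    avgByKey.collect().foreach(println)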

In the actual pipeline, the first map computes the average comment length per user; the second map swaps the length and the username; transform calls sortByKey on the key, which is now the average comment length, so the output is in descending order of average length~

    val topAuthorsByAvgContent = comments.map(rec => ((parse(rec) \ "author").values.toString, (parse(rec) \ "body").values.toString.split(" ").length))
      .combineByKey(
        (v) => (v, 1), // createCombiner: creates the combiner
        // mergeValue: accumulates values within each partition
        (accValue: (Int, Int), v) => (accValue._1 + v, accValue._2 + 1),
        // mergeCombiner: merges two accumulators across partitions
        (accCombine1: (Int, Int), accCombine2: (Int, Int)) => (accCombine1._1 + accCombine2._1, accCombine1._2 + accCombine2._2),
        // partitioner: the partitioner to use
        new HashPartitioner(ssc.sparkContext.defaultParallelism))
      .map({ case (k, v) => (k, v._1 / v._2.toFloat) })
      .map(r => (r._2, r._1))
      .transform(rdd => rdd.sortByKey(ascending = false))
    topAuthorsByAvgContent.print(3)

The result: jlaudun's comments average 551 words, alaris's average 432 words, and glw907's average 378 words~

    (551.0,jlaudun)
    (432.0,alaris)
    (378.0,glw907)
    ...

Full code:

package org.apress.prospark

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object Study3Dstream {
  def main(args: Array[String]) {
    val inputPath = "C:\\study\\pro_spark_streaming\\3\\bz\\"

    val conf = new SparkConf()
      .setAppName("AILX10")
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .setMaster("local")

    val ssc = new StreamingContext(conf, Seconds(1))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    val topAuthors = comments.map(rec => ((parse(rec) \ "author").values.toString, 1))
      .groupByKey()
      .map(r => (r._2.sum, r._1))
      .transform(rdd => rdd.sortByKey(ascending = false))
    topAuthors.print(3)

    val topAuthors2 = comments.map(rec => ((parse(rec) \ "author").values.toString, 1))
      .reduceByKey(_ + _)
      .map(r => (r._2, r._1))
      .transform(rdd => rdd.sortByKey(ascending = false))
    topAuthors2.print(3)

    val topAuthorsByAvgContent = comments.map(rec => ((parse(rec) \ "author").values.toString, (parse(rec) \ "body").values.toString.split(" ").length))
      .combineByKey(
        (v) => (v, 1),
        (accValue: (Int, Int), v) => (accValue._1 + v, accValue._2 + 1),
        (accCombine1: (Int, Int), accCombine2: (Int, Int)) => (accCombine1._1 + accCombine2._1, accCombine1._2 + accCombine2._2),
        new HashPartitioner(ssc.sparkContext.defaultParallelism))
      .map({ case (k, v) => (k, v._1 / v._2.toFloat) })
      .map(r => (r._2, r._1))
      .transform(rdd => rdd.sortByKey(ascending = false))
    topAuthorsByAvgContent.print(3)

    ssc.start()
    ssc.awaitTermination()

  }
}
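
For completeness, the listing needs spark-streaming and json4s on the classpath. The build.sbt sketch below is only an assumption that matches the Spark 1.x-era API used above (SparkContext.jarOfClass, fileStream); adjust the artifact versions to your own environment:

    // build.sbt sketch -- artifact names are real, the versions are assumptions
    name := "study-dstream"

    scalaVersion := "2.10.6"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"      % "1.6.3",
      "org.apache.spark" %% "spark-streaming" % "1.6.3",
      "org.json4s"       %% "json4s-native"   % "3.2.11"
    )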

That's all for this article. Thanks, everyone~
