资讯详情

大数据高级开发工程师——Spark学习笔记(2)

文章目录

  • Spark内存计算框架
    • Spark Core
      • RDD 基本介绍
        • 1. 什么是 RDD?
        • 2. RDD五大特点
        • 3. 基于词频统计剖析
      • Spark中的算子
        • 1. Transformation算子
        • 2. Action算子
        • 3. Shuffle算子
      • RDD的创建方式
        • 1. 通过现有的scala集合创建
        • 2. 构建外部数据源
        • 3. 从其他RDD新的转换RDD
      • 常见算子介绍
        • 1. map
        • 2. mapPartitions
        • 3. mapPartitionsWithIndex
        • 4. filter
        • 5. flatMap
        • 6. coalesce(默认不存在shuffle)
        • 7. repartition(存在shuffle)
        • 8. glom
        • 9. RDD交互(交并差笛拉)
        • 10. distinct(存在shuffle)
        • 11. sample
        • 12. groupBy(存在shuffle)
        • 13. sortBy(存在shuffle)
        • 14. reduce(存在shuffle)
        • 15. foreach、foreachPartition
      • K-V常见类型算子介绍
        • 1. partitionBy
        • 2. 自定义分区器
        • 3. reduceByKey
        • 4. groupByKey
        • 5. aggrateByKey
        • 6. foldByKey
        • 7. combineByKey
        • 8. sortByKey
        • 9. mapValues
        • 10. join
        • 11. cogroup

Spark内存计算框架

Spark Core

RDD 基本介绍

1. 什么是 RDD?

  • Resilient Distributed DataSet:弹性分布式数据集Spark最基本的数据抽象可以理解为数据集合。
    • :收集数据,存储大量数据。
    • :RDD分布式存储内部元素,便于后期分布式计算。
    • :表示弹性,RDD数据可以保存在内存或磁盘中。
  • 它是代码中的抽象类,代表弹性、不可变、可分区,内部元素可并行计算。

2. RDD五大特点

在这里插入图片描述

    • RDD分区很多,每个分区都包括在内RDD的部分数据
    • 因为有多个分区,所以一个分区(Partition)列表可视为数据集的基本组成部分
    • spark中任务是以task对于线程运行,RDD每个分区都将由一个计算任务处理, 一个分区对应一个task因此,分区决定了并行计算的粒度。
    • 可创建用户RDD时,指定RDD如果没有指定分区数,则采用默认值(程序分配的CPU Core的数目)。
    • 每个分配的存储是由的BlockManager每个分区都会实现逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
    • Spark中RDD计算以分区为单位,每个RDD都会实现compute实现这一目标的函数
    • spark根据这一特点,任务的容错机制是(血统)而来。
    • RDD每一个转换都会产生一个新的RDD,所以RDD在某些区域数据丢失时,会形成类似流水线的前后依赖关系,Spark丢失的分区数据可以通过这种依赖关系重新计算,而不是对RDD重新计算所有分区。
    • 只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner。
    • 非key-value的RDD(RDD[String])的Parititioner的值是None
    • Partitioner不但决定了RDD也决定了分区的数量parent RDD Shuffle输出时的分区数量
    • 当前Spark实现了两种分区函数
      • 以哈希为基础HashPartitioner,(key.hashcode % 分区数 = 分区号),是默认值。
      • 另一个是基于范围的RangePartitioner。
    • 比如一个HDFS就文件而言,这个列表保存每个列表Partition所在文件块的位置。
    • 按照移动数据不如移动计算的理念,Spark在任务调度过程中,计算任务将尽可能分配到要处理的数据块的存储位置,以减少数据的网络传输,提高计算效率。

3. 基于词频统计分析

  • 需求:HDFS上面有一个3000的大小M文件,通过spark实现文件单词统计,最终保存结果数据HDFS上。
  • 代码实现:
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out") 
  • 流程分析:

Spark中的算子

  • Transformation 算子,也称转换算子,在没有 Action 算子(也称行动算子)去触发的时候,是不会执行的,可以理解为懒算子,而Action算子可以理解为触发算子。
  • 还有一种 Shuffle类 算子,就是上面说到的洗牌算子。

1. Transformation算子

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
对源RDD和参数RDD求并集后返回一个新的RDD
对源RDD和参数RDD求交集后返回一个新的RDD
对源RDD进行去重后返回一个新的RDD
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
与sortByKey类似,但是更灵活
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
减少 RDD 的分区数到指定值。
重新给 RDD 分区
重新给 RDD 分区,并且每个分区内以记录的 key 排序

2. Action算子

reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
在驱动程序中,以数组的形式返回数据集的所有元素
返回RDD的元素个数
返回RDD的第一个元素(类似于take(1))
返回一个由数据集的前n个元素组成的数组
返回自然顺序或者自定义顺序的前 n 个元素
将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
将数据集的元素,以 Java 序列化的方式保存到指定的目录下
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
在数据集的每一个元素上,运行函数func
在数据集的每一个分区上,运行函数func

3. Shuffle算子

  • 去重
def distinct()
def distinct(numPartitions: Int)
  • 聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
  • 排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  • 重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
  • 集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

RDD的创建方式

1. 通过已存在的scala集合创建

object RddDemo { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    /* 传入分区数为1,结果顺序打印;传入分区数大于1,结果顺序不定,因为数据被打散在N个分区里 */
    val rdd1: RDD[Int] = sc.parallelize(1.to(10), 1)
    print("rdd1->:")
    rdd1.foreach(x => print(x + "\t"))
    println("")

    val rdd2 = sc.parallelize(List(1, 2, 3, 4, 5), 2)
    print("rdd2->:")
    rdd2.foreach(x => print(x + "\t"))
    println("")

    val rdd3 = sc.parallelize(Array("hadoop", "hive", "spark"), 1)
    print("rdd3->:")
    rdd3.foreach(x => print(x + "\t"))
    println("")

    val rdd4 = sc.makeRDD(List(1, 2, 3, 4), 2)
    print("rdd4->:")
    rdd4.foreach(x => print(x + "\t"))
    println("")

    sc.stop()
  }
}

2. 加载外部数据源构建

  • 比如:WordCount 案例,读取 textFile 就是这种用法。
  • 读取 json 文件
object RddDemo { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.textFile(this.getClass.getClassLoader.getResource("word.json").getPath)
    val rdd2: RDD[Option[Any]] = rdd1.map(JSON.parseFull(_))
    rdd2.foreach(println)
  }
}
  • 读取 Object 对象文件
object RddDemo { 
        
    def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
    rdd1.saveAsObjectFile("hdfs://node01:8020/test")

    val rdd2: RDD[Nothing] = sc.objectFile("hdfs://node01:8020/test")
    rdd2.foreach(println)
  }
}

3. 从其他RDD转换得到新的RDD

  • 案例一:
object RddDemo { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    /* map算子,一共有多少元数会执行多少次,和分区数无关,可以修改分区数进行测试 */
    val rdd: RDD[Int] = sc.parallelize(1.to(5), 1)
    val mapRdd: RDD[Int] = rdd.map(x => { 
        
      println("执行") // 一共被执行5次
      x * 2
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t"))
  }
}

// 结果输出
执行
执行
执行
执行
执行
2	4	6	8	10
  • 案例二:
object RddDemo { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    /* mapPartitions算子,一个分区内处理,有几个分区就执行几次,优于map函数 常用于时间转换,数据库连接 */
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val mapRdd: RDD[Int] = rdd.mapPartitions(it => { 
        
      println("执行") // 分区2次,共打印2次
      it.map(x => x * 2)
    })
    val result: Array[Int] = mapRdd.collect()
    result.foreach(x => print(x + "\t"))
  }
}

// 结果输出
执行
执行
2	4	6	8	10	12	14	16	18	20
  • 案例三:
object RddDemo { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    /* mapPartitionsWithIndex算子,一个分区内处理,有几个分区就执行几次 返回带有分区号的结果集 */
    val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
    val mapRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((idx, it) => { 
        
      println("执行") // 执行两次
      it.map((idx, _))
    })

    val result: Array[(Int, Int)] = mapRdd.collect()
    result.foreach(x => print(x + "\t"))
  }
}

// 结果输出
执行
执行
(0,1)	(0,2)	(0,3)	(0,4)	(0,5)	(1,6)	(1,7)	(1,8)	(1,9)	(1,10)

标签: 02重载连接器he

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台