文章目录
- Spark内存计算框架
-
- Spark Core
-
- RDD 基本介绍
-
- 1. 什么是 RDD?
- 2. RDD五大特点
- 3. 基于词频统计剖析
- Spark中的算子
-
- 1. Transformation算子
- 2. Action算子
- 3. Shuffle算子
- RDD的创建方式
-
- 1. 通过现有的scala集合创建
- 2. 构建外部数据源
- 3. 从其他RDD新的转换RDD
- 常见算子介绍
-
- 1. map
- 2. mapPartitions
- 3. mapPartitionsWithIndex
- 4. filter
- 5. flatMap
- 6. coalesce(默认不存在shuffle)
- 7. repartition(存在shuffle)
- 8. glom
- 9. RDD交互(交并差笛拉)
- 10. distinct(存在shuffle)
- 11. sample
- 12. groupBy(存在shuffle)
- 13. sortBy(存在shuffle)
- 14. reduce(存在shuffle)
- 15. foreach、foreachPartition
- K-V常见类型算子介绍
-
- 1. partitionBy
- 2. 自定义分区器
- 3. reduceByKey
- 4. groupByKey
- 5. aggrateByKey
- 6. foldByKey
- 7. combineByKey
- 8. sortByKey
- 9. mapValues
- 10. join
- 11. cogroup
Spark内存计算框架
Spark Core
RDD 基本介绍
1. 什么是 RDD?
- Resilient Distributed DataSet:弹性分布式数据集Spark最基本的数据抽象可以理解为数据集合。
- :收集数据,存储大量数据。
- :RDD分布式存储内部元素,便于后期分布式计算。
- :表示弹性,RDD数据可以保存在内存或磁盘中。
- 它是代码中的抽象类,代表弹性、不可变、可分区,内部元素可并行计算。
2. RDD五大特点
-
- RDD分区很多,每个分区都包括在内RDD的部分数据
- 因为有多个分区,所以一个分区(Partition)列表可视为数据集的基本组成部分
- spark中任务是以task对于线程运行,RDD每个分区都将由一个计算任务处理, 一个分区对应一个task因此,分区决定了并行计算的粒度。
- 可创建用户RDD时,指定RDD如果没有指定分区数,则采用默认值(程序分配的CPU Core的数目)。
- 每个分配的存储是由的BlockManager每个分区都会实现逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
-
- Spark中RDD计算以分区为单位,每个RDD都会实现compute实现这一目标的函数
-
- spark根据这一特点,任务的容错机制是(血统)而来。
- RDD每一个转换都会产生一个新的RDD,所以RDD在某些区域数据丢失时,会形成类似流水线的前后依赖关系,Spark丢失的分区数据可以通过这种依赖关系重新计算,而不是对RDD重新计算所有分区。
- ④
- 只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner。
- 非key-value的RDD(RDD[String])的Parititioner的值是None。
- Partitioner不但决定了RDD也决定了分区的数量parent RDD Shuffle输出时的分区数量
- 当前Spark实现了两种分区函数
- 以哈希为基础HashPartitioner,(key.hashcode % 分区数 = 分区号),是默认值。
- 另一个是基于范围的RangePartitioner。
-
- 比如一个HDFS就文件而言,这个列表保存每个列表Partition所在文件块的位置。
- 按照移动数据不如移动计算的理念,Spark在任务调度过程中,计算任务将尽可能分配到要处理的数据块的存储位置,以减少数据的网络传输,提高计算效率。
3. 基于词频统计分析
- 需求:HDFS上面有一个3000的大小M文件,通过spark实现文件单词统计,最终保存结果数据HDFS上。
- 代码实现:
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")
- 流程分析:
Spark中的算子
- Transformation 算子,也称转换算子,在没有 Action 算子(也称行动算子)去触发的时候,是不会执行的,可以理解为懒算子,而Action算子可以理解为触发算子。
- 还有一种 Shuffle类 算子,就是上面说到的洗牌算子。
1. Transformation算子
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 | |
返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 | |
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) | |
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] | |
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] | |
对源RDD和参数RDD求并集后返回一个新的RDD | |
对源RDD和参数RDD求交集后返回一个新的RDD | |
对源RDD进行去重后返回一个新的RDD | |
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD | |
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 | |
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD | |
与sortByKey类似,但是更灵活 | |
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD | |
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD | |
减少 RDD 的分区数到指定值。 | |
重新给 RDD 分区 | |
重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
2. Action算子
reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 | |
在驱动程序中,以数组的形式返回数据集的所有元素 | |
返回RDD的元素个数 | |
返回RDD的第一个元素(类似于take(1)) | |
返回一个由数据集的前n个元素组成的数组 | |
返回自然顺序或者自定义顺序的前 n 个元素 | |
将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 | |
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 | |
将数据集的元素,以 Java 序列化的方式保存到指定的目录下 | |
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 | |
在数据集的每一个元素上,运行函数func | |
在数据集的每一个分区上,运行函数func |
3. Shuffle算子
- 去重
def distinct()
def distinct(numPartitions: Int)
- 聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
- 排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
- 重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
- 集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
RDD的创建方式
1. 通过已存在的scala集合创建
object RddDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
/* 传入分区数为1,结果顺序打印;传入分区数大于1,结果顺序不定,因为数据被打散在N个分区里 */
val rdd1: RDD[Int] = sc.parallelize(1.to(10), 1)
print("rdd1->:")
rdd1.foreach(x => print(x + "\t"))
println("")
val rdd2 = sc.parallelize(List(1, 2, 3, 4, 5), 2)
print("rdd2->:")
rdd2.foreach(x => print(x + "\t"))
println("")
val rdd3 = sc.parallelize(Array("hadoop", "hive", "spark"), 1)
print("rdd3->:")
rdd3.foreach(x => print(x + "\t"))
println("")
val rdd4 = sc.makeRDD(List(1, 2, 3, 4), 2)
print("rdd4->:")
rdd4.foreach(x => print(x + "\t"))
println("")
sc.stop()
}
}
2. 加载外部数据源构建
- 比如:WordCount 案例,读取 textFile 就是这种用法。
- 读取 json 文件
object RddDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd1: RDD[String] = sc.textFile(this.getClass.getClassLoader.getResource("word.json").getPath)
val rdd2: RDD[Option[Any]] = rdd1.map(JSON.parseFull(_))
rdd2.foreach(println)
}
}
- 读取 Object 对象文件
object RddDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5))
rdd1.saveAsObjectFile("hdfs://node01:8020/test")
val rdd2: RDD[Nothing] = sc.objectFile("hdfs://node01:8020/test")
rdd2.foreach(println)
}
}
3. 从其他RDD转换得到新的RDD
- 案例一:
object RddDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
/* map算子,一共有多少元数会执行多少次,和分区数无关,可以修改分区数进行测试 */
val rdd: RDD[Int] = sc.parallelize(1.to(5), 1)
val mapRdd: RDD[Int] = rdd.map(x => {
println("执行") // 一共被执行5次
x * 2
})
val result: Array[Int] = mapRdd.collect()
result.foreach(x => print(x + "\t"))
}
}
// 结果输出
执行
执行
执行
执行
执行
2 4 6 8 10
- 案例二:
object RddDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
/* mapPartitions算子,一个分区内处理,有几个分区就执行几次,优于map函数 常用于时间转换,数据库连接 */
val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
val mapRdd: RDD[Int] = rdd.mapPartitions(it => {
println("执行") // 分区2次,共打印2次
it.map(x => x * 2)
})
val result: Array[Int] = mapRdd.collect()
result.foreach(x => print(x + "\t"))
}
}
// 结果输出
执行
执行
2 4 6 8 10 12 14 16 18 20
- 案例三:
object RddDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(RddDemo.getClass.getSimpleName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
/* mapPartitionsWithIndex算子,一个分区内处理,有几个分区就执行几次 返回带有分区号的结果集 */
val rdd: RDD[Int] = sc.parallelize(1.to(10), 2)
val mapRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((idx, it) => {
println("执行") // 执行两次
it.map((idx, _))
})
val result: Array[(Int, Int)] = mapRdd.collect()
result.foreach(x => print(x + "\t"))
}
}
// 结果输出
执行
执行
(0,1) (0,2) (0,3) (0,4) (0,5) (1,6) (1,7) (1,8) (1,9) (1,10)