资讯详情

大数据高级开发工程师——Spark学习笔记(5)

文章目录

  • Spark内存计算框架
    • Spark Core
      • Spark读写HBase
        • 1. 通过newAPIHadoopRDD实现
        • 2. 通过spark on hbase实现
      • Spark序列化和反序列化
        • 1. transformation为什么操作需要序列化
        • 2. spark任务序列化异常
        • 3. spark序列化解决方案
        • 4. kyro序列化
      • 数据倾斜原理和现象分析
        • 1. 数据倾斜概述
        • 2. 数据倾斜发生时的现象
        • 3. 数据倾斜的原理
        • 4. 如何定位数据倾斜的原因
        • 5. 总结数据倾斜的原因
        • 6. 数据倾斜的后果
      • spark中数据倾斜的解决方案
        • 1. 解决方案1:使用Hive ETL预处理数据
        • 2. 解决方案2:过滤少数导致倾斜的过滤key
        • 3. 解决方案3:改进shuffle并行操作(效果差)
        • 4. 解决方案4:两阶段聚合(局部聚合) 全局聚合)
        • 5. 解决方案五:将reduce join转为map join
        • 6. 解决方案六:采样倾斜key并分拆join操作
        • 7. 解决方案7:使用随机前缀和扩容RDD进行join

Spark内存计算框架

Spark Core

Spark读写HBase

  • 我们可以通过 Spark 整合 HBase,实现通过 Spark 来读取 HBase 的数据。
  • 数据准备:创建 HBase 插入数据:
create 'spark_hbase','info' put 'spark_hbase','0001','info:name','tangseng' put 'spark_hbase','0001','info:age','30' put 'spark_hbase','0001','info:sex','0' put 'spark_hbase','0001','info:addr','beijing' put 'spark_hbase','0002','info:name','sunwukong' put 'spark_hbase','0002','info:age','508' put 'spark_hbase','0002','info:sex','0' put 'spark_hbase','0002','info:addr','shanghai' put 'spark_hbase','0003','info:name','zhubajie' put 'spark_hbase','0003','info:age','715' put 'spark_hbase','0003','info:sex','0' put 'spark_hbase','0003','info:addr','shenzhen' put 'spark_hbase','0004','info:name','bailongma' put 'spark_hbase','0004','info:age','1256' put 'spark_hbase','0004','info:sex','0' put 'spark_hbase','0004','info:addr','donghai' put 'spark_hbase','0005','info:name','shaheshang'
put 'spark_hbase','0005','info:age','1008'
put 'spark_hbase','0005','info:sex','0'
put 'spark_hbase','0005','info:addr','tiangong'

在这里插入图片描述

  • 创建输出 HBase 数据
create 'spark_hbase_out','info'

1. 通过newAPIHadoopRDD实现

  • 添加 pom.xml 依赖
<repositories>
  <!-- spark on hbase是cloudera提供的,所以这个地方添加了cdh仓库地址 -->
  <repository>
    <id>cloudera</id>
    <!-- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>-->
    <url>https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark</url>
  </repository>
</repositories>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.3</version>
</dependency>

<!-- hadoop-core 2009、7月更名为hadoop-common -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
<!-- <version>2.6.0-mr1-cdh5.14.2</version>-->
  <version>3.1.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
<!-- <version>1.2.0-cdh5.14.2</version>-->
  <version>2.2.6</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hbase</groupId>-->
<!-- <artifactId>hbase-common</artifactId>-->
<!-- <version>1.2.0-cdh5.14.2</version>-->
<!--&lt;!&ndash; <version>2.2.6</version>&ndash;&gt;-->
<!-- </dependency>-->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
<!-- <version>1.2.0-cdh5.14.2</version>-->
  <version>2.2.6</version>
</dependency>
<!-- 添加此依赖,解决spark hbase集成,低版本时报Base64找不到的问题 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-mapreduce</artifactId>
  <version>2.2.6</version>
</dependency>
<!-- 为了让它与apache hbase 2.2.2兼容,使用cdh6.2版本的;具体dependency可以去https://mvnrepository.com/查找 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-spark</artifactId>
<!-- <version>1.2.0-cdh5.14.2</version>-->
  <version>2.1.0-cdh6.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.3.3</version>
</dependency>
  • 代码实现:
object Case09_SparkWithHBase { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 1. 创建HBase的环境参数
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")

// // 2. 设置过滤器,还可以设置起始和结束rowkey
// val scan = new Scan
// scan.setFilter(new RandomRowFilter(0.5f))
// // 设置scan对象,让filter生效(序列化)
// hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    // 3. 读取HBase数据,生成RDD
    val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    resultRDD.foreach(x => { 
        
      // 查询出来的结果集存在 (ImmutableBytesWritable, Result)第二个元素
      val result = x._2
      // 获取行键
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    })

    // 4. 向HBase表写入数据
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "spark_hbase_out")
    val job = Job.getInstance(hbaseConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // 5. 封装输出结果 resultRDD: RDD[(ImmutableBytesWritable, Result)]
    val outRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.mapPartitions(eachPartition => { 
        
      eachPartition.map(keyAndEachResult => { 
        
        val result = keyAndEachResult._2
        val rowKey = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
        val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))

        val put = new Put(Bytes.toBytes(rowKey))
        val immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
        // 向 HBase 插入数据,需要 rowKey 和 put 对象
        (immutableBytesWritable, put)
      })
    })

    // 6. 调用API Output the RDD to any Hadoop-supported storage system with new Hadoop API
    outRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}

2. 通过spark on hbase实现

  • 参考资料:
    • https://github.com/cloudera-labs/SparkOnHBase/blob/cdh5-0.0.2/src/main/scala/com/cloudera/spark/hbase/HBaseContext.scala
    • https://issues.apache.org/jira/browse/HBASE-13992
    • https://github.com/cloudera-labs/SparkOnHBase
  • 优势:
    • 无缝的使用 HBase Connection
    • 和 Kerberos 无缝集成
    • 通过 get 或 scan 直接生成 RDD
    • 利用 RDD 支持 HBase 的任何组合操作
    • 为通用操作提供简单的方法,同时通过 API 允许不受限制的未知高级操作
    • 支持 Java 和 Scala
    • 为 Spark 和 Spark Streaming 提供相似的 API
  • 由于 hbaseContext 是一个只依赖 hadoop、hbase、spark 的 jar 包的工具类,因此可以拿过来直接用
  • 添加依赖包:
<!-- 为了让它与apache hbase 2.2.2兼容,使用cdh6.2版本的;具体dependency可以去https://mvnrepository.com/查找 -->
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-spark</artifactId>
  <version>2.1.0-cdh6.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.3.3</version>
</dependency>
  • 代码实现:
object Case10_SparkOnHBase { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")

    val hbaseContext = new HBaseContext(sc, hbaseConf)
    val scan = new Scan()

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan)

    hbaseRDD.map(eachResult => { 
        

      val result: Result = eachResult._2
      val rowKey = Bytes.toString(result.getRow)
      val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      println(rowKey + ":" + name + ":" + age)
    }).foreach(println)
    sc.stop()
  }
}

Spark的序列化与反序列化

1. transformation操作为什么需要序列化

  • Spark 是分布式执行引擎,其核心抽象是弹性分布式数据集 RDD,其代表了分布在不同节点的数据。
  • Spark 的计算是在 Executor 上分布式执行的,故用户开发的关于 RDD 的 map、flatMap、reduceByKey 等 transformation 操作(闭包)有如下执行过程:
    • ① 代码中对象在 Driver 本地序列化;
    • ② 对象序列化后传输到远程 Executor 节点;
    • ③ 远程 Executor 节点反序列化对象;
    • ④ 最终远程节点执行。
  • 故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。

2. spark的任务序列化异常

  • 在编写 spark 程序中,由于在 map、foreachPartition 等算子内部使用了外部定义的变量和函数,从而引发 Task 未序列化问题。
  • 然而 spark 算子在计算过程中使用外部变量在许多情形下确实在所难免:
    • 比如在 filter 算子根据外部指定的条件进行过滤;
    • map根据相应的配置进行变换。
  • 经常会出现“org.apache.spark.SparkException: Task not serializable”这个错误
    • 其原因就在于这些算子使用了外部的变量,但是这个变量不能序列化。
    • 当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现 Task 未序列化问题。

3. spark中解决序列化的方法

  • 如果函数中使用了该类对象,该类要实现序列化:类 extends Serializable
  • 如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化。
  • 对于不能序列化的成员变量使用==“@transient”==标注,告诉编译器不需要序列化。
  • 也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
  • 可以把对象的创建直接在该函数中构建这样避免需要序列化。

4. kyro序列化

  • 在分布式应用中,经常会进行IO操作,传递对象,而网络传输过程中就必须要序列化。
  • Java序列化可以序列化任何类,比较灵活,但是相当慢,并且序列化后对象的提交也比较大。
  • Spark 出于性能考虑,在 2.0 以后,开始支持 kryo 序列化机制,速度是 Serializable 的 10 倍以上,当 RDD 在 Shuffle 数据的时候,简单数据类型,简单数据类型数组,字符串类型已经使用 kryo 来序列化。
  • 也可以通过 kyro 对我们需要序列化的对象,进行序列化标价
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
  • 举个例子:
case class MySearcher(val query: String) { 
        
  def getMatchRddByQuery(rdd: RDD[String]): RDD[String] = { 
        
    rdd.filter(x => x.contains(query))
  }
}

object Case11_Kyro { 
        
  def main(args: Array[String]): Unit = { 
        
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      // 替换默认序列化机制
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // 注册需要使用kryo序列化自定义类
      .registerKryoClasses(Array(classOf[MySearcher]))
    val sc = new SparkContext(conf)

    val rdd1: RDD[String] = sc.parallelize(Array("hadoop yarn", "hadoop hdfs", "c"))
    val rdd2: RDD[String] = MySearcher("hadoop").getMatchRddByQuery(rdd1)
    rdd2.foreach(println)
  }
}

数据倾斜原理和现象分析

1. 数据倾斜概述

  • 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时 Spark 作业的性能会比期望差很多。
  • 数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证 Spark 作业的性能。

2. 数据倾斜发生时的现象

  • ① 绝大多数task执行得都非常快,但个别 task 执行极慢
    • 你的大部分的 task,都执行的特别快,很快就执行完了,剩下几个 task,执行的特别特别慢;
    • 前面的 task,一般 10s 可以执行完5个,最后发现某个task,要执行 1 个小时、2 个小时才能执行完一个 task;
    • 这个时候就出现数据倾斜了。这种方式还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。
  • ② 绝大数 task 执行很快,有的 task 直接报OOM (Jvm Out Of Memory) 异常
    • 运行的时候,其他 task 都很快执行完了,也没什么特别的问题;
    • 但是有的 task,就是会突然间报了一个 OOM,内存溢出了,task failed、task lost、resubmitting task等日志异常信息。
    • 反复执行几次某个 task 就是跑不通,最后就挂掉。
    • 某个 task 就直接 OOM,那么基本上也是因为数据倾斜了,task 分配的数量实在是太大了!!!所以内存放不下,然后你的 task 每处理一条数据,还要创建大量的对象。内存爆掉了。

3. 数据倾斜发生的原理

  • 如上图所示:在进行任务计算 shuffle 操作的时候,第一个 task 和第二个 task 各分配到了 1 万条数据;需要 5 分钟计算完毕;第三个 task要 98万 条数据,98 * 5 = 490分钟 = 8个小时;
  • 本来另外两个 task 很快就运行完毕了(5分钟),第三个task数据量比较大,要 8 个小时才能运行完,就导致整个 spark 作业,也得 8 个小时才能运行完。最终导致整个 spark 任务计算特别慢。

4. 数据倾斜如何定位原因

  • 主要是根据log日志信息去定位:
    • 数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
    • 出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。因为某个或者某些 key 对应的数据,远远的高于其他的key。
  • 分析定位逻辑:
    • 由于代码中有大量的 shuffle 操作,一个 job 会划分成很多个 stage
    • 首先要看的,就是数据倾斜发生在第几个 stage 中。
    • 可以在任务运行的过程中,观察任务的 UI 界面,可以观察到每一个 stage 中运行的 task 的数据量,从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。
    • 比如下图中,倒数第三列显示了每个 task 的运行时间。明显可以看到,有的 task 运行特别快,只需要几秒钟就可以运行完;而有的 task 运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。
    • 此外,倒数第一列显示了每个 task 处理的数据量,明显可以看到,运行时间特别短的 task 只需要处理几百 KB 的数据即可,而运行时间特别长的 task 需要处理几千 KB 的数据,处理的数据量差了 10 倍。此时更加能够确定是发生了数据倾斜。

    • 这种情况下去定位出问题的代码就比较容易了。
    • 建议直接看 yarn-client 模式下本地 log 的异常栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有 shuffle 类算子,此时很可能就是这个算子导致了数据倾斜。
    • 但是需要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的 bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过 Spark Web UI 查看报错的那个 stage 的各个 task 的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。
    • 知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了 shuffle 操作并且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。
    • 这主要是为之后选择哪一种技术方案提供依据。针对不同的 key 分布与不同的 shuffle 算子组合起来的各种情况,可能需要选择不同的技术方案来解决。
    • 此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:
      • ① 如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。
      • ② 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如RDD.countByKey()。
    • 然后对统计出来的各个key出现的次数,collect/take 到客户端打印一下,就可以看到 key 的分布情况。
    • 举例来说,对于上面所说的单词计数程序,如果确定了是 stage1 的 reduceByKey 算子导致了数据倾斜,那么就应该看看进行reduceByKey 操作的 RDD 中的 key 分布情况,在这个例子中指的就是 pairs RDD。
    • 如下示例,我们可以先对 pairs 采样 10% 的样本数据,然后使用 countByKey 算子统计出每个 key 出现的次数,最后在客户端遍历和打印样本数据中各个 key的出现次数。
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

5. 数据倾斜原因总结

  • 数据本身问题
    • ① key 本身分布不均衡(包括大量的key为空)
    • ② key 的设置不合理
  • spark使用不当的问题
    • ① shuffle 时的并发度不够
    • ② 计算方式有误

6. 数据倾斜的后果

  • spark 中的 stage 的执行时间受限于最后那个执行完成的 task,因此运行缓慢的任务会拖垮整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。
  • 过多的数据在同一个task中运行,将会把 executor 内存撑爆,导致 OOM 内存溢出。

spark中数据倾斜的解决方案

1. 解决方案一:使用Hive ETL预处理数据

  • 适用场景:导致数据倾斜的是 Hive 表。如果该 Hive 表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用 Spark 对 Hive 表执行某个分析操作,那么比较适合使用这种技术方案。
  • 实现思路:此时可以评估一下,是否可以通过 Hive 来进行数据预处理(即通过 Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行 join),然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的 Hive 表。此时由于数据已经预先进行过聚合或 join 操作了,那么在 Spark 作业中也就不需要使用原先的 shuffle 类算子执行这类操作了。
  • 实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在 Spark 中执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以 Hive ETL 中进行 group by 或者 join 等 shuffle 操作时,还是会出现数据倾斜,导致 Hive ETL 的速度很慢。我们只是把数据倾斜的发生提前到了 Hive ETL 中,避免 Spark 程序发生数据倾斜而已。
  • 优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark 作业的性能会大幅度提升。
  • 缺点:治标不治本,Hive ETL 中还是会发生数据倾斜。
  • 实践经验:在一些 Java 系统与 Spark 结合使用的项目中,会出现 Java 代码频繁调用 Spark 作业的场景,而且对 Spark 作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上

标签: sc连接器挂掉的原因

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

 锐单商城 - 一站式电子元器件采购平台  

 深圳锐单电子有限公司