大家好,我是冯哥~
最近看到各大厂裁员的消息,没办法。到了毕业季,总有一天你会分道扬镳。今天分享一些面试题,希望大家能得到更好的。offer!
来源:大数据真有趣
1)本地模式 ? Spark不一定要跑hadoop集群,可在本地指定多个线程。将Spark应用程序直接以多线程的方式在本地运行,通常是为了方便调试。本地模式分为三类 ? local:只启动一个executor ? local[k]:启动k个executor ? local[ * ]:启动跟cpu数目相同的 executor
2)standalone模式 ? 分布式部署集群,自带完整,资源管理和任务监控Spark这种模式也是其他模式的基础。
3)Spark on yarn模式 ? 分布式部署集群,监控资源和任务yarn但目前只支持粗粒度资源的分配,包括cluster和client运行模式,cluster适合生产,driver集群子节点运行,具有容错功能,client适合调试,dirver在客户端运行。
4)Spark On Mesos模式。? 官方推荐这种模式(当然,原因之一是血缘关系)。正是因为如此。Spark在开发之初,考虑到支持Mesos,因此,目前,Spark运行在Mesos会比运行在更好YARN它更灵活,更自然。
运行自己的应用程序可以选择两种调度模式之一:
(1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个应用程序组成Dirver和若干个Executor组成,其中,每一个Executor占用多个资源,可以在内部运行多个资源Task(对应多少slot”)。
在应用程序的所有任务正式运行之前,需要在运行环境中申请所有资源,并在运行过程中始终占用这些资源。即使没有,这些资源也应该在最终程序运行后回收。
(2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另一种调度模式:细粒度模式,类似于当前的云计算,思想是按需分配。
1)基于内存计算,减少低效磁盘交互;
2)基于高效调度算法的高效调度算法DAG;?
3)容错机制Linage,精华部分是DAG和Lingae
1)从 high-level 两者没有太大区别。都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 也许是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存为缓冲区,边缘 shuffle 边 aggregate 等数据 aggregate 好以后进行 reduce() (Spark 可能是后续的一系列操作)。?
2)从 low-level 从角度看,两者差别不小。Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样是 combine/reduce() 由于其输入数据可以通过外排处理大规模数据(mapper 先对每段数据进行排序,reducer 的 shuffle 合并排序的每段数据)。
目前的 Spark 默认选择是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,数据不会提前排序。如果用户需要排序数据,他们需要调用类似的数据 sortByKey() 如果你是Spark 1.1用户,可以spark.shuffle.manager设置为sort,对数据进行排序。在Spark 1.2中,sort作为默认Shuffle实现。?
3)从实现的角度来看,两者也有很多不同。Hadoop MapReduce 将处理过程分为几个明显的阶段:map(), spill, merge, shuffle, sort, reduce() 等。每一阶段各司其职,每一阶段的功能都可以按照过程编程思路逐一实现。在 Spark 没有这样一个功能明确的阶段,只有不同的阶段 stage 和一系列的 transformation(),所以 spill, merge, aggregate 需要包含其他操作 transformation() 中。如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 问题变成了怎样 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read处理逻辑?如何有效地实现两种处理逻辑?Shuffle write因为不需要有序的数据,shuffle write 任务非常简单:数据 partition 好,持久。一方面要降低内存存储空间的压力,另一方面也要持久 fault-tolerance。
① 构建Application,Driver创建一个SparkContext
② SparkContext向资源管理器(Standalone、Mesos、Yarn)申请Executor资源管理器启动StandaloneExecutorbackend(Executor)
③ Executor向SparkContext申请Task
④ SparkContext分发应用程序Executor
⑤ SparkContext就建成DAG图,DAGScheduler将DAG图解析成Stage,每个Stage有多个task,形成taskset发送给task Scheduler,由task Scheduler将Task发送给Executor运行
⑥ Task在Executor上运行,运行后释放所有资源
Spark调整比较复杂,但一般可以分为三个方面
1)平台层面整:防止不必要jar包分发,提高数据的本地性,选择高效的存储格式,如parquet
2)应用程序水平的优化:过滤操作符的优化减少了太小的任务,减少了单个记录的资源成本,处理数据倾斜,重用RDD缓存、并行执行作业等
3)JVM层次调优:设置合适的资源量,设置合理JVM,如启用高效的序列化方法kyro,增大off head内存等等
具体的task运行在那他机器上,dag划分stage确定时间
1)自动存储和切换内存和磁盘;
2)基于Lineage高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败自动进行特定次数的重试,且只计算失败的分片;
5)checkpoint和persist,数据计算后持久缓存;
6)数据调度弹性,DAG TASK调度与资源无关;
7)数据分片的高弹性。
1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的。所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读。
2)不支持增量迭代计算,Flink支持
从下面三点去展开
1)shuffle过程的划分
2)shuffle的中间结果如何存储
3)shuffle的数据如何拉取过来 可以参考这篇博文:http://www.cnblogs.com/jxhd1/p/6528540.html
1)客户端client向ResouceManager提交Application,ResouceManager接受Application并根据集群资源状况选取一个node来启动Application的任务调度器driver(ApplicationMaster)。
2)ResouceManager找到那个node,命令其该node上的nodeManager来启动一个新的 JVM进程运行程序的driver(ApplicationMaster)部分,driver(ApplicationMaster)启动时会首先向ResourceManager注册,说明由自己来负责当前程序的运行。
1)如果说HDFS是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准。
不一定,当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。
1)如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃。
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序。
1)用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6,,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘;
2)如果持久化操作比较多,可以提高spark.storage.memoryFraction参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果shuffle的操作比较多,有很多的数据读写操作到JVM中,那么应该调小一点,节约出更多的内存给JVM,避免过多的JVM gc发生。在web ui中观察如果发现gc时间很长,可以设置spark.storage.memoryFraction更小一点。
Transformation(转化)算子和Action(执行)算子。
在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
1)基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存 中的,然后Spark Streaming启动的job会去处理那些数据。
2)基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地 查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来 获取Kafka指定offset范围的数据。
1)使用程序中的集合创建rdd
2)使用本地文件系统创建rdd
3)使用hdfs创建rdd
4)基于数据库db创建rdd
5)基于Nosql创建rdd,如hbase
6)基于s3创建rdd
7)基于数据流,如socket创建rdd
spark并行度,每个core承载24个partition,如,32个core,那么64128之间的并行度,也就是设置64~128个partion,并行读和数据规模无关, 只和内存使用量和cpu使用时间有关。
将不能序列化的内容封装成object。
driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来, 抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
会导致执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源,分配了部分Executor,该job就开始执行task,应该是task的调度线程 和Executor资源申请是异步的;如果想等待申请完所有的资源再执行job的:需要将spark.scheduler.maxRegisteredResourcesWaitingTime设置的很大;spark.scheduler.minRegisteredResourcesRatio 设置为1,但是应该结合实际考虑,否则很容易出现长时间分配不到资源,job一直不能运行的情况。
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象。flatMap:对RDD每个元素转换,然后再扁平化。将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为null的值。
1)粗粒度:启动时就分配好资源, 程序启动,后续具体使用就使用分配好的资源,不需要再分配资源;优点:作业特别多时,资源复用率高,适合粗粒度;缺点:容易资源浪费,假如一个job有1000个task,完成了999个,还有一个没完成,那么使用粗粒度,999个资源就会闲置在那里,资源浪费。
2)细粒度分配:用资源的时候分配,用完了就立即回收资源,启动会麻烦一点,启动一次分配一次,会比较麻烦。
1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的入口点;
2)功能:负责向集群申请资源,向master注册信息,负责了作业的调度,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler, TaskScheduler。
可以画一个这样的技术栈图先,然后分别解释下每个组件的功能和场景 1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架, 是Spark的基础。
2)SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字) 进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。
3)Spark sql:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询, 同时进行更复杂的数据分析。
4)BlinkDB :是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度 被控制在允许的误差范围内。
5)MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。
6)GraphX是Spark中用于图和图并行计算。
主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头, 管理分配新进程,做计算的服务,相当于process服务。
需要注意的是:
1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work, 只有出现故障的时候才会发送资源。
2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。
Rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类, 但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。从物理的角度来看rdd存储的是block和node之间的映射。
RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency) 1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用 2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition
cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间 1)cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;2)executor执行的时候,默认60%做cache,40%做task操作,persist是最根本的函数,最底层的函数。
cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache。cache不是action操作。
不是,很多人都会以为是action,reduce rdd是action
1.rdd的属性
2.算子分为哪几类(RDD支持哪几种类型的操作)
3.创建rdd的几种方式
4.spark运行流程
5.Spark中coalesce与repartition的区别
6.sortBy 和 sortByKey的区别
7.map和mapPartitions的区别
8.数据存入Redis 优先使用map mapPartitions foreach foreachPartions哪个
9.reduceByKey和groupBykey的区别
10.cache和checkPoint的比较
11.spark streaming流式统计单词数量代码
12.简述map和flatMap的区别和应用场景
13.计算曝光数和点击数
14.分别列出几个常用的transformation和action算子
15.按照需求使用spark编写以下程序,要求使用scala语言
16.spark应用程序的执行命令是什么?
17.Spark应用执行有哪些模式,其中哪几种是集群模式
18.请说明spark中广播变量的用途
19.以下代码会报错吗?如果会怎么解决 val arr = new ArrayList[String]; arr.foreach(println)
20.写出你用过的spark中的算子,其中哪些会产生shuffle过程
21.Spark中rdd与partition的区别
22.请写出创建Dateset的几种方式
23.描述一下RDD,DataFrame,DataSet的区别?
24.描述一下Spark中stage是如何划分的?描述一下shuffle的概念
25.Spark 在yarn上运行需要做哪些关键的配置工作?如何kill -个Spark在yarn运行中Application
26.通常来说,Spark与MapReduce相比,Spark运行效率更高。请说明效率更高来源于Spark内置的哪些机制?请列举常见spark的运行模式?
27.RDD中的数据在哪?
28.如果对RDD进行cache操作后,数据在哪里?
29.Spark中Partition的数量由什么决定
30.Scala里面的函数和方法有什么区别
31.SparkStreaming怎么进行监控?
32.Spark判断Shuffle的依据?
33.Scala有没有多继承?可以实现多继承么?
34.Sparkstreaming和flink做实时处理的区别
35.Sparkcontext的作用
36.Sparkstreaming读取kafka数据为什么选择直连方式
37.离线分析什么时候用sparkcore和sparksql
38.Sparkstreaming实时的数据不丢失的问题
39.简述宽依赖和窄依赖概念,groupByKey,reduceByKey,map,filter,union五种操作哪些会导致宽依赖,哪些会导致窄依赖
40.数据倾斜可能会导致哪些问题,如何监控和排查,在设计之初,要考虑哪些来避免
41.有一千万条短信,有重复,以文本文件的形式保存,一行一条数据,请用五分钟时间,找出重复出现最多的前10条
42.现有一文件,格式如下,请用spark统计每个单词出现的次数
43.共享变量和累加器
44.当 Spark 涉及到数据库的操作时,如何减少 Spark 运行中的数据库连接数?
45.特别大的数据,怎么发送到excutor中?
46.spark调优都做过哪些方面?
47.spark任务为什么会被yarn kill掉?
48.Spark on Yarn作业执行流程?yarn-client和yarn-cluster有什么区别?
49.Flatmap底层编码实现?
50.spark_1.X与spark_2.X区别
51.说说spark与flink
52.spark streaming如何保证7*24小时运行机制?
53.spark streaming是Exactly-Once吗?