Spark知识点
一、基本特征
1、spark与MapReduce的不同
:mapreduce以磁盘维护为基础, 磁盘IO而且,序列化成本高 spark基于内存的维护 ,基于DAG计算模型,会减少Shaffer过程即磁盘IO减少。
:spark多线程运行任务,mapreduce这是过程操作任务。启动和关闭过程需要一定的时间。
:mapreduce一般都是on yarn模式;spark可以local部署、standalone 部署以及为on yarn模式
:MR离线计算只能做,spark离线计算可以实时计算、机器学习等
2、spark集群安装部署
- vim spark-env.sh #配置java的环境变量 #配置zk相关信息
- vim slaves 指定spark集群的worker节点
- vim /etc/profile 修改spark环境变量
- 环境变量生效 source /etc/profile
- 启动: 1、先启动zk ${ZK_HOME}/bin/zkServer.sh start 2、启动spark集群 $SPARK_HOME/sbin/start-all.sh
- 在高可用模式下,整个模式spark有很多集群master,只有一个master被zk活着的选举master,其他的多个master都处于standby,同时把整个spark通过集群的元数据信息zk保存中节点。
- 如果活着的。首先zk感觉活着master挂断,开始在多个地方standby中的master进行选举,再次产生一个活着的master;这个活着的master会读取保存zk节点中的spark集群元数据信息恢复到上次master的状态。
- 对已经运行的任务没有影响,因为它已经获得了计算资源,所以此时不需要master。
- 对即将到来的任务有影响,因为这个任务需要计算资源,这个时候会找到活着的master申请计算资源,因为没有人活着master,该任务不能获得计算资源,即任务不能运行。
- http://master主机名:8080
- 集群细节、总资源信息、已用资源信息、剩余资源信息 正在运行的任务信息、已经完成的任务信息
bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077,node02:7077,node03:7077 \ --executor-memory 1G \ --total-executor-cores 2 \ examples/jars/spark-examples_2.11-2.3.3.jar \ 10
spark有许多集群master,不知道哪一个master是活着的master,即使你知道哪一个master是活着的master,也有可能下一秒就挂掉,这里可以把一切都挂掉master都罗列出来
–master spark://node01:7077,node02:7077,node03:7077
整个程序将在后期轮训master列表,最后找到活着的master,然后申请计算资源,最终操作程序。
4、spark-shell使用
spark-shell --master local[2]
默认会产生一个SparkSubmit进程
,sc –master local[N] :表示本地采用N个线程
计算任务
sc.textFile("file:///home/words.txt") .flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_).collect
读取HDFS上文件: vim spark-env.sh export HADOOP_CONF_DIR=hdoop安装位置 //实现读取hdfs上文件之后,需要把计算的结果保存到hdfs上
sc.textFile("/words.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_).saveAsTextFile("/out")
import org.apache.spark.rdd.RDD
import org.apache.spark.{
SparkConf, SparkContext}
/todo: 利用scala语言开发spark程序实现单词统计
object WordCount {
def main(args: Array[String]): Unit = {
//1、构建sparkConf对象 设置application名称和master地址
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
//2、构建sparkContext对象,该对象非常重要,它是所有spark程序的执行入口,它内部会构建 DAGScheduler和 TaskScheduler 对象
val sc = new SparkContext(sparkConf)
//设置日志输出级别
sc.setLogLevel("warn")
//3、读取数据文件
val data: RDD[String] = sc.textFile("E:\\words.txt")
//4、 切分每一行,获取所有单词
val words: RDD[String] = data.flatMap(x=>x.split(" "))
//5、每个单词计为1
val wordAndOne: RDD[(String, Int)] = words.map(x => (x,1))
//6、相同单词出现的1累加
val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x,y)=>x+y)
//按照单词出现的次数降序排列 第二个参数默认是true表示升序,设置为false表示降序
val sortedRDD: RDD[(String, Int)] = result.sortBy( x=> x._2,false)
//7、收集数据打印
val finalResult: Array[(String, Int)] = sortedRDD.collect()
finalResult.foreach(println)
//8、关闭sc
sc.stop()
}
}
打成jar包提交到集群中运行
spark-submit \
--master spark://node01:7077,node02:7077 \
--class com.kaikeba.WordCountOnSpark \
--executor-memory 1g \
--total-executor-cores 4 \
original-spark_class01-1.0-SNAPSHOT.jar /words.txt /out jar包与输入输出
spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/kkb/install/spark/examples/jars/spark-examples_2.11-2.3.3.jar 10 10是main方法里面的
参数
- executor-memory 小了,会把rdd一部分数据保存在内存中,一部分数据保存在磁盘;用该rdd时从内存和磁盘中获取,一定的磁盘io。需要设置的大一点,如10G/20G/30G等;
- total-executor-cores:表示任务运行需要总的cpu核数,它决定了任务并行运行的粒度,也会设置的大一点,如30个/50个/100个;
加大计算资源它是最直接、最有效果的优化手段。在计算资源有限的情况下,可以考虑其他方面,比如说代码层面,JVM层面等
5、spark on yarn
最大的区别就是。
-
yarn-cluster: Driver端运行在yarn集群中,与ApplicationMaster进程在一起。
-
yarn-client: Driver端运行在提交任务的客户端,与ApplicationMaster进程没关系,经常 用于进行测试
二、集群架构
1、spark集群架构
:主节点,负责任务资源的分配。
:从节点,负责任务计算的节点。
- ①Executor:是一个进程,它会在worker节点启动该进程(计算资源)
- ②Task:任务是以task线程的方式运行在worker节点对应的executor进程中;
:给程序提供计算资源的外部服务,standAlone模式整个任务的资源分配由spark集群的老大Master负责;把spark程序提交到yarn中运行,整个任务的资源分配由yarn中的老大ResourceManager负责
:是所有spark程序的执行入口,会执行客户端写好的main方法,它会构建一个名叫SparkContext对象 ,生成DAG,再根据算子的划分,把任务分到其他节点。
:是一个spark的应用程序,它是包含了客户端的代码和任务运行的资源信息
- 一个应用程序application就是提交到spark集群的一个job,会被分成很多个子任务job;
- 其中,一个子任务job中又划分成了许多stage阶段(根据算子间产生shuffe的宽依赖划分);
- 一 个stage中有存在很多分区 ,一个分区就是一个task,即一个stage中有很多个task;
- 一个action操作对应一个DAG有向无环图,即一个action操作就是一个job;
2、调度分配
:FIFO(先进先出)、FAIR(公平调度)
:尽量打散、尽量集中 尽量打散:一个Application尽可能多的分配到不同的节点,发挥数据的本地性,提升执行效率 尽量集中:尽量分配到尽可能少的节点
3、运行模式
该模式主要用作测试用,一般编写的 spark 程序,将 master 设置为 local 或者local[n],以本地模式运行,所有的代码都在一个 Jvm 里面。
该模式由 Spark 自带的集群管理模式,不依赖外部的资源管理器,由 Master 负责资源的分配管理,Worker 负责运行 Executor ,具体的运行过程可参考之前介绍 Spark 运行模式的篇章。
该模式,根据 yarn-client 和 yarn-cluster 的不同。
yarn-client 中 driver运行在本地客户端,负责调度application,会与yarn集群产生大量的网络通信,但本地可以看见日志。
yarn-cluster 中 driver运行在yarn集群中,看不见日志。
和 yarn 一样,Mesos 中,Spark 的资源管理从 Standalone 的 Master 转移到 Mesos Manager 中。
4、运行流程
spark集群模式,
-
先开始资源准备:
- 1)diver 拿到代码包,先到master那去,注册和申请计算资源;
- 2)master知道后,就通知woker启动好executor进程开始准备一下;
- 3)woker准备好进程后,就通知Driver自己好了,发送注册并且申请task请求;
-
接下来,diver就运行main方法,分任务,运行程序;
- 1)diver 先对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler;
- 2)diver再根据代码rdd算子的操作顺序,有向无环图;
- 3)DAGScheduler拿到DAG后,按照宽依赖进行,每一个stage内部有很多可以并行运行的task,最后封装在一个一个的中,并把taskSet发送给TaskScheduler;
- 4)TaskScheduler拿到taskSet集合后,依次遍历取出每一个task,节点上的executor进程中;
-
最后,程序运行完,diver通知Master注销,Master通知Worker关闭executor进程 ;
三、计算资源
1、RDD概念
RDD(Resilient Distributed Dataset)叫做弹性 分布式 数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.
- Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中.
- Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算.
- Dataset: 就是一个集合,存储很多数据.
- 1)A list of partitions:一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据
- 2)A function for computing each split:每个分区都会实现 计算函数
- 3)A list of dependencies on other RDDs:一个rdd会依赖于其他多个rdd
- 4)Optionally, a Partitioner for key-value RDDs :kv数据的分区函数基于哈希,非kv是None
- 5)Optionally, a list of preferred locations to compute each split on:有分区数据的节点会优先开启计算任务,数据的本地性。
自定义分区 RDD数据进行分区时,:对key进行哈希,然后对分区总数取模,
实现自定义partitioner大致分为3个步骤
- 继承org.apache.spark.Partitioner
- 重写numPartitions方法
- 重写getPartition方法
//5、对应上面的rdd数据进行自定义分区
val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3))
2、RDD的创建
val rdd1=sc.parallelize(List(1,2,3,4,5))
val rdd2=sc.parallelize(Array("hadoop","hive","spark"))
val rdd3=sc.makeRDD(List(1,2,3,4))
val rdd1=sc.textFile("/words.txt")
val rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.map((_,1))
3、RDD算子分类
- map 、mapPartitions、flatMap、
- filter、Union(求并)、intersection(求交)、distinct(去重)、
- join、 、、sortByKey、sortBy ;宽依赖
- repartition有shuffle、coalesce不shuffle
- reduce、
- collect :把RDD的数据进行收集之后,以数组的形式返回给Driver端
- count、first、take(n)
- foreach、foreachPartition
- saveAsTextFile、saveAsSequenceFile
collect操作注意:
- 默认Driver端的内存大小为1G,由参数 spark.driver.memory设置,某个rdd的数据量超过了Driver端默认的1G内存,
- 对rdd调用collect操作,这里会出现Driver端的,所有这个collect操作存在一定的风险,实际开发代码一般不会使用。
- new SparkConf().set(“spark.driver.memory”,“5G”)
4、RDD宽窄依赖
RDD和它依赖的父RDD的关系有两种不同的类型
:独生子女,每一个父RDD的Partition最多被子RDD的一个Partition使用,不会产生shuffle;等等,
:超生子女,多个子RDD的Partition会依赖同一个父RDD的Partition,会产生 shuffle;等等
join分为宽依赖和窄依赖,如果RDD有相同的partitioner,那么将不会引起shuffle,这种join是窄依赖,反之就是宽依赖
RDD是个数据集,会分到各个节点,分区的依据(分区器),默认KV数据的是hash分区,可以自定义
算子是操作,没有分区,(刚刚理解成分区是算子的,父RDD和子RDD对应两个算子,理解错误)
窄依赖类算子, 操作前数据源父RDD的每个分区的数据直接到操作后子RDD的对应一个分区(一分区对一分区),父RDD只有一个独生子女,在一个节点内捷就可以完成转换;Map、flatMap、filter、union等等;
宽依赖类算子,操作前数据源同一个父RDD的分区传入到不同的子RDD分区中,父RDD有多个子女,中间可能涉及,产生shuffle,reduceByKey、sortByKey、groupBy、groupByKey、join等等;
lineage保存了RDD的依赖关系,会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区
RDD只支持粗粒度转换,即只记录单个块上执行的单个操作
5、RDD缓存机制
把一个rdd的数据缓存起来,后续有其他的job需要用到该rdd的结果数据,可以直接从缓存中获取得到,。缓存是加快后续对该数据的访问操作。
RDD通过persist方法或cache方法,可以将前面的计算结果缓存,,才会被缓存在计算节点的内存中。
;缓存在内存一份,最终也是调用了persist方法 :可以把数据缓存在内存或者是磁盘,有丰富的缓存级别,这些缓存级别都被定义在StorageLevel这个object中。
checkpoint可以把多次使用到的rdd,是公共rdd进行持久化磁盘; (1)为了获取得到一个rdd的结果数据,经过了大量的算子操作或者是
(2)一个application应用,对应的缓存数据也就;调用rdd的unpersist方法
(3)把数据保存在,服务器挂掉或进程终止,会导致数据的丢失;存在本地磁盘中,操作删除了,或者是磁盘损坏,也有可能导致数据的丢失;
checkpoint把数据保存在分布式文件系统HDFS上。高可用性,高容错性(多副本)来最大程度保证数据的安全性。
//1、在hdfs上设置一个checkpoint目录
sc.setCheckpointDir("hdfs://node01:8020/checkpoint")
///2、对需要做checkpoint操作的rdd调用checkpoint方法
val rdd1=sc.textFile("/words.txt")
rdd1.checkpoint
val rdd2=rdd1.flatMap(_.split(" "))
//3、最后需要有一个action操作去触发任务的运行
rdd2.collect
6、广播变量
spark中分布式执行的代码需要传递到各个Executor的Task上运行。对于一些只读、固定的数据(比如从DB中读出的数据),每次都需要Driver广播到各个Task上,这样效率低下。
广播变量允许将变量广播给各个Executor。该Executor上的各个Task从所在节点的BlockManager获取变量,而不是从Driver获取变量,以减少通信的成本,减少内存的占用,从而提升了效率。
(1) 通过对一个类型T的对象调用 SparkContext.broadcast创建出一个Broadcast[T]对象。 任何可序列化的类型都可以这么实现 (2) 通过 value 属性访问该对象的值 (3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)
val conf = new SparkConf().setMaster("local[2]").setAppName("brocast")
val sc=new SparkContext(conf)
val rdd1=sc.textFile("/words.txt")
val word="spark"
//通过调用sparkContext对象的broadcast方法把数据广播出去
val broadCast = sc.broadcast(word)
//在executor中通过调用广播变量的value属性获取广播变量的值
val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(broadCast.value))
rdd2.foreach(x=>println(x))
注意:
1、不能将一个RDD使用广播变量广播出去 2、广播变量只能在Driver端定义,不能在Executor端定义 3、在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值 4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本 5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。
累加器的一个常见用途是,在调试时对作业执行过程中的事件进行计数。可以使用累加器来进行全局的计数
7、DAG划分stage
原始的RDD通过一系列的转换就形成了DAG(Directed Acyclic Graph,有向无环图)
根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段);对于窄依赖,转换处理在一个Stage中完成计算;对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,
划分完stage之后,,可以实现流水线计算;,在同一个stage中就有很多可以并行运行的task。
一个Job会被拆分为多组Task,每组任务被称为一个stage。stage表示不同的调度阶段,一个spark job会对应产生很多个stage
- (1) 首先根据rdd的算子操作顺序生成DAG有向无环图,接下里从最后一个rdd往前推,创建一个新的stage,把该rdd加入到该stage中,它是最后一个stage。
- (2) 在往前推的过程中运行遇到了窄依赖就把该rdd加入到本stage中,如果遇到了宽依赖,就从宽依赖切开,那么最后一个stage也就结束了。
- (3)重新创建一个新的stage,按照第二个步骤继续往前推,一直到最开始的rdd,整个划分stage也就结束了
划分完stage之后,每一个stage中有很多可以并行运行的task,后期把每一个stage中的task封装在一个taskSet集合
中,最后把一个一个的taskSet集合提交到worker节点上的executor进程中运行。
rdd与rdd之间存在依赖关系,stage与stage之前也存在依赖关系,前面stage中的task先运行,运行完成了再运行后面stage中的task,也就是说后面stage中的task输入数据是前面stage中task的输出结果数据。
8、序列化
spark是分布式执行引擎,其核心抽象是弹性分布式数据集RDD,其代表了分布在不同节点的数据。
Spark的,故用户开发的关于RDD的map,flatMap,reduceByKey等transformation 操作(闭包)有如下执行过程: 1)代码中对象在driver本地序列化,对象序列化后传输到远程executor节点; 2)远程executor节点反序列化对象,最终远程节点执行。
对象在执行中,需要序列化通过网络传输,则必须经过序列化过程。
- 1)如果函数中使用了该类对象,该类要实现序列化,类 extends Serializable
- 2)如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化
- 3)对于不能序列化的成员变量使用@transient标注,告诉编译器不需要序列化
- 4)也可将依赖的变量,独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
- 5)可以把对象的创建直接在该函数中构建这样避免需要序列化
9、spark的shuffle。。
Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂。
Stage阶段的划分:是,job会,每一个stage内部有很多task。stage与stage之间的过程就是shuffle阶段。
在Spark的中,负责shuffle过程的执行、计算和处理的组件,主要就是ShuffleManager。ShuffleManager分为HashShuffleManager
和SortShuffleManager
,因此spark的Shuffle有Hash Shuffle
和Sort Shuffle
两种。
- 在Spark 1.2以前,默认的shuffle是
HashShuffleManager
。该Shuffle 弊端,就是会产生大量的中间磁盘文件, 磁盘IO操作影响了性能。- HashShuffleManager 分两种:、;
- :每个task处理的数据按key进行“hash分区”,先缓存分区个32K的buffer,再创建分区个份磁盘文件,每个Executor上总共创建
task*分区数
个磁盘文件 。 - :主通过复用buffer来优化Shuffle过程中产生的小文件的数量,每个Executor只有一种类型的Key的数据,每个Executor上总共创建
分区数
个磁盘文件;
- 在Spark 1.2以后的版本中,默认的ShuffleManager改成了
SortShuffleManager
。- SortShuffleManager 主要就在于 shuffle时 ,先生成临时文件,最后将临时文件合并(merge)成一个磁盘文件,
每个Task就只有一个磁盘文件
。下一个stage的shuffle read task拉取自己的数据时,只要根据索引
读取每个磁盘文件中的部分数据即可。 - SortShuffleManager分两种,
- : 先根据key进行排序,会分批写入到磁盘临时文件中,最后merge合并所有临时文件,一次写入到最终文件,每个Executor上总共创建
1
个磁盘文件 - : 普通机制不会进行排序
- 在shuffleMapTask数量小于默认值200时,启用bypass模式的sortShuffle(原因是数据量本身比较少,没必要进行sort全排序,因为数据量少本身查询速度就快,正好省了sort的那部分性能开销。)
- 该机制与普通SortShuffleManager运行机制的不同在于: 第一: 磁盘写机制不同;第二: 不会进行sort排序;
- SortShuffleManager 主要就在于 shuffle时 ,先生成临时文件,最后将临时文件合并(merge)成一个磁盘文件,
四、SparkSQL
是apache Spark用来处理结构化数据的一个模块
- 1)易整合,将SQL查询与Spark程序无缝混合,可以使用不同的语言进行代码开发;
- 2)统一的数据源访问,可以采用一种统一的方式去对接任意的外部数据源;
val dataFrame = sparkSession.read.文件格式的方法名("该文件格式的路径");
- 3)兼容hive,sparksql可以支持hivesql语法 sparksql兼容hivesql;
- 4)支持标准的数据库连接,支持标准的数据库连接JDBC或者ODBC;
1、DataFrame
DataFrame,是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格; 带有,即所表示的二维表数据集的每一列都带有名称和类型。
RDD | DataFrame | DataSet |
---|---|---|
RDD数据量比较大时,由于需要存储在堆内存中,堆内存有限,容易出现频繁的垃圾回收(GC) | 引入了schema元信息和off-heap(堆外内存),大量的对象构建直接使用操作系统层面上的内存,堆内存就比较充足,不容易GC | 是在Spark1.6中添加的新的接口提供了强类型支持,在RDD的每行数据加了类型约束,可以用强大lambda函数,使用了Spark SQL优化的执行引擎。 |
RDD需要发送到其他服务器,序列化和反序列性能开销很大 | schema元信息代表数据结构的描述信息,可以省略掉对schema的序列化网络传输 ,只需对数据内容本身进行序列化,减小序列化和反序列性能开销 | 是在Spark1.6中添加的新的接口,DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集 |
开发时会进行类型的检查,保证编译时类型安全;具有面向对象编程的风格;而且 | 不会进行类型的检查,编译时类型不安全;不具有面向对象编程的风格 | 修改了DataSet的缺陷,DataSet可以在编译时检查类型,并且是面向对象的编程接口。 |
三者之间的转换:
1)DataFrame与DataSet互转 DataFrame转换成DataSet:
val dataSet=dataFrame.as[强类型]
DataSet转换成DataFrame:val dataFrame=dataSet.toDF
2)DataFrame、DataSet 与RDD互转: 从dataFrame和dataSet获取得到rdd:
val rdd1=dataFrame.rdd ;val rdd2=dataSet.rdd
3)RDD转换为DataFrame:
- 方法一:反射机制,定义一个样例类,后期直接映射成DataFrame的schema信息;
- 方法二:通过StructType直接指定Schema
3、DataFrame常用操作
- DSL风格语法:sparksql中的DataFrame自身提供了一套自己的Api,可以去使用这套api来做相应的处理
- SQL风格语法:可以把DataFrame注册成一张表,然后通过sparkSession.sql(sql语句)操作
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ Column, DataFrame, Row, SparkSession} //todo:利用反射机制实现把rdd转成dataFrame case class Person(id:String,name:String,age:Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //1、构建SparkSession对象 val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate() //2、获取sparkContext对象 val sc: SparkContext = spark.sparkContext sc.setLogLevel("warn") //3、读取文件数据 val data: RDD[Array[String]] = sc.textFile("E:\\person.txt").map(x=>x.split(" ")) 方法一:反射机制,定义一个样例类,后期直接映射成DataFrame的schema信息 //4、定义一个样例类 case class Person(id:String,name:String,age:Int) //5、将rdd与样例类进行关联 val personRDD: RDD[Person] = data.map(x=>Person(x(0),x(1),x(2).toInt)) //6、将rdd转换成dataFrame //需要手动导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRDD.toDF 方法二:通过StructType直接指定Schema //4、将rdd与Row对象进行关联 val rowRDD: RDD[Row] = data.map(x=>Row(x(0),x(1),x(2).toInt)) //5、指定dataFrame的schema信息,这里指定的字段个数和类型必须要跟Row对象保持一致 val schema=StructType( StructField("id",StringType):: StructField("name",StringType):: StructField("age",IntegerType)::Nil ) val dataFrame: DataFrame = spark.createDataFrame(rowRDD,schema) //7、对dataFrame进行相应的语法操作 //todo:----------------- DSL风格语法-----------------start //打印schema personDF.printSchema() //展示数据 personDF.show() //获取第一行数据 val first: Row = personDF.first() println("first:"+first) //取出前3位数据 val top3: Array[Row] = personDF.head(3) top3.foreach(println) //获取name字段 personDF.select("name").show() personDF.select($"name").show() personDF.select(new Column("name")).show() personDF.select("name","age").show() //实现age +1 personDF.select($"name",$"age",$"age"+1).show() //按照age过滤 personDF.filter($"age" >