资讯详情

spark源码分析:(一)spark-shell启动脚本时候过程

当我们在命令行中输入时spark-shell会自动转化spark shell界面。我们可以在这个界面中完成spark操作。那么,这个过程是怎么进行的呢?

当我们在命令行中输入时spark-shell调用时,调用是spark/bin/spark-shell脚本。以下是spark-shell脚本中的一些代码:

function main() {   if $cygwin; then     stty -icanonmin 1 -echo > /dev/null 2>&1     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"     "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}" sttyicanon echo > /dev/null 2>&1   else     export SPARK_SUBMIT_OPTS     "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}" fi }

我们可以清楚地看到上面的代码,main()调用方法spark-submit脚本。我们前往spark-submit查看脚本,发现spark-submit又调用了spark-class脚本。

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}" 

调用了spark-class脚本,并输入多个参数,其中一个是org.apache.spark.deploy.SparkSubmit。spark-class脚本接收输入参数并执行以下句子。

if [ -n "${JAVA_HOME}" ]; then   RUNNER="${JAVA_HOME}/bin/java" else   if [ `command -v java` ]; then     RUNNER="java"   else     echo "JAVA_HOME is not set" >&2     exit 1   fi fi   exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

上述句子的最后一条表示启动java程序(org.apache.spark.deploy.SparkSubmit)。在这个时候,我们应该知道Spark启动了以SparkSubmit为主类的jvm进程。

以上是脚本之间的调用,然后进入spark shell在环境中,系统自动生成conf,sc是怎么得到的?

通过使用jvisualvm工具来dump main信息显示如下:

"main" - Thread t@1    java.lang.Thread.State: RUNNABLE   ...         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)   ...         at org.apache.spark.repl.Main.main(Main.scala)   ...         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)   ...  

从main程序的调用顺序可以在线程的栈信息中看到:SparkSubmit.main→repl.Main→SparkILoop.process。SparkILoop.process该方法将被调用initializeSpark方法,initializeSpark实现如下:

def initializeSpark() { intp.beQuietDuring {       command("""          @transient val sc = {            val _sc = org.apache.spark.repl.Main.interp.createSparkContext()            println("Spark context available as sc.")            _sc          }         """)       command("import org.apache.spark.SparkContext._")     }   }

我们可以从上述代码中找到它sc创建是通过的createSparkContext()完成方法。然后,输出Spark context available as sc.”,并将sc返回,同时导入SparkContext包下的所有类别或对象。我们再看下createSparkContext()方法,看看他的内部是如何实现的。

def createSparkContext(): SparkContext = {     val execUri = System.getenv("SPARK_EXECUTOR_URI")     val jars = SparkILoop.getAddedJars     val conf = new SparkConf()           .setMaster(getMaster())           .setAppName("Spark shell")           .setJars(jars)           .set("spark.repl.class.uri", intp.classServer.uri)     if (execUri != null) {         conf.set("spark.executor.uri", execUri)     }     sparkContext = new SparkContext(conf)     logInfo("Created spark context..")     sparkContext   }

我们创建了这种方法conf对象,并对conf做了一些配置,比如setMaster(),setAppName()等操作,然后调用new SparkContext(conf)来创建sc。然后打印出日志INFO SparkILoop: Created spark context.”,并将sc返回。

通过上述一系列操作,对spark shell初步完成了环境的初始化。如下图所示:

标签: 数控电阻fcua

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台