Spark内核设计的艺术:架构设计与实现
上QQ阅读APP看书,第一时间看更新

1.2 Spark初体验

本节通过Spark的基本使用,让读者对Spark能有初步的认识,以引导读者逐步深入学习。

1.2.1 运行spark-shell

图1-3中显示了很多信息,这里进行一些说明。

安装完Spark 2.1.0后,如果没有明确指定log4j的配置,那么Spark会使用core模块的org/apache/spark/目录下的log4j-defaults.properties作为log4j的默认配置。log4j-defaults.properties指定的Spark日志级别为WARN。用户可以到Spark安装目录的conf文件夹下,从log4j.properties.template复制一份log4j.properties文件,并在其中增加自己想要的配置。

除了指定log4j.properties文件外,还可以在spark-shell命令行中通过sc.setLog-Level(newLevel)语句指定日志级别。

SparkContext的Web UI的地址是:http://192.168.0.106:4040。192.168.0.106是作者本地安装Spark的机器的IP地址,4040是SparkContext的Web UI的默认监听端口。

指定的部署模式(即master)为local[*]。当前应用(Application)的ID为local-1497084620457。

可以在spark-shell命令行通过sc使用SparkContext,通过spark使用SparkSession。sc和spark实际分别是SparkContext和SparkSession在Spark REPL中的变量名,具体细节将在1.2.3节分析。

由于Spark Core的默认日志级别是WARN,所以看到的信息不是很多。现在我们将Spark安装目录的conf文件夹下的log4j.properties.template通过如下命令复制出一份:

    cp log4j.properties.template log4j.properties

并将log4j.properties中的log4j.logger.org.apache.spark.repl.Main=WARN修改为log4j. logger.org.apache.spark.repl.Main=INFO,然后我们再次运行spark-shell,将打印出更丰富的信息,如图1-4所示。

图1-4 Spark启动过程打印的部分信息

从图1-4展示的启动日志中我们可以看到SecurityManager、SparkEnv、BlockManager-MasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlock-TransferService、BlockManager、BlockManagerMaster等信息。它们是做什么的?刚刚接触Spark的读者只需要知道这些信息即可,具体内容将在后边进行详细的介绍。

1.2.2 执行word count

这一节,我们通过word count这个耳熟能详的例子来感受下Spark任务的执行过程。启动spark-shell后,会打开Scala命令行,然后按照以下步骤输入脚本。

1)输入val lines = sc.textFile("../README.md", 2),以Spark安装目录下的README. md文件内容作为word count例子的数据源,执行结果如图1-5所示。

图1-5 步骤1执行结果

图1-5告诉我们,lines的实际类型是MapPartitionsRDD这里特别说明一点,在生成MapPartitionsRDD之前还实例化了HadoopRDD,作为MapPartitionsRDD的上游RDD。

2)textFile方法对文本文件是逐行读取的,我们需要输入val words = lines.flatMap(line=> line.split(" ")),将每行文本按照空格分隔以得到每个单词,执行结果如图1-6所示。

图1-6 步骤2执行结果

图1-6告诉我们,lines在经过flatMap方法的转换后,得到的words的实际类型也是MapPartitionsRDD。

3)对于得到的每个单词,通过输入val ones = words.map(w => (w,1)),将每个单词的计数初始化为1,执行结果如图1-7所示。

图1-7 步骤3执行结果

图1-7告诉我们,words在经过map方法的转换后,得到的ones的实际类型也是MapPartitionsRDD。

4)输入val counts = ones.reduceByKey(_ + _),对单词进行计数值的聚合,执行结果如图1-8所示。

图1-8 步骤4执行结果

图1-8告诉我们,ones在经过reduceByKey方法的转换后,得到的counts的实际类型是ShuffledRDD。

5)输入counts.foreach(println),将每个单词的计数值打印出来,作业的执行过程如图1-9和图1-10所示。作业的输出结果如图1-11所示。

图1-9 步骤5执行过程第一部分

图1-10 步骤5执行过程第二部分

图1-11 步骤5输出结果

图1-9和图1-10展示了很多作业提交、执行的信息,这里挑选关键的内容进行介绍。

SparkContext为提交的Job生成的ID是0。

word count例子一共产生了4个RDD这里没有算入最上游的HadoopRDD。,被划分为ResultStage和ShuffleMapStage。ShuffleMapStage的ID为0,尝试号为0。ResultStage的ID为1,尝试号也为0。在Spark中,如果Stage没有执行完成,就会进行多次重试。Stage无论是首次执行还是重试,都被视为是一次Stage尝试(stage attempt),每次尝试都有一个唯一的尝试号(attempt number)。

由于Job有两个分区,所以ShuffleMapStage和ResultStage都有两个Task被提交。每个Task也会有多次尝试,因而也有属于Task的尝试号。从图中看出,Shuffle-MapStage中的两个Task和ResultStage中的两个Task的尝试号也都是0。

HadoopRDD则用于读取文件内容。

图1-11展示了单词计数的输出结果和最后打印的任务结束的日志信息。

笔者在本节介绍的word count例子是以SparkContext的API来实现的,读者也可以选择在spark-shell中通过运用SparkSession的API来实现。本书在第10章将介绍Spark源码自带的用SparkSession的API来实现的word count的Java应用程序。

1.2.3 剖析spark-shell

通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢?

1. 脚本分析

在Spark安装目录的bin文件夹下可以找到spark-shell,其中有代码清单1-1所示的一段脚本。

代码清单1-1 spark-shell脚本

    function main() {
      if $cygwin; then
        stty -icanon min 1-echo > /dev/null 2>&1
        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
        "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main  --name
            "Spark shell" "$@"
        stty icanon echo > /dev/null 2>&1
      else
        export SPARK_SUBMIT_OPTS
        "${SPARK_HOME}"/bin/spark-submit  --class  org.apache.spark.repl.Main  --name
            "Spark shell" "$@"
      fi
    }

我们看到脚本spark-shell里执行了spark-submit脚本,那么打开spark-submit脚本,发现代码清单1-2中所示的脚本。

代码清单1-2 spark-submit脚本

    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

可以看到spark-submit中又执行了脚本spark-class。打开脚本spark-class,首先发现以下一段脚本:

    # Find the java binary
    if [ -n "${JAVA_HOME}" ]; then
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ "$(command -v java)" ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi

上面的脚本是为了找到Java命令。在spark-class脚本中还会找到以下内容:

    build_command() {
      "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
    }
    CMD=()
    while IFS= read -d '' -r ARG; do
      CMD+=("$ARG")
    done < <(build_command "$@")

根据代码清单1-2,脚本spark-submit在执行spark-class脚本时,给它增加了参数SparkSubmit。所以读到这里,应该知道Spark启动了以SparkSubmit为主类的JVM进程。

2. 远程监控

为便于在本地对Spark进程进行远程监控,在spark-shell脚本中找到以下配置:

    SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

并追加以下jmx配置:

    -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207
    -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote. ssl=false

如果Spark安装在其他机器,那么在本地打开Java VisualVM后需要添加远程主机,如图1-12所示。

图1-12 添加远程主机

右击已添加的远程主机,添加JMX连接,如图1-13所示。

图1-13 添加JMX连接

如果Spark安装在本地,那么打开Java VisualVM后就会在应用程序窗口中看到org. apache.spark.deploy.SparkSubmit进程,只需双击即可。

选择右侧的“线程”选项卡,选择main线程,然后点击“线程Dump”按钮,如图1-14所示。

图1-14 查看Spark线程

从线程Dump的内容中找到线程main的信息,如代码清单1-3所示。

代码清单1-3 main线程的Dump信息

    "main" #1 prio=5 os_prio=31 tid=0x00007fa012802000 nid=0x1303 runnable [0x0000000-
        10d11c000]
      java.lang.Thread.State: RUNNABLE
      at java.io.FileInputStream.read0(Native Method)
      at java.io.FileInputStream.read(FileInputStream.java:207)
      at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169)
      - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream)
      at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137)
      at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246)
      at jline.internal.InputStreamReader.read(InputStreamReader.java:261)
      - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream)
      at jline.internal.InputStreamReader.read(InputStreamReader.java:198)
      - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream)
      at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145)
      at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349)
      at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269)
      at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57)
      at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.
          apply(InteractiveReader.scala:37)
      at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.
          apply(InteractiveReader.scala:37)
      at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveR
          eader.scala:44)
      at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive-
          Reader.scala:37)
      at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28)
      at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404)
      at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.
          scala:923)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
    at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClass
        Loader.scala:97)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
    at org.apache.spark.repl.Main$.doMain(Main.scala:68)
    at org.apache.spark.repl.Main$.main(Main.scala:51)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
        java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.
        java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$r
        unMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

从main线程的栈信息中可以看出程序的调用顺序:SparkSubmit.main→repl.Main→Iloop.process。

3. 源码分析

我们根据上面的线索,直接阅读Iloop的process方法的源码Iloop是Scala语言自身的类库中的用于实现交互式shell的实现类,提供对REPL(Read-eval-print-loop)的实现。,如代码清单1-4所示。

代码清单1-4 process的实现

    def process(settings: Settings): Boolean = savingContextLoader {
      this.settings = settings
      createInterpreter()
      // sets in to some kind of reader depending on environmental cues
      in =  in0.fold(chooseReader(settings))(r  =>  SimpleReader(r,  out,  interactive  =
          true))
      globalFuture = future {
        intp.initializeSynchronous()
        loopPostInit()
        !intp.reporter.hasErrors
      }
      loadFiles(settings)
        printWelcome()
      try loop() match {
        case LineResults.EOF => out print Properties.shellInterruptedString
        case _                   =>
      }
      catch AbstractOrMissingHandler()
      finally closeInterpreter()
      true
    }

根据代码清单1-4, Iloop的process方法调用了loadFiles方法。Spark中的SparkILoop继承了Iloop并重写了loadFiles方法,其实现如下:

    override def loadFiles(settings: Settings): Unit = {
      initializeSpark()
      super.loadFiles(settings)
    }

根据上面展示的代码,loadFiles方法调用了SparkILoop的initializeSpark方法,initialize Spark的实现如代码清单1-5所示。

代码清单1-5 initializeSpark的实现

    def initializeSpark() {
      intp.beQuietDuring {
        processLine("""
          @transient val spark = if (org.apache.spark.repl.Main.sparkSession ! = null) {
              org.apache.spark.repl.Main.sparkSession
            } else {
              org.apache.spark.repl.Main.createSparkSession()
            }
          @transient val sc = {
            val _sc = spark.sparkContext
            if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
              val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
              if (proxyUrl ! = null) {
                println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_
                  sc.applicationId}")
              } else {
                println(s"Spark Context Web UI is available at Spark Master Public URL")
                }
              } else {
                _sc.uiWebUrl.foreach {
                  webUrl => println(s"Spark context Web UI available at ${webUrl}")
                }
              }
              println("Spark context available as 'sc' " +
                s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
              println("Spark session available as 'spark'.")
              _sc
            }
            """)
          processLine("import org.apache.spark.SparkContext._")
          processLine("import spark.implicits._")
          processLine("import spark.sql")
          processLine("import org.apache.spark.sql.functions._")
          replayCommandStack = Nil // remove above commands from session history.
        }
      }

我们看到,initializeSpark向交互式shell发送了一大串代码,Scala的交互式shell将调用org.apache.spark.repl.Main的createSparkSession方法(见代码清单1-6),创建Spark-Session。我们看到常量spark将持有SparkSession的引用,并且sc持有SparkSession内部初始化好的SparkContext。所以我们才能够在spark-shell的交互式shell中使用sc和spark。

代码清单1-6 createSparkSession的实现

    def createSparkSession(): SparkSession = {
      val execUri = System.getenv("SPARK_EXECUTOR_URI")
      conf.setIfMissing("spark.app.name", "Spark shell")
      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
      if (execUri ! = null) {
        conf.set("spark.executor.uri", execUri)
      }
      if (System.getenv("SPARK_HOME") ! = null) {
        conf.setSparkHome(System.getenv("SPARK_HOME"))
      }
      val builder = SparkSession.builder.config(conf)
      if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {
        if (SparkSession.hiveClassesArePresent) {
          sparkSession = builder.enableHiveSupport().getOrCreate()
          logInfo("Created Spark session with Hive support")
        } else {
          builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
          sparkSession = builder.getOrCreate()
          logInfo("Created Spark session")
        }
      } else {
        sparkSession = builder.getOrCreate()
        logInfo("Created Spark session")
      }
      sparkContext = sparkSession.sparkContext
      sparkSession
    }

根据代码清单1-6所示,createSparkSession方法通过SparkSession的API创建Spark-Session实例。本书将有关SparkSession等API的内容放在第10章进行讲解,初次接触Spark的读者现在只需要了解即可。