1.2 Spark初体验
本节通过Spark的基本使用,让读者对Spark能有初步的认识,以引导读者逐步深入学习。
1.2.1 运行spark-shell
图1-3中显示了很多信息,这里进行一些说明。
安装完Spark 2.1.0后,如果没有明确指定log4j的配置,那么Spark会使用core模块的org/apache/spark/目录下的log4j-defaults.properties作为log4j的默认配置。log4j-defaults.properties指定的Spark日志级别为WARN。用户可以到Spark安装目录的conf文件夹下,从log4j.properties.template复制一份log4j.properties文件,并在其中增加自己想要的配置。
除了指定log4j.properties文件外,还可以在spark-shell命令行中通过sc.setLog-Level(newLevel)语句指定日志级别。
SparkContext的Web UI的地址是:http://192.168.0.106:4040。192.168.0.106是作者本地安装Spark的机器的IP地址,4040是SparkContext的Web UI的默认监听端口。
指定的部署模式(即master)为local[*]。当前应用(Application)的ID为local-1497084620457。
可以在spark-shell命令行通过sc使用SparkContext,通过spark使用SparkSession。sc和spark实际分别是SparkContext和SparkSession在Spark REPL中的变量名,具体细节将在1.2.3节分析。
由于Spark Core的默认日志级别是WARN,所以看到的信息不是很多。现在我们将Spark安装目录的conf文件夹下的log4j.properties.template通过如下命令复制出一份:
cp log4j.properties.template log4j.properties
并将log4j.properties中的log4j.logger.org.apache.spark.repl.Main=WARN修改为log4j. logger.org.apache.spark.repl.Main=INFO,然后我们再次运行spark-shell,将打印出更丰富的信息,如图1-4所示。
图1-4 Spark启动过程打印的部分信息
从图1-4展示的启动日志中我们可以看到SecurityManager、SparkEnv、BlockManager-MasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlock-TransferService、BlockManager、BlockManagerMaster等信息。它们是做什么的?刚刚接触Spark的读者只需要知道这些信息即可,具体内容将在后边进行详细的介绍。
1.2.2 执行word count
这一节,我们通过word count这个耳熟能详的例子来感受下Spark任务的执行过程。启动spark-shell后,会打开Scala命令行,然后按照以下步骤输入脚本。
1)输入val lines = sc.textFile("../README.md", 2),以Spark安装目录下的README. md文件内容作为word count例子的数据源,执行结果如图1-5所示。
图1-5 步骤1执行结果
图1-5告诉我们,lines的实际类型是MapPartitionsRDD。
2)textFile方法对文本文件是逐行读取的,我们需要输入val words = lines.flatMap(line=> line.split(" ")),将每行文本按照空格分隔以得到每个单词,执行结果如图1-6所示。
图1-6 步骤2执行结果
图1-6告诉我们,lines在经过flatMap方法的转换后,得到的words的实际类型也是MapPartitionsRDD。
3)对于得到的每个单词,通过输入val ones = words.map(w => (w,1)),将每个单词的计数初始化为1,执行结果如图1-7所示。
图1-7 步骤3执行结果
图1-7告诉我们,words在经过map方法的转换后,得到的ones的实际类型也是MapPartitionsRDD。
4)输入val counts = ones.reduceByKey(_ + _),对单词进行计数值的聚合,执行结果如图1-8所示。
图1-8 步骤4执行结果
图1-8告诉我们,ones在经过reduceByKey方法的转换后,得到的counts的实际类型是ShuffledRDD。
5)输入counts.foreach(println),将每个单词的计数值打印出来,作业的执行过程如图1-9和图1-10所示。作业的输出结果如图1-11所示。
图1-9 步骤5执行过程第一部分
图1-10 步骤5执行过程第二部分
图1-11 步骤5输出结果
图1-9和图1-10展示了很多作业提交、执行的信息,这里挑选关键的内容进行介绍。
SparkContext为提交的Job生成的ID是0。
word count例子一共产生了4个RDD,被划分为ResultStage和ShuffleMapStage。ShuffleMapStage的ID为0,尝试号为0。ResultStage的ID为1,尝试号也为0。在Spark中,如果Stage没有执行完成,就会进行多次重试。Stage无论是首次执行还是重试,都被视为是一次Stage尝试(stage attempt),每次尝试都有一个唯一的尝试号(attempt number)。
由于Job有两个分区,所以ShuffleMapStage和ResultStage都有两个Task被提交。每个Task也会有多次尝试,因而也有属于Task的尝试号。从图中看出,Shuffle-MapStage中的两个Task和ResultStage中的两个Task的尝试号也都是0。
HadoopRDD则用于读取文件内容。
图1-11展示了单词计数的输出结果和最后打印的任务结束的日志信息。
笔者在本节介绍的word count例子是以SparkContext的API来实现的,读者也可以选择在spark-shell中通过运用SparkSession的API来实现。本书在第10章将介绍Spark源码自带的用SparkSession的API来实现的word count的Java应用程序。
1.2.3 剖析spark-shell
通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢?
1. 脚本分析
在Spark安装目录的bin文件夹下可以找到spark-shell,其中有代码清单1-1所示的一段脚本。
代码清单1-1 spark-shell脚本
function main() { if $cygwin; then stty -icanon min 1-echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" fi }
我们看到脚本spark-shell里执行了spark-submit脚本,那么打开spark-submit脚本,发现代码清单1-2中所示的脚本。
代码清单1-2 spark-submit脚本
if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
可以看到spark-submit中又执行了脚本spark-class。打开脚本spark-class,首先发现以下一段脚本:
# Find the java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" else if [ "$(command -v java)" ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi
上面的脚本是为了找到Java命令。在spark-class脚本中还会找到以下内容:
build_command() { "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $? } CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <(build_command "$@")
根据代码清单1-2,脚本spark-submit在执行spark-class脚本时,给它增加了参数SparkSubmit。所以读到这里,应该知道Spark启动了以SparkSubmit为主类的JVM进程。
2. 远程监控
为便于在本地对Spark进程进行远程监控,在spark-shell脚本中找到以下配置:
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"
并追加以下jmx配置:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10207 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote. ssl=false
如果Spark安装在其他机器,那么在本地打开Java VisualVM后需要添加远程主机,如图1-12所示。
图1-12 添加远程主机
右击已添加的远程主机,添加JMX连接,如图1-13所示。
图1-13 添加JMX连接
如果Spark安装在本地,那么打开Java VisualVM后就会在应用程序窗口中看到org. apache.spark.deploy.SparkSubmit进程,只需双击即可。
选择右侧的“线程”选项卡,选择main线程,然后点击“线程Dump”按钮,如图1-14所示。
图1-14 查看Spark线程
从线程Dump的内容中找到线程main的信息,如代码清单1-3所示。
代码清单1-3 main线程的Dump信息
"main" #1 prio=5 os_prio=31 tid=0x00007fa012802000 nid=0x1303 runnable [0x0000000- 10d11c000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.read0(Native Method) at java.io.FileInputStream.read(FileInputStream.java:207) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246) at jline.internal.InputStreamReader.read(InputStreamReader.java:261) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.internal.InputStreamReader.read(InputStreamReader.java:198) - locked <0x00000007837a8ab8> (a jline.internal.NonBlockingInputStream) at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269) at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2. apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2. apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveR eader.scala:44) at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(Interactive- Reader.scala:37) at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28) at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop. scala:923) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClass Loader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache.spark.repl.Main$.doMain(Main.scala:68) at org.apache.spark.repl.Main$.main(Main.scala:51) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl. java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$r unMain(SparkSubmit.scala:738) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
从main线程的栈信息中可以看出程序的调用顺序:SparkSubmit.main→repl.Main→Iloop.process。
3. 源码分析
我们根据上面的线索,直接阅读Iloop的process方法的源码,如代码清单1-4所示。
代码清单1-4 process的实现
def process(settings: Settings): Boolean = savingContextLoader { this.settings = settings createInterpreter() // sets in to some kind of reader depending on environmental cues in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) globalFuture = future { intp.initializeSynchronous() loopPostInit() !intp.reporter.hasErrors } loadFiles(settings) printWelcome() try loop() match { case LineResults.EOF => out print Properties.shellInterruptedString case _ => } catch AbstractOrMissingHandler() finally closeInterpreter() true }
根据代码清单1-4, Iloop的process方法调用了loadFiles方法。Spark中的SparkILoop继承了Iloop并重写了loadFiles方法,其实现如下:
override def loadFiles(settings: Settings): Unit = { initializeSpark() super.loadFiles(settings) }
根据上面展示的代码,loadFiles方法调用了SparkILoop的initializeSpark方法,initialize Spark的实现如代码清单1-5所示。
代码清单1-5 initializeSpark的实现
def initializeSpark() { intp.beQuietDuring { processLine(""" @transient val spark = if (org.apache.spark.repl.Main.sparkSession ! = null) { org.apache.spark.repl.Main.sparkSession } else { org.apache.spark.repl.Main.createSparkSession() } @transient val sc = { val _sc = spark.sparkContext if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) if (proxyUrl ! = null) { println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_ sc.applicationId}") } else { println(s"Spark Context Web UI is available at Spark Master Public URL") } } else { _sc.uiWebUrl.foreach { webUrl => println(s"Spark context Web UI available at ${webUrl}") } } println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.") _sc } """) processLine("import org.apache.spark.SparkContext._") processLine("import spark.implicits._") processLine("import spark.sql") processLine("import org.apache.spark.sql.functions._") replayCommandStack = Nil // remove above commands from session history. } }
我们看到,initializeSpark向交互式shell发送了一大串代码,Scala的交互式shell将调用org.apache.spark.repl.Main的createSparkSession方法(见代码清单1-6),创建Spark-Session。我们看到常量spark将持有SparkSession的引用,并且sc持有SparkSession内部初始化好的SparkContext。所以我们才能够在spark-shell的交互式shell中使用sc和spark。
代码清单1-6 createSparkSession的实现
def createSparkSession(): SparkSession = { val execUri = System.getenv("SPARK_EXECUTOR_URI") conf.setIfMissing("spark.app.name", "Spark shell") conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) if (execUri ! = null) { conf.set("spark.executor.uri", execUri) } if (System.getenv("SPARK_HOME") ! = null) { conf.setSparkHome(System.getenv("SPARK_HOME")) } val builder = SparkSession.builder.config(conf) if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") { if (SparkSession.hiveClassesArePresent) { sparkSession = builder.enableHiveSupport().getOrCreate() logInfo("Created Spark session with Hive support") } else { builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") sparkSession = builder.getOrCreate() logInfo("Created Spark session") } } else { sparkSession = builder.getOrCreate() logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext sparkSession }
根据代码清单1-6所示,createSparkSession方法通过SparkSession的API创建Spark-Session实例。本书将有关SparkSession等API的内容放在第10章进行讲解,初次接触Spark的读者现在只需要了解即可。