2.1 Spark Core简介
为方便后面针对Spark Streaming的剖析,这里先对Spark Core的基础概念做简要的介绍,有Spark应用开发经验的读者可以跳过这些内容。
Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与其他现有的大数据处理的开源软件相比,Spark有如下优势:
(1)轻量级快速处理。
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,而基于硬盘的运算也要快10倍以上。
Spark是用Scala语言编写的,代码量较小,这得益于Scala的简洁和丰富表达力。Scala是一门以Java虚拟机(JVM)为目标运行环境并将面向对象和函数式编程的最佳特性结合在一起的静态类型编程语言。Spark把Scala语言的优势发挥得淋漓尽致。
Spark通过External DataSource API充分利用和集成Hadoop等其他第三方组件的能力。同时Spark基于内存计算,可通过将中间结果缓存在内存来减少磁盘I/O,以达到性能的提升。
(2)易于开发应用。
Spark支持Java、Python和Scala的API,自带了80多个算子,同时允许在Shell中进行交互式计算。用户可以利用Spark像书写单机程序一样书写分布式程序,轻松利用Spark搭建大数据内存计算平台并充分利用内存计算实现海量数据的实时处理。
(3)高通用性。
Spark提供了统一的解决方案,Spark可以用于批处理、交互式查询(通过Spark SQL)、实时流处理(通过Spark Streaming)、机器学习(通过Spark MLlib)和图计算(通过Spark GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。
(4)可融合性。
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大的处理能力。Spark也可以不依赖于第三方的资源管理器和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
(5)支持多数据源。
Spark可以独立运行,除了可以运行在当下的YARN集群管理之外,它还可以读取已有的任何Hadoop数据。它可以运行多种数据源,比如Parquet、Hive、HBase、HDFS等。这个特性让用户可以轻易迁移已有的持久化层数据。
(6)社区活跃度高。
有大量工程师在贡献代码,已经形成一个活跃的社区,有强大的生态系统的支持。
(7)实时高效的流处理。
既然本书是剖析Spark Streaming,就应该单独说说Spark Streaming的流处理的优势。
Hadoop的MapReduce只能处理离线数据,当然在YARN之后Hadoop也可以借助其他的工具进行流式计算。但Spark Streaming对数据进行实时的处理有以下优点:
● 简单。轻量级且具备功能强大的API, Spark Streaming允许开发人员快速开发流应用程序。
● 集成。支持多种来源的流数据。为流处理和批处理重用了同样的代码,甚至可以将流数据保存到历史数据中。能调用Spark SQL、MLlib、GraphX等其他子框架来实现多种数据处理功能。
● 容错。相比其他的流解决方案,Spark Streaming无须额外的代码和配置,就可以做大量的恢复和交付工作。
Spark Core中最重要的概念是RDD(弹性分布式数据集)。
RDD是由数据组成的不可变分布式集合,有以下组成部分:
● 一组分区(partition)。
● 一个函数,用于做每个分区上的计算。
● 一个对其他RDD的依赖关系列表。
● 一个对key-value(键值对)类型的RDD的分区器(可选)。
● 一个所有分区的优选位置的列表(可选)。
RDD可以快速和便捷地转换到集群中的主机上。这也就是RDD的弹性所在。RDD虽然不可变,但Spark能对RDD进行转换操作生成新的RDD。RDD是一个可分区的元素集合,其包含的元素可以分布在集群各个节点上,并且可以执行一些分布式并行操作。
这里给出Spark架构相关的术语及其解释:
● Application(应用程序)。Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。
● Driver(驱动器)。Spark Application的main函数在运行时会创建SparkContext。通常用SparkContext代表Driver。其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。
● Executor(执行器)。Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。每个Application都有各自独立的一批Executor。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于Hadoop MapReduce中的YarnChild。一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task。每个CoarseGrainedExecutorBackend能并行运行Task的数量取决于分配给它的CPU的个数。
● ClusterManager(集群管理器)。指的是在集群上获取资源的外部服务,目前有以下两种:
■ Standalone。Spark原生的资源管理,由Master负责资源的分配。
■ Hadoop Yarn。由YARN中的ResourceManager负责资源的分配。
● Worker(工作者)。集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点。
● Job(作业)。包含多个Task组成的并行计算,往往由Spark action催生,一个Job包含多个RDD及作用于相应RDD上的各种操作。
● Stage(阶段)。每个Job会被拆分为很多组Task,每组Task被称为Stage,也可称TaskSet,一个Job分为多个阶段。
● Task(任务)。被送到某个Executor上的工作任务。
● DAG(Directed Acyclic Graph,有向无环图)。RDD及其依赖关系构成的图。
● DAGScheduler(DAG调度器)。根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler。
● TaskScheduler(任务调度器)。将Taskset提交给Worker节点集群运行并返回结果。
可以通过图2-1所示的Spark架构加深对上面主要术语的理解。
图2-1 Spark架构
总体来说,每个Spark应用都包含一个Driver, Driver运行用户的main函数,并在集群上执行各种并行操作。
这里给出简单代码示例。
源码2-1 Spark应用程序WordCount
package com.example import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: <file>") System.exit(1) } val conf = new SparkConf() val sc = new SparkContext(conf) val line = sc.textFile(args(0)) line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect().foreach(println) sc.stop() } }
RDD通常是通过HDFS(或者其他Hadoop支持的文件系统)上的文件或者Driver中的Scala集合对象来创建或转换(transformation)得到的。用户也可以请求Spark将RDD持久化到内存里,以便在不同的并行操作里被复用。RDD具备容错性,可以从节点失败中自动恢复数据。
源码2-1的WordCount通过sc.textFile产生了第一个RDD,然后通过其他操作转换成其他RDD,最后一定有动作(action)操作。本例中action操作是collect、foreach。
RDD作为数据结构,本质上是一个只读的分区记录集合。一个RDD可以包含多个分区(partition),每个分区就是一个dataset片段。
Spark中对RDD的操作分为两类:转换(transformation)和动作(action)。
RDD的transformation操作如表2-1所示。
表2-1 RDD的transformation操作
续表
续表
RDD的这些转换操作是从父RDD转换为子RDD。注意,这里说的RDD的父子关系不是指RDD类的继承关系,而是指RDD对象的转换关系。Spark Streaming内部为了优化最终的分布式运算,把子RDD对父RDD的这种依赖分成宽依赖和窄依赖两类。
● 窄依赖(narrow dependency):父RDD的每个分区都只被子RDD的一个分区所依赖。
● 宽依赖(wide dependency):父RDD的每个分区被多个子RDD的分区所依赖。
RDD的转换操作中的groupByKey、reduceByKey、join等会导致宽依赖。
RDD的action操作如表2-2所示。
表2-2 RDD的action操作
转换操作不会立即执行,只有在action操作时,通过SparkContext执行提交作业的runJob操作,触发RDD DAG的执行。
RDD还有persist()、cache()等操作,持久化(或缓存)数据集到内存或磁盘。被缓存的RDD在被使用的时候,存取速度会被大大加速。RDD的persist()方法可以使用多种缓存策略。RDD的cache()方法其实调用的就是persist()方法,缓存策略为MEMORY_ONLY。
图2-2展示了一个Spark应用案例的计算方式。
图2-2 Spark应用案例的计算方式
宽依赖是划分出Stage的依据。图2-2所示的案例由于有groupBy、join导致宽依赖,所以划分出了3个Stage。Stage有非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生shuffle的地方。
计算方式确定下来后,会根据Stage生成TaskSet, TaskSet被提交并执行。
Spark第二个重要的抽象概念是共享变量,共享变量是一种可以在并行操作之间共享使用的变量。默认情况下,当Spark把一系列任务调度到不同节点上运行时,Spark会同时把每个变量的副本和任务代码一起发送给各个节点。但有时候需要在任务之间或者任务和Driver之间共享一些变量。Spark提供了两种类型的共享变量:
(1)广播变量。在所有节点的内存中缓存一个值。广播变量是只读变量。
代码示例:
val broadcastVar = sc.broadcast("string for test")
val v = broadcastVar.value
println(v)
(2)累加器。用来执行跨节点的“累加”操作,例如计数和求和。
代码示例:
val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(1 to 1000000).foreach(x => accum+= 1)
println(accum.name + " : " + accum.value)
最后总结一下Spark应用程序的运行流程:
(1)构建Spark Application的运行环境,启动SparkContext。
(2)SparkContext向资源管理器(可以是Standalone、Mesos、Yarn)申请运行Executor资源。
(3)资源管理器分配Executor资源并启动ExecutorBackend, Executor运行情况将随着心跳发送到资源管理器上。心跳是周期性地发送给资源管理器的信息,以表示Executor仍然活着。
(4)SparkContext通过DAGScheduler根据RDD依赖关系构建DAG图,再将DAG图分解成Stage,并把Taskset发送给TaskScheduler。Executor向SparkContext申请Task, TaskScheduler将Task发放给Executor运行。
(5)Task在Executor上运行,运行完释放所有资源。
以上对Spark Core作了概括性的介绍。没有掌握这些基本知识的读者还可以参考其他资料。