Spark Streaming技术内幕及源码剖析
上QQ阅读APP看书,第一时间看更新

1.1 Spark Streaming应用案例

Spark Streaming应用程序运行的时候,往往在短时间内会产生大量日志信息,不利于研究分析。可以通过加大批处理时间间隔(batch interval)来降低批处理频率,减少日志信息量,以便看清楚各个环节。

下面从一个Spark Streaming应用程序的开发入手,观察运行过程,以增强感性认识。以下是一个广告点击的在线黑名单过滤的Spark Streaming应用程序,程序中有详细注释,以方便初次接触Spark Streaming的读者理解。

源码1-1 OnlineBlackListFilter

    package com.dt.spark.streaming


    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds


    object  OnlineBlackListFilter {
        def main(args: Array[String]){
          /**
            * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
            * 例如,通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
            * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
            * 只有1GB内存)的初学者
            */
          // 创建SparkConf对象
          val conf = new SparkConf()
          // 设置应用程序的名称,在程序运行的监控界面可以看到名称
          conf.setAppName("OnlineBlackListFilter")
          // 此时,程序在Spark集群
          conf.setMaster("spark://Master:7077")
    val ssc = new StreamingContext(conf, Seconds(30))


    /**
      * 黑名单数据准备。实际上黑名单一般都是动态的,例如在Redis或者数据库中
      * 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同
      * 但是在Spark Streaming进行处理的时候每次都能够访问完整的信息
      */
    val blackList = Array(("Spy", true), ("Cheater", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)


    val adsClickStream = ssc.socketTextStream("Master", 9999)


    /**
      * 此处模拟的广告点击的每条数据的格式为:time、name
      * 此处map操作的结果是name、(time, name)的格式
      */
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split("")(1), ads) }
    adsClickStreamFormatted.transform(userClickRDD => {
      // 通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,
      // 又获得了相应点击内容是否在黑名单中
    val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)


      /**
        * 进行filter过滤的时候,其输入元素是一个元组:(name, ((time, name), boolean))
        * 其中,第一个元素是黑名单的名称
    * 第二元素的第二个元素boolean是进行leftOuterJoin的时候是否存在的值。
    * 如果存在,表明当前广告点击是黑名单,需要过滤掉,否则是有效点击内容
    */
  val validClicked = joinedBlackListRDD.filter(joinedItem => {
      if(joinedItem._2._2.getOrElse(false))
      {
        false
      } else {
        true
      }


  })


        validClicked.map(validClick => {validClick._2._1})
}).print


        ssc.start()
        ssc.awaitTermination()


        }
}

此程序接收Socket信息,过滤掉其中名称为Spy、Cheater的信息,并打印输出。

把程序的批处理时间间隔设置从30s改成300s:

val ssc = new StreamingContext(conf, Seconds(300))

然后重新生成一下jar包。

Spark集群有5台机器:Master、Worker1、Worker2、Worker3、Worker4。

启动Spark的History Server。

打开数据发送的端口:

nc -lk 9999

用spark-submit运行前面生成的jar包。

在数据发送端口输入若干数据,例如:

    1375864674543 Tom
    1375864674553 Spy
    1375864674571 Andy
    1375864688436 Cheater
    1375864784240 Kelvin
    1375864853892 Steven
    1375864979347 John

每行第一项为时刻的毫秒数,第二项是程序中要过滤的名称。

打开浏览器,看History Server的日志信息,如图1-2所示。

图1-2 History Server日志信息示例

图中按时间顺序显示了曾经运行过的应用程序,第一列是App ID,有各应用程序执行信息的链接。

单击最新的应用,看目前运行的应用程序中有些什么Job,如图1-3所示。

图1-3 Spark Jobs页面示例

这样一个Spark Streaming应用程序运行时总共有5个Job。

观察这些Job的内容,可以揭示一些现象。

Job 0不体现应用程序的业务逻辑代码,如图1-4所示。其实此Job是Spark Streaming出于对后面计算的负载均衡的考虑而产生的。

图1-4 Details for Job 0示例

Job 0包含Stage 0、Stage 1。随便看一个Stage,比如Stage 0,看看其中的Aggregated Metrics by Executor部分,如图1-5所示。

图1-5 Aggregated Metrics by Executor页面示例

因为是分布式环境做负载均衡,所以Job 0的Stage 1是在4个Worker的Executor上运行。

Job 1的运行时间比较长,耗时1.5min,如图1-6所示。

图1-6 页面示例:Details for Job 1

单击Stage 2的链接,看看Aggregated Metrics By Executor部分,如图1-7所示。

图1-7 页面示例:Aggregated Metrics by Executor

可以知道,Stage 2只在Worker4上的一个Executor执行,而且执行了1.5min。

从业务处理的角度看,此前发送了很少的数据,这里却显示有一个运行1.5min的任务。这个任务是做什么呢?

从DAG Visualization部分可以知道此Job实际就是启动了一个接收数据的接收器(Receiver),如图1-8所示。

图1-8 页面示例:Details for Stage 2

原来Receiver是通过一个Job来启动的。

Tasks部分如图1-9所示。

图1-9 页面示例:Tasks

只有一个Worker运行此Job,用于接收数据。

Spark Streaming应用程序启动后,自己会启动一些Job。默认情况是启动一个Job接收数据,为后续处理做准备。

Locality Level是PROCESS_LOCAL。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。

从Job 2的Details可以发现程序的主要业务逻辑,体现在Stage 3、Stage 4、Stage 5中,如图1-10所示。

图1-10 页面示例:Details for Job 2

仔细观察Stage 3、Stage 4,可以知道这两个Stage都是用4个Executor执行的,所有数据处理是在4台机器上进行的,如图1-11所示。

图1-11 页面示例:Aggregated Metrics by Executor

Stage 5只在Worker4上,这是因为这个Stage有Shuffle操作。

Job 3有Stage 6、Stage 7、Stage 8,其中Stage 6、Stage 7显示灰色,说明被跳过,如图1-12所示。

图1-12 页面示例:Details for Job 3

由Stage 8的Aggregated Metrics by Executor部分可以看出,数据处理是在4台机器上进行的,如图1-13所示。

图1-13 页面示例:Aggregated Metrics by Executor

Job 4也体现了应用程序中的业务逻辑。Job 4有Stage 9、Stage 10、Stage 11,其中Stage 9、Stage 10被跳过,如图1-14所示。

图1-14 页面示例:Details for Job 4

细察Stage 11,可以看出,数据处理是在Worker2之外的其他3台机器上进行的,如图1-15所示。

图1-15 页面示例:Aggregated Metrics by Executor

从而可得出结论:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。

让程序运行若干小时,观察没有停下来的Spark Streaming程序运行留下的信息,如图1-16所示。

图1-16 Spark Streaming应用程序运行界面示例

这个程序仍然在不断地循环运行。即使没有接收到新数据,日志中也不断循环显示着JobScheduler、BlockManager、MapPartitionsRDD、ShuffledRDD等的信息,其中有一部分是Spark Core相关的信息。