1.1 Spark Streaming应用案例
Spark Streaming应用程序运行的时候,往往在短时间内会产生大量日志信息,不利于研究分析。可以通过加大批处理时间间隔(batch interval)来降低批处理频率,减少日志信息量,以便看清楚各个环节。
下面从一个Spark Streaming应用程序的开发入手,观察运行过程,以增强感性认识。以下是一个广告点击的在线黑名单过滤的Spark Streaming应用程序,程序中有详细注释,以方便初次接触Spark Streaming的读者理解。
源码1-1 OnlineBlackListFilter
package com.dt.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
object OnlineBlackListFilter {
def main(args: Array[String]){
/**
* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
* 例如,通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
* 只有1GB内存)的初学者
*/
// 创建SparkConf对象
val conf = new SparkConf()
// 设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setAppName("OnlineBlackListFilter")
// 此时,程序在Spark集群
conf.setMaster("spark://Master:7077")
val ssc = new StreamingContext(conf, Seconds(30))
/**
* 黑名单数据准备。实际上黑名单一般都是动态的,例如在Redis或者数据库中
* 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同
* 但是在Spark Streaming进行处理的时候每次都能够访问完整的信息
*/
val blackList = Array(("Spy", true), ("Cheater", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("Master", 9999)
/**
* 此处模拟的广告点击的每条数据的格式为:time、name
* 此处map操作的结果是name、(time, name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split("")(1), ads) }
adsClickStreamFormatted.transform(userClickRDD => {
// 通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,
// 又获得了相应点击内容是否在黑名单中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
/**
* 进行filter过滤的时候,其输入元素是一个元组:(name, ((time, name), boolean))
* 其中,第一个元素是黑名单的名称
* 第二元素的第二个元素boolean是进行leftOuterJoin的时候是否存在的值。
* 如果存在,表明当前广告点击是黑名单,需要过滤掉,否则是有效点击内容
*/
val validClicked = joinedBlackListRDD.filter(joinedItem => {
if(joinedItem._2._2.getOrElse(false))
{
false
} else {
true
}
})
validClicked.map(validClick => {validClick._2._1})
}).print
ssc.start()
ssc.awaitTermination()
}
}
此程序接收Socket信息,过滤掉其中名称为Spy、Cheater的信息,并打印输出。
把程序的批处理时间间隔设置从30s改成300s:
val ssc = new StreamingContext(conf, Seconds(300))
然后重新生成一下jar包。
Spark集群有5台机器:Master、Worker1、Worker2、Worker3、Worker4。
启动Spark的History Server。
打开数据发送的端口:
nc -lk 9999
用spark-submit运行前面生成的jar包。
在数据发送端口输入若干数据,例如:
1375864674543 Tom 1375864674553 Spy 1375864674571 Andy 1375864688436 Cheater 1375864784240 Kelvin 1375864853892 Steven 1375864979347 John
每行第一项为时刻的毫秒数,第二项是程序中要过滤的名称。
打开浏览器,看History Server的日志信息,如图1-2所示。
图1-2 History Server日志信息示例
图中按时间顺序显示了曾经运行过的应用程序,第一列是App ID,有各应用程序执行信息的链接。
单击最新的应用,看目前运行的应用程序中有些什么Job,如图1-3所示。
图1-3 Spark Jobs页面示例
这样一个Spark Streaming应用程序运行时总共有5个Job。
观察这些Job的内容,可以揭示一些现象。
Job 0不体现应用程序的业务逻辑代码,如图1-4所示。其实此Job是Spark Streaming出于对后面计算的负载均衡的考虑而产生的。
图1-4 Details for Job 0示例
Job 0包含Stage 0、Stage 1。随便看一个Stage,比如Stage 0,看看其中的Aggregated Metrics by Executor部分,如图1-5所示。
图1-5 Aggregated Metrics by Executor页面示例
因为是分布式环境做负载均衡,所以Job 0的Stage 1是在4个Worker的Executor上运行。
Job 1的运行时间比较长,耗时1.5min,如图1-6所示。
图1-6 页面示例:Details for Job 1
单击Stage 2的链接,看看Aggregated Metrics By Executor部分,如图1-7所示。
图1-7 页面示例:Aggregated Metrics by Executor
可以知道,Stage 2只在Worker4上的一个Executor执行,而且执行了1.5min。
从业务处理的角度看,此前发送了很少的数据,这里却显示有一个运行1.5min的任务。这个任务是做什么呢?
从DAG Visualization部分可以知道此Job实际就是启动了一个接收数据的接收器(Receiver),如图1-8所示。
图1-8 页面示例:Details for Stage 2
原来Receiver是通过一个Job来启动的。
Tasks部分如图1-9所示。
图1-9 页面示例:Tasks
只有一个Worker运行此Job,用于接收数据。
Spark Streaming应用程序启动后,自己会启动一些Job。默认情况是启动一个Job接收数据,为后续处理做准备。
Locality Level是PROCESS_LOCAL。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。
从Job 2的Details可以发现程序的主要业务逻辑,体现在Stage 3、Stage 4、Stage 5中,如图1-10所示。
图1-10 页面示例:Details for Job 2
仔细观察Stage 3、Stage 4,可以知道这两个Stage都是用4个Executor执行的,所有数据处理是在4台机器上进行的,如图1-11所示。
图1-11 页面示例:Aggregated Metrics by Executor
Stage 5只在Worker4上,这是因为这个Stage有Shuffle操作。
Job 3有Stage 6、Stage 7、Stage 8,其中Stage 6、Stage 7显示灰色,说明被跳过,如图1-12所示。
图1-12 页面示例:Details for Job 3
由Stage 8的Aggregated Metrics by Executor部分可以看出,数据处理是在4台机器上进行的,如图1-13所示。
图1-13 页面示例:Aggregated Metrics by Executor
Job 4也体现了应用程序中的业务逻辑。Job 4有Stage 9、Stage 10、Stage 11,其中Stage 9、Stage 10被跳过,如图1-14所示。
图1-14 页面示例:Details for Job 4
细察Stage 11,可以看出,数据处理是在Worker2之外的其他3台机器上进行的,如图1-15所示。
图1-15 页面示例:Aggregated Metrics by Executor
从而可得出结论:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。
让程序运行若干小时,观察没有停下来的Spark Streaming程序运行留下的信息,如图1-16所示。
图1-16 Spark Streaming应用程序运行界面示例
这个程序仍然在不断地循环运行。即使没有接收到新数据,日志中也不断循环显示着JobScheduler、BlockManager、MapPartitionsRDD、ShuffledRDD等的信息,其中有一部分是Spark Core相关的信息。