1.3.1 由Spark组件组成的一站式软件栈
如图 1-3 所示,针对各种各样的作业类型,Spark 提供了 4 个组件:Spark SQL、Spark Structured Streaming、Spark MLlib,以及 GraphX。这些组件都独立于 Spark 具有容错性的内核引擎。通过使用这些组件提供的 API,你可以实现自己的 Spark 应用。Spark 会将它转为 DAG,然后交给内核引擎执行。这样一来,无论你使用的是 Java、R、Scala、SQL、Python 中的哪种语言,只要使用了提供的这些结构化 API(第 3 章将对其做进一步介绍),底层代码就都会转为高度紧凑的字节码,然后交给集群内工作节点上的 JVM 去执行。
图 1-3:Spark 的组件和 API 栈
接下来分别介绍这些组件。
Spark SQL
Spark SQL 非常适合处理结构化数据。关系数据库的表或用文件(CSV、文本文件、JSON、Avro、ORC、Parquet 等文件格式)存储的结构化数据都可以被读取并构建为 Spark 的永久表或临时表。另外,当使用 Spark 提供的 Java、Python、Scala 或 R 等语言的结构化 API 时,你可以直接用 SQL 查询来访问刚读入为 Spark DataFrame 的数据。至撰写本书时,Spark SQL 语法已经兼容于 ANSI SQL:2003 标准,功能也和纯正的 SQL 引擎相同。
举例来说,在以下 Scala 代码片段中,你可以从存储于亚马逊云 S3 存储桶内的 JSON 文件读取数据、创建临时表,并且执行类似 SQL 的查询语句。查询结果由内存中的 Spark DataFrame 对象表示。
// Scala代码 // 从亚马逊云S3存储桶加载数据为Spark DataFrame spark.read.json("s3://apache_spark/data/committers.json") .createOrReplaceTempView("committers") // 发起SQL查询,并以Spark DataFrame的形式返回结果 val results = spark.sql("""SELECT name, org, module, release, num_commits FROM committers WHERE module = 'mllib' AND num_commits > 10 ORDER BY num_commits DESC""")
你也可以用 Python、R、Java 等语言写出类似的代码片段,因为执行时所生成的字节码相同,所以执行性能也是一样的。
Spark MLlib
Spark 自带一个包含常见机器学习算法的库——MLlib。从 Spark 发布第一个版本至今,这个组件的性能已经有了天翻地覆的提升,尤其是 Spark 2.x 底层引擎的改进。
MLlib 提供了很多常见的机器学习算法,这些算法基于高级的 DataFrame API 构建,可用于搭建模型。
从 Spark 1.6 开始,MLlib 项目分成了两个包:
spark.mllib
和spark.ml
。后者提供基于 DataFrame 的 API,而前者包含基于 RDD 的 API,现在前者处于维护模式(不会开发新特性,只提供原有特性维护)。所有的新特性只存在于spark.ml
中。本书所提到的“MLlib”是 Spark 中机器学习库的总称。这些 API 允许你提取或转化特征,构建用于训练和评估模型的流水线,并在不同环境间持久化模型(包括保存和重新加载)。其他功能包括常见线性代数运算和统计函数的使用。MLlib 包含其他一些低级的机器学习原语,比如通用的梯度下降优化。以下的 Python 代码片段封装了数据科学家在构建模型时的一些基本操作(第 10~11 章将讨论更详细的示例)。
# Python代码 from pyspark.ml.classification import LogisticRegression ... training = spark.read.csv("s3://...") test = spark.read.csv("s3://...") # 加载训练数据 lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) # 拟合模型 lrModel = lr.fit(training) # 预测 lrModel.transform(test) ...
Spark Structured Streaming
基于 Spark SQL 引擎和 DataFrame API,Spark 2.0 引入了实验性的连续流处理模型和结构化流处理 API。到 Spark 2.2 版本,结构化流处理(Structured Streaming)已经到了一般可用的状态,也就是说,开发人员已经可以在生产环境中使用它了。
为了让大数据开发人员能够将来自 Apache Kafka 或其他流式数据源的数据流和静态数据结合起来,并实时响应,新模型将数据流视为持续增长的表,新的数据记录不断追加到表的最后。开发人员只需将数据流当作结构化的表,像静态表那样用查询语句直接进行查询即可。
在结构化流处理模型下,内部的 Spark SQL 核心引擎会处理包括容错和迟到数据(late- data)语义在内的方方面面,以便开发人员从容地将注意力放在流处理应用本身的实现上。这种新模型取代了 Spark 1.x 版本中基于 DStream 的旧模型,第 8 章将对其进行详细介绍。此外,Spark 2.x 和 Spark 3.0 将流式数据源的范围扩大到包括 Apache Kafka、Kinesis、基于 HDFS 的数据,以及云存储的数据。
以下代码片段展示了结构化流处理应用的典型结构。这个示例从本机的套接字读取数据,并将单词计数结果写入 Apache Kafka。
# Python代码 # 从本机读取一个数据流 from pyspark.sql.functions import explode, split lines = (spark .readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()) # 执行转化操作 # 将每行字符切分为单词 words = lines.select(explode(split(lines.value, " ")).alias("word")) # 生成流式单词统计 word_counts = words.groupBy("word").count() # 将输出写入Kafka数据流 query = (word_counts .writeStream .format("kafka") .option("topic", "output"))
GraphX
顾名思义,GraphX 是用于操作图(如社交网络图、路线和连接点图、网络拓扑图等)和执行并行图计算的库。社区用户为它贡献了标准的图算法,可以用于分析、连通、遍历。可供使用的算法包括 PageRank(网页排名)、Connected Components(连通分支),以及 Triangle Counting(三角计数)等。2
以下代码片段展示了如何用 GraphX 的 API 连接两个图。
// Scala代码 val graph = Graph(vertices, edges) val messages = spark.textFile("hdfs://...") val graph2 = graph.joinVertices(messages) { (id, vertex, msg) => ... }
2Databricks 向社区贡献的开源项目 GraphFrames 是一个和 Spark 中的 GraphX 类似的通用图计算库,区别在于 GraphFrames 使用基于 DataFrame 的 API。