腾讯 CSIG 应用研究员万字解读 Spark 部署与工作原理
一、Spark 概述
Spark 是 UC Berkeley AMP Lab 开源的通用分布式并行计算框架,目前已成为 Apache 软件基金会的顶级开源项目。Spark 支持多种编程语言,包括 Java、Python、R 和 Scala,同时 Spark 也支持 Hadoop 的底层存储系统 HDFS,但 Spark 不依赖 Hadoop。
(图片来源:Apache Spark™)
1.2 Spark 架构及生态Spark 除了 Spark Core 外,还有其它由多个组件组成,目前主要有四个组件:Spark SQL、Spark Streaming、MLlib、GraphX。这四个组件加上 Spark Core 组成了 Spark 的生态。通常,我们在编写一个 Spark 应用程序,需要用到 SparkCore 和其余 4 个组件中的至少一个。Spark 的整体构架图如下图所示:

Spark Core:是 Spark 的核心,主要负责任务调度等管理功能。SparkCore 的实现依赖于 RDDs(Resilient Distributed Datasets,弹性分布式数据集)的程序抽象概念。
Spark SQL:是 Spark 处理结构化数据的模块,该模块旨在将熟悉的 SQL 数据库查询与更复杂的基于算法的分析相结合,SparkSQL 支持开源 Hive 项目及其类似 SQL 的 HiveQL 查询语法。SparkSQL 还支持 JDBC 和 ODBC 连接,能够直接连接现有的数据库。
Spark Streaming:这个模块主要是对流数据的处理,支持流数据的可伸缩和容错处理,可以与 Flume(针对数据日志进行优化的一个系统)和 Kafka(针对分布式消息传递进行优化的流处理平台)等已建立的数据源集成。Spark Streaming 的实现,也使用 RDD 抽象的概念,使得在为流数据(如批量历史日志数据)编写应用程序时,能够更灵活,也更容易实现。
MLlib:主要用于机器学习领域,它实现了一系列常用的机器学习和统计算法,如分类、回归、聚类、主成分分析等算法。
GraphX:这个模块主要支持数据图的分析和计算,并支持图形处理的 Pregel API 版本。GraphX 包含了许多被广泛理解的图形算法,如 PageRank。
本地运行模式是 Spark 中最简单的一种模式,也可称作伪分布式模式。
独立运行模式为 Spark 自带的一种集群管理模式,Mesos 及 YARN 两种模式也是比较常用的集群管理模式。相比较 Mesos 及 YARN 两种模式而言,独立运行模式是最简单,也最容易部署的一种集群运行模式。
Kubernetes 是一个用于自动化部署、扩展和管理容器化应用程序的开源系统。
Spark 底层还支持多种数据源,能够从其它文件系统读取数据,如 HDFS、Amazon S3、Hypertable、HBase 等。Spark 对这些文件系统的支持,同时也丰富了整个 Spark 生态的运行环境。
Standalone模式为 Spark 自带的一种集群管理模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。它是 Spark 实现的资源调度框架,其主要的节点有 Driver 节点、Master 节点和 Worker 节点。Standalone模式也是最简单最容易部署的一种模式。
Spark on YARN模式,即 Spark 运行在Hadoop YARN框架之上的一种模式。Hadoop YARN(Yet Another ResourceNegotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度。
Spark on Mesos模式,即 Spark 运行在Apache Mesos框架之上的一种模式。Apache Mesos是一个更强大的分布式资源管理框架,负责集群资源的分配,它允许多种不同的框架部署在其上,包括YARN。它被称为是分布式系统的内核。
三种架构都采用了Master/Worker(Slave)的架构,Spark 分布式运行架构大致如下:
本文主要介绍 Spark 的Standalone模式的部署
三、环境准备出于学习的目的,本文将 Spark 部署在安装有 CentOS7 系统的 VirtualBox 虚拟机中。
搭建 Spark 集群,需要准备以下文件及环境:jdk-8u211-linux-x64.tar.gzspark-2.4.3-bin-hadoop2.7.tgz3 个独立的 CentOS7 虚拟机系统,机器集群规划如下:
四、安装
4.1. 配置 jdk 环境
解压文件:
tar -zxf jdk-8u211-linux-x64.tar.gz
配置环境变量:
export JAVA_HOME=/path/to/jdk1.8.0_211
export PATH=$PATH:$JAVA_HOME/bin
4.2. 配置 Spark 环境
解压文件:
tar -xf **park-2.4.3-bin-hadoop2.7.tgz**
配置环境变量:
export SPARK_HOME=/path/to/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
修改spark-env.sh 文件
cd spark-2.4.3-bin-hadoop2.7
cp conf/spark-env.sh.template conf/spark-env.sh
vim conf/spark-env.sh
1. 增加如下内容:
export JAVA_HOME=/path/to/jdk1.8.0_211
export SPARK_MASTER_HOST=192.168.56.106
修改slaves文件
cp conf/slaves.template conf/slaves
vim conf/slaves
1. 增加如下内容:
192.168.56.106
192.168.56.107
192.168.56.108
4.3. 配置 ssh 免密登录
配置 ssh 免密登录,是为了能够在master机器上来启动所有worker节点,如果不配置免密登录,则在启动每个worker时,都需要输入一遍密码,会很麻烦。当然,如果机器少的话,也可以登录到worker节点上,手动一个一个启动worker。
执行:ssh-keygen -t rsa,一直按回车即可。最后会生成类似这样的日志:
并且在用户目录下会自动生成.ssh目录执行ls ~/.ssh可以看到两个文件:
id_rsa生成的私钥文件
id_rsa.pub生成的公钥文件
将id_rsa.pub复制到其它机器上,执行以下几条命令:
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.106: # master所在的主机,如果master不做woker可以不需要。
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.107:
ssh-copy-id -i ~/.ssh/id_rsa.pub royran@192.168.56.108:
4.4 配置其它 worker 节点
当前已在master节点配置好了环境,还需要在其它worker节点上配置相类似的环境。
配置其它worker节点很简单,只需要将jdk1.8.0_211及spark-2.4.3-bin-hadoop2.7两个目录复制到其它worker节点机器上即可。但要注意,这两个目录在其它 worker 上的绝对路径需要与 master 上的绝对路径一致,不然无法直接在 master 上启动其它 worker 节点。
依次执行以下命令(如果已经配置好 ssh 免密,可以发现执行 scp 指令不需要两次输入密码):
scp -r /path/to/jdk1.8.0_211username@192.168.56.107:/path/to/jdk1.8.0_211
scp -r /path/to/jdk1.8.0_211username@192.168.56.108:/path/to/jdk1.8.0_211
scp -r /path/to/spark-2.4.3-bin-hadoop2.7username@192.168.56.107:/path/to/spark-2.4.3-bin-hadoop2.7
scp -r /path/to/spark-2.4.3-bin-hadoop2.7username@192.168.56.108:/path/to/spark-2.4.3-bin-hadoop2.7
4.5 启动master
执行:
sbin/start-master.sh
输入jps指令(该指令在
看到有Master字样,说明master进程启动成功了,启动master后,spark 默认会监听8080端口,并可以通过浏览器打开 web 界面,在地址栏输入http://192.168.56.106:8080,查看集群状态。如下图所示:

当前只启动了master,所以看不到任何worker信息。
4.6 启动 worker 节点
执行:
sbin/slaves.sh
会看到类似这样的输出:
再输入jps,会列出当前启动的java进程,显示Worker字样,说明worker进程启动成功了。

此时再刷新下打开的浏览器界面(http://192.168.56.106:8080),可以看到当前启动了三个Worker节点。

也许你会发现界面上显示的 Address 列,怎么是 10 开头的 ip 地址,并且都是一样的,而不是 192 开头的三个不同的 ip 地址。

这是因为虚拟机内有两块虚拟网卡,Spark 会读取环境变量SPARK_LOCAL_IP,如果没设置这个变量,Spark 就会使用getHostByName来获取 ip 地址,会得到10.0.2.15这个 ip 地址。

要解决这个问题,有两种方法:
(1) 将仅主机(Host-Only)网络设置为网卡 1,将网络地址转换(NAT)设置为网卡 2。不过如果使用这种方法,重启虚拟机后,如果是动态 ip,则 ip 地址会变化,会影响之前的配置。
(2) 另一种方法,可在conf/spark-env.sh中设置SPARK_LOCAL_IP这个变量,可以固定为一个 ip 地址,
vim conf/spark-env.sh
1. 添加一行:
export SPARK_LOCAL_IP=192.168.56.106
在其他机器上同样需要手动添加这一行,不过要修改为对应的机器 ip。觉得这样有点麻烦。可以通过脚本动态获取本机 ip 地址,在conf/spark-env.sh中添加这两行:
SPARK_LOCAL_IP=`python -c "import socket;import fcntl;import struct;print([(socket.inet_ntoa(fcntl.ioctl(s.fileno(),0x8915,struct.pack('256s', 'enp0s8'))[20:24]), s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][0])"`
export SPARK_LOCAL_IP
这样就可以自动获取本机的enp0s8这块网卡的 ip 地址。
最后将修改后的conf/spark-env.sh这个文件复制到其它机器上:
执行:
scp conf/spark-env.sh username@192.168.56.107:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh
scp conf/spark-env.sh username@192.168.56.108:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh
重新启动所有节点:
sbin/stop-all.sh
sbin/start-all.sh
最后刷新浏览器界面,可以看到有 3 个Woker启动了,并且在 Address 列也可以看到都变为 192 开头的 ip 地址了。
五、测试
在{SPARK_HOME}/examples/src/main目录下,有一些 spark 自带的示例程序,有 java、python、r、scala 四种语言版本的程序。这里主要测试 python 版的计算PI的程序。
cd ${SPARK_HOME}/examples/src/main/python
将pi.py程序提交到 spark 集群,执行:
spark-submit --master spark://192.168.56.106:7077 pi.py
最后可以看到输出这样的日志:
刷新浏览器界面,在Completed Applications栏可以看到一条记录,即刚才执行的计算PI的 python 程序。

另外,如果觉得在终端中输出的日志太多,可以修改日志级别:
cp ${SPARK_HOME}/conf/log4j.properties.template ${SPARK_HOME}/conf/log4j.properties
vim ${SPARK_HOME}/conf/log4j.properties
修改日志级别为WARN:
再重新执行:spark-submit --master spark://192.168.56.106:7077 pi.py,可以看到输出日志少了很多。

除了提交 python 程序外,spark-submit 还可以提交打包好的java、scala程序,可以执行spark-submit --help看具体用法。
Spark 配置文件说明在下载下来的spark-2.4.3-bin-hadoop2.7.tgz中,conf 目录下会默认存在这几个文件,均为 Spark 的配置示例模板文件:

这些模板文件,均不会被 Spark 读取,需要将.template后缀去除,Spark 才会读取这些文件。这些配置文件中,在 Spark 集群中主要需要关注的是log4j.properties、slaves、spark-defaults.conf、spark-env.sh这四个配置文件。
log4j.properties的配置,可以参考Apache Log4j官网上的 Propertities 属性配置说明。
slaves的配置,里面为集群的所有worker节点的主机信息,可以为主机名,也可以为 ip 地址。
spark-defaults.conf的配置,可以参考Spark 官网的属性配置页。比如指定 master 节点地址,可以设置spark.master属性;指定 executor 的运行时的核数,可以设置spark.executor.cores属性等。
spark-env.sh是 Spark 运行时,会读取的一些环境变量,在本文中,主要设置了三个环境变量:JAVA_HOME、SPARK_HOME、SPARK_LOCAL_IP,这是 Spark 集群搭建过程中主要需要设置的环境变量。其它未设置的环境变量,Spark 均采用默认值。其它环境变量的配置说明,可以参考Spark 官网的环境变量配置页。
至此,Spark 集群的Standalone模式部署全部结束。
对于 Spark 的学习,目前我掌握还比较浅,还在学习过程中。如果文章中有描述不准确,或不清楚的地方,希望给予指正,我会及时修改。谢谢!
关于 Spark 的学习,可以根据 Spark 官网上的指导快速入门:
https://spark.apache.org/docs/latest/quick-start.html
Application:基于 Spark 的用户程序,即由用户编写的调用 Spark API 的应用程序,它由集群上的一个驱动(Driver)程序和多个执行器(Executor)程序组成。其中应用程序的入口为用户所定义的 main 方法。
SparkContext:是 Spark 所有功能的主要入口点,它是用户逻辑与 Spark 集群主要的交互接口。通过SparkContext,可以连接到集群管理器(Cluster Manager),能够直接与集群 Master 节点进行交互,并能够向 Master 节点申请计算资源,也能够将应用程序用到的 JAR 包或 Python 文件发送到多个执行器(Executor)节点上。
Cluster Manager:即集群管理器,它存在于 Master 进程中,主要用来对应用程序申请的资源进行管理。
Worker Node:任何能够在集群中能够运行 Spark 应用程序的节点。
Task:由SparkContext发送到Executor节点上执行的一个工作单元。
Driver:也即驱动器节点,它是一个运行Application中main()函数并创建SparkContext的进程。Driver节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调 Task 的调度。Driver节点可以不运行于集群节点机器上。
Executor:也即执行器节点,它是在一个在工作节点(Worker Node)上为Application启动的进程,它能够运行 Task 并将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver。
根据以上术语的描述,通过下图可以大致看到 Spark 程序在运行时的内部协调过程:
(图片来源:Cluster Mode Overview)
除了以上几个基本概念外,Spark 中还有几个比较重要的概念。
对于 Spark 的 RDD 计算抽象过程描述如下:
makeRDD:可以通过访问外部物理存储(如 HDFS),通过调用 SparkContext.textFile()方法来读取文件并创建一个 RDD,也可以对输入数据集合通过调用 SparkContext.parallelize()方法来创建一个 RDD。RDD 被创建后不可被改变,只可以对 RDD 执行 Transformation 及 Action 操作。
Transformation(转换):对已有的 RDD 中的数据执行计算进行转换,并产生新的 RDD,在这个过程中有时会产生中间 RDD。Spark 对于Transformation采用惰性计算机制,即在 Transformation 过程并不会立即计算结果,而是在 Action 才会执行计算过程。如map、filter、groupByKey、cache等方法,只执行Transformation操作,而不计算结果。
Action(执行):对已有的 RDD 中的数据执行计算产生结果,将结果返回 Driver 程序或写入到外部物理存储(如 HDFS)。如reduce、collect、count、saveAsTextFile等方法,会对 RDD 中的数据执行计算。
窄依赖(NarrowDependency):每个父 RDD 的一个分区最多被子 RDD 的一个分区所使用,即 RDD 之间是一对一的关系。窄依赖的情况下,如果下一个 RDD 执行时,某个分区执行失败(数据丢失),只需要重新执行父 RDD 的对应分区即可进行数恢复。例如map、filter、union等算子都会产生窄依赖。
宽依赖(WideDependency,或 ShuffleDependency):是指一个父 RDD 的分区会被子 RDD 的多个分区所使用,即 RDD 之间是一对多的关系。当遇到宽依赖操作时,数据会产生Shuffle,所以也称之为ShuffleDependency。宽依赖情况下,如果下一个 RDD 执行时,某个分区执行失败(数据丢失),则需要将父 RDD 的所有分区全部重新执行才能进行数据恢复。例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖。
RDD 依赖关系如下图所示:
6.3 Partition6.3.1 基本概念
partition(分区)是 Spark 中的重要概念,它是RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition的数量决定了task的数量,每个task对应着一个partition。
例如,使用 Spark 来读取本地文本文件内容,读取完后,这些内容将会被分成多个partition,这些partition就组成了一个RDD,同时这些partition可以分散到不同的机器上执行。RDD 的 partition 描述如下图所示:
(图片来源:Spark簡易操作)
partition的数量可以在创建 RDD 时指定,如果未指定 RDD 的 partition 大小,则在创建 RDD 时,Spark 将使用默认值,默认值为spark.default.parallelism配置的参数。
如果 partition 数量太少,则直接影响是计算资源不能被充分利用。例如分配 8 个核,但 partition 数量为 4,则将有一半的核没有利用到。
如果 partition 数量太多,计算资源能够充分利用,但会导致 task 数量过多,而 task 数量过多会影响执行效率,主要是 task 在序列化和网络传输过程带来较大的时间开销。
根据Spark RDD Programming Guide上的建议,集群节点的每个核分配 2-4 个partitions比较合理。
Partition 调整:
Spark 中主要有两种调整 partition 的方法:coalesce、repartition
参考 pyspark 中的函数定义:
def coalesce(self, numPartitions, shuffle=False):
"""
Return a new RDD that is reduced into `numPartitions` partitions.
"""
def repartition(self, numPartitions):
"""
*Return a new RDD that has exactly numPartitions partitions.*
*Can increase or decrease the level of parallelism in this RDD.*
*Internally, this uses a shuffle to redistribute data.*
*If you are decreasing the number of partitions in this RDD, consider*
*using `coalesce`, which can avoid performing a shuffle.*
*"""
return self.coalesce(numPartitions, shuffle=True)
从函数接口可以看到,reparation是直接调用coalesce(numPartitions, shuffle=True),不同的是,reparation函数可以增加或减少 partition 数量,调用repartition函数时,还会产生shuffle操作。而coalesce函数可以控制是否shuffle,但当shuffle为False时,只能减小partition数,而无法增大。
而每个Job会分解成一系列可并行处理的Task,然后将Task分发到不同的Executor上运行,这也是 Spark 分布式执行的简要流程。
如下图所示,为一个复杂的 DAG Stage 划分示意图:
(图片来源:Job物理执行图)
上图为一个 Job,该 Job 生成的 DAG 划分成了 3 个 Stage。上图的 Stage 划分过程是这样的:从最后的Action开始,从后往前推,当遇到操作为NarrowDependency时,则将该操作划分为同一个Stage,当遇到操作为ShuffleDependency时,则将该操作划分为新的一个Stage。
Spark 中Task分为两种类型,ShuffleMapTask 和 ResultTask,位于最后一个 Stage 的 Task 为 ResultTask,其他阶段的属于 ShuffleMapTask。ShuffleMapTask 和 ResultTask 分别类似于 Hadoop 中的 Map 和 Reduce。
Spark 集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点。Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 Executors。
Driver 为用户编写的 Spark 应用程序所运行的进程。Driver 程序可以运行在 Master 节点上,也可运行在 Worker 节点上,还可运行在非 Spark 集群的节点上。
(图片来源:Core Services behind Spark Job Execution)
7.2.2 TaskSchedulerDAGScheduler 将一个 TaskSet 交给 TaskScheduler 后,TaskScheduler 会为每个 TaskSet 进行任务调度,Spark 中的任务调度分为两种:FIFO(先进先出)调度和 FAIR(公平调度)调度。
FIFO 调度:即谁先提交谁先执行,后面的任务需要等待前面的任务执行。这是 Spark 的默认的调度模式。
FAIR 调度:支持将作业分组到池中,并为每个池设置不同的调度权重,任务可以按照权重来决定执行顺序。
在 Spark 中使用哪种调度器可通过配置spark.scheduler.mode参数来设置,可选的参数有 FAIR 和 FIFO,默认是 FIFO。
FIFO 调度算法为 FIFOSchedulingAlgorithm,该算法的 comparator 方法的 Scala 源代码如下:
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority // priority实际为Job ID
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
res < 0
}
根据以上代码,FIFO 调度算法实现的是:对于两个调度任务 s1 和 s2,首先比较两个任务的优先级(Job ID)大小,如果 priority1 比 priority2 小,那么返回 true,表示 s1 的优先级比 s2 的高。由于 Job ID 是顺序生成的,先生成的 Job ID 比较小,所以先提交的 Job 肯定比后提交的 Job 优先级高,也即先提交的 Job 会被先执行。
如果 s1 和 s2 的 priority 相同,表示为同一个 Job 的不同 Stage,则比较 Stage ID,Stage ID 小则优先级高。
FAIR 调度算法为 FairSchedulingAlgorithm,该算法的 comparator 方法的 Scala 源代码如下:
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare 0) {
false
} else {
s1.name < s2.name
}
}
由以上代码可以看到,FAIR 任务调度主要由两个因子来控制(关于 FAIR 调度的配置,可参考${SPARK_HOME}/conf/fairscheduler.xml.template文件):
weight:相对于其它池,它控制池在集群中的份额。默认情况下,所有池的权值为 1。例如,如果给定一个特定池的权重为 2,它将获得比其它池多两倍的资源。设置高权重(比如 1000)也可以实现池与池之间的优先级。如果设置为-1000,则该调度池一有任务就会马上运行。
minShare:最小 CPU 核心数,默认是 0,它能确保池总是能够快速地获得一定数量的资源(例如 10 个核),在权重相同的情况下,minShare 越大,可以获得更多的资源。
对以上代码的理解:
如果 s1 所在的任务池正在运行的任务数量比 minShare 小,而 s2 所在的任务池正在运行的任务数量比 minShare 大,那么 s1 会优先调度。反之,s2 优先调度。
如果 s1 和 s2 所在的任务池正在运行的 task 数量都比各自 minShare 小,那么 minShareRatio 小的优先被调度。
如果 s1 和 s2 所在的任务池正在运行的 task 数量都比各自 minShare 大,那么 taskToWeightRatio 小的优先被调度。
如果 minShareRatio 或 taskToWeightRatio 相同,那么最后比较各自 Pool 的名字。
(图片来源:Spark2.3.2 source code analysis)
八、Spark RDD 常用函数8.1 Transformation

Spark 调度模式-FIFO 和 FAIR
作者:royran,腾讯 CSIG 应用研究员