Spark工作机制(二)

了解了RDD概念后,介绍下Spark的工作机制:

1、惰性计算

首先,值得一提的是,Spark的RDD的Transformation操作都是惰性计算的,也就是只有在执行Action操作的时候才会真正开始计算。转化操作不会立刻执行,而是在内部记录下所要执行的操作的相关标识,等到了Action操作后再执行,Spark的这一个特性也叫做惰性计算

这样有什么好处呢?举个例子,假如textFile读取文件数据,通过flatMap按照空格分隔后,再用fitler方法根据关键字过滤,最后调用first()返回数据集第一个元素,如果中途每个操作都要产生新的 RDD,那势必会浪费很多内存空间,所以Spark在调用first操作的时候,才会正常的执行计算,因此,我们调试Spark程序时,当断点打在Transformation操作的函数中时,有时候会无法进入,只有当调用了Action操作时,才会真正执行。

2、Spark应用执行组件

Spark应用的运行方式分为Cluster模式和Client模式,其中Client模式是指Driver Program在任务提交机上执行,适用于交互和调试,也就是希望快速看到计算结果;而Cluster模式运行在ApplicationMaster中,一般应用在生产过程中。

这里先解释一些涉及的基本组件的概念:

Application:用户自定义的Spark程序,包含1个Driver Program和若干个Executor进程。

Driver Program:运行Application的main()函数并且创建SparkContext,是应用的主控进程,负责应用的解析、切分Stage并调度Task到Executor进程上执行。

SparkContext:是spark应用程序的入口,负责调度各个运算资源,协调各个work node上的Executor。

Executor:是Application运行在work node上的一个进程,负责运行Task,一般每个Application都有各自独立的executors,不同application的executor若不通过外部存储,是无法进行数据交互的。

Master:集群中的含有Master进程的节点,用于控制、管理和监督整个spark集群。

Client:客户端节点,负责客户端进程,提交job到master。

Work Node:集群的工作节点,接受Master的指令,运行Application的代码,并向Master汇报进度。

Task:任务,一般一个RDD分区对应一个Task,是单个分区上最小的处理流程单元。

TaskSet:一组关联的,但相互之间没有Shuffle依赖关系的Task集合。

Stage:一个taskSet对应的调度阶段,每个job会根据RDD的宽依赖关系被切分很多Stage,每个stage都包含 一个TaskSet。

Job:由Spark Action操作触发的作业。

Spark几个运行组件的关系如图1所示:

3、Spark应用执行流程

Spark的应用提交有2种模式,一个是Driver进程在客户端运行,一个是Master节点指定Driver进程在某个Work节点运行。

(1)Driver进程在客户端运行

执行流程如图2所示,

  • Client端启动Driver进程,在Driver中启动或实例化DAGScheduler等组件。
  • Worker向Master注册,Master通过指令让Worker启动Executor。
  • Worker收到指令后创建ExecutorRunner线程,ExecutorRunner线程内部启动ExecutorBackend进程
  • ExecutorBackend启动后,向Client端Driver进程内的SchedulerBackend注册,这样Dirver进程就可以发现计算资源了。
  • Driver的DAGScheduler解析应用中的RDD DAG并生成相应的Stage,每个Stage包含的TaskSet通过TaskScheduler分配给Executor,在Exectutor内部启动线程池并行化执行Task
  • 当所有Stage被执行完了之后,各个Worker汇报给Driver,同时释放资源,Driver向Master汇报,同时由于Driver在Client上,Clinet也知道应用的执行进度。

(2)Driver进程在Work节点运行

执行流程如图3所示,

  • 客户端提交应用程序给Master
  • Master调度应用,指定一个Worker节点启动Driver。
  • Worker接收到Master命令后创建RriverRunner线程,在DriverRunner线程内创建SchedulerBackend进程,Dirver充当整个作业的主控进程。
  • Master指定其他Worker节点启动Exeuctor,Worker创建ExecutorRunner线程,启动ExecutorBackend进程,剩下流程与上面类似,不再赘述。

Spark工作机制(一)

要了解Spark工作机制,首先要知道几个概念,第一个就是RDD:

1、什么是RDD

RDD(Resilient Distributed Datasets) 是 Spark 的核心概念,中文名是弹性数据集,通俗的讲可以理解为是一种抽象的大规模数据集合,或者是一个大的数组,这个数组是分布在集群上的,Spark会在这个数据集合上做一系列的数据处理、计算,然后产生新的RDD,直到最后得到计算结果。

至于为什么叫弹性数据集,弹性如何解释?这里是指在任何时候都能进行重算。举个例子,当集群中的一台节点故障导致RDD丢失后,Spark还可以重新计算出这部分的分区的数据,所以,RDD是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(比如检查点)实现容错,对用户来说,感觉不到这部分的内容曾经丢失,所以RDD数据集就像一个海绵一样,无论如何挤压都是完整的。

2、RDD的特性

Spark官网对RDD的描述:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

解释如下:

  • 是一组分区(partition),类似hadoop,能够被切分,从而实现并行计算,如图1所示,RDD1中有3个区分(p1,p3,p4),分别存储在3个节点上
  • 有一个函数计算分片,每个RDD都会实现compute函数,对每个分区内的数据进行计算
  • 依赖其他RDD的列表,例如由于RDD的转换生成一个新的RDD,这样RDD之间就会形成类似于流水线一样的前后依赖关系
  • 可选,一个分区函数。Spark中有HashPartitioner和RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None
  • 可选,一个存储存取每个Partition的优先位置的列表。对于HDFS文件来说,这个列表保存的就是每个Partition所在块的位置

3、RDD的操作分类

RDD有2种操作类型:

  • 转换(transformations) :从已经存在的数据集中创建一个新的数据集,会创建一个新的RDD,例如map操作,会针对将数据集的每个元素传给函数处理,并生成一个新的RDD
  • 动作(actions) :在数据集上进行计算之后返回一个值到驱动程序,例如reduce动作,使用函数聚合RDD所有元素,并将结果返回给驱动程序

常用的Transformation如下:

  • map(func):返回一个新的分布式数据集,该数据集由每一个输入元素经过func函数转换后组成
  • fitler(func):返回一个新的数据集,该数据集由经过func函数计算后返回值为true的输入元素组成
  • flatMap(func):类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func返回一个序列,而不是单一元素)
  • mapPartitions(func):类似于map,但独立地在RDD上每一个分片上运行,因此在类型为T的RDD上运行时,func函数类型必须是Iterator[T]=>Iterator[U]
  • mapPartitionsWithSplit(func):类似于mapPartitons,但func带有一个整数参数表示分片的索引值。因此在类型为T的RDD上运行时,func函数类型必须是(Int,Iterator[T])=>Iterator[U]
  • sample(withReplacement,fraction,seed):根据fraction指定的比例对数据进行采样,可以选择是否用随机数进行替换,seed用于随机数生成器种子
  • union(otherDataSet):返回一个新数据集,新数据集是由原数据集和参数数据集联合而成
  • distinct([numTasks]):返回一个包含原数据集中所有不重复元素的新数据集
  • groupByKey([numTasks]):在一个(K,V)数据集上调用,返回一个(K,Seq[V])对的数据集。注意默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数来改变它
  • reduceByKey(func,[numTasks]):在一个(K,V)对的数据集上调用,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同的key的值聚合到一起。与groupByKey类似,reduceByKey任务的个数是可以通过第二个可选参数来设置的
  • sortByKey([[ascending],numTasks]):在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定
  • join(otherDataset0,[numTasks]):在类型为(K,V)和(K,W)数据集上调用,返回一个相同的key对应的所有元素在一起的(K,(V,W))数据集
  • cogroup(otherDataset,[numTasks]):在类型为(K,V)和(K,W)数据集上调用,返回一个(K,Seq[V],Seq[W])元祖的数据集。这个操作也可以称为groupwith
  • cartesain(ohterDataset):笛卡尔积,在类型为T和U类型的数据集上调用,返回一个(T,U)对数据集(两两的元素对)

常用的Action如下:

  • reduce(func):通过函数func(接收两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的并行运行
  • collect():在驱动程序中,以数组形式返回数据集中的所有元素。通常在使用filter或者其他操作返回一个足够小的数据子集后再使用会比较有用
  • count():返回数据集元素个数
  • first():返回数据集第一个元素(类似于take(1))
  • take(n):返回一个由数据集前n个元素组成的数组,注意 这个操作目前并非并行执行,而是由驱动程序计算所有的元素
  • takeSample(withReplacement,num,seed):返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否由随机数替换不足的部分,seed用户指定随机数生成器种子
  • saveAsTextFile(path):将数据集的元素以textfile的形式保存到本地文件系统–HDFS或者任何其他Hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行
  • saveAsSequenceFile(path):将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是本地系统、HDFS或者任何其他的Hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者可以隐式的转换为Writable的RDD(Spark包括了基本类型转换,例如Int、Double、String等)
  • countByKey():对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每一个key对应的元素个数
  • foreach(func):在数据集的每一个元素上,运行函数func进行更新。通常用于边缘效果,例如更新一个叠加器,或者和外部存储系统进行交互,如HBase