深入理解Spark:核心思想与源码分析. 3.7 创建和启动DAGScheduler

3.7 创建和启动DAGScheduler

DAGScheduler主要用于在任务正式交给TaskSchedulerImpl提交之前做一些准备工作,包括:创建Job,将DAG中的RDD划分到不同的Stage,提交Stage,等等。创建DAG-Scheduler的代码如下。

@volatile private[spark] var dagScheduler:
DAGScheduler = _

   
dagScheduler = new DAGScheduler(this)

DAGScheduler的数据结构主要维护jobId和stageId的关系、Stage、ActiveJob,以及缓存的RDD的partitions的位置信息,见代码清单3-32。

代码清单3-32 DAGScheduler维护的数据结构

private[scheduler] val nextJobId = new
AtomicInteger(0)

private[scheduler] def numTotalJobs: Int =
nextJobId.get()

private val nextStageId = new
AtomicInteger(0)

 

private[scheduler] val jobIdToStageIds =
new HashMap[Int, HashSet[Int]]

private[scheduler] val stageIdToStage = new
HashMap[Int, Stage]

private[scheduler] val shuffleToMapStage =
new HashMap[Int, Stage]

private[scheduler] val jobIdToActiveJob =
new HashMap[Int, ActiveJob]

 

   
// Stages we need to run whose parents aren't done

   
private[scheduler] val waitingStages = new HashSet[Stage]

   
// Stages we are running right now

   
private[scheduler] val runningStages = new HashSet[Stage]

   
// Stages that must be resubmitted due to fetch failures

   
private[scheduler] val failedStages = new HashSet[Stage]

 

   
private[scheduler] val activeJobs = new HashSet[ActiveJob]

 

   
// Contains the locations that each RDD's partitions are cached on

   
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

   
private val failedEpoch = new HashMap[String, Long]

 

   
private val dagSchedulerActorSupervisor =

       
env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

 

   
private val closureSerializer =
SparkEnv.get.closureSerializer.newInstance()

在构造DAGScheduler的时候会调用initializeEventProcessActor方法创建DAGScheduler-EventProcessActor,见代码清单3-33。

代码清单3-33 DAGSchedulerEventProcessActor的初始化

   
private[scheduler] var eventProcessActor: ActorRef = _

private def initializeEventProcessActor() {

       
// blocking the thread until supervisor is started, which ensures
eventProcess-Actor is

       
// not null before any job is submitted

       
implicit val timeout = Timeout(30 seconds)

       
val initEventActorReply =

           
dagSchedulerActorSupervisor ? Props(new
DAGSchedulerEventProcessActor(this))

       
eventProcessActor = Await.result(initEventActorReply, timeout.duration).

           
asInstanceOf[ActorRef]

}

 

initializeEventProcessActor()

这里的DAGSchedulerActorSupervisor主要作为DAGSchedulerEventProcessActor的监管者,负责生成DAGSchedulerEventProcessActor。从代码清单3-34可以看出,DAGScheduler-ActorSupervisor对于DAGSchedulerEventProcessActor采用了Akka的一对一监管策略。DAG-SchedulerActorSupervisor一旦生成DAGSchedulerEventProcessActor,并注册到ActorSystem,ActorSystem就会调用DAGSchedulerEventProcessActor的preStart,taskScheduler于是就持有了dagScheduler,见代码清单3-35。从代码清单3-35我们还看到DAG-SchedulerEventProcessActor所能处理的消息类型,比如JobSubmitted、BeginEvent、CompletionEvent等。DAGScheduler-EventProcessActor接受这些消息后会有不同的处理动作。在本章,读者只需要理解到这里即可,后面章节用到时会详细分析。

代码清单3-34 DAGSchedulerActorSupervisor的监管策略

private[scheduler] class
DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)

   
extends Actor with Logging {

 

   
override val supervisorStrategy =

       
OneForOneStrategy() {

            case x: Exception =>

               
logError("eventProcesserActor failed; shutting down
SparkContext", x)

                try {

                   
dagScheduler.doCancelAllJobs()

                } catch {

                    case t: Throwable => logError("DAGScheduler
failed to cancel all jobs.", t)

                }

                dagScheduler.sc.stop()

                Stop

    }

 

def receive = {

       
case p: Props => sender ! context.actorOf(p)

       
case _ => logWarning("received unknown message in
DAGSchedulerActorSupervisor")

    }

}

代码清单3-35 DAGSchedulerEventProcessActor的实现

private[scheduler] class
DAGSchedulerEventProcessActor(dagScheduler: DAGS-cheduler)

   
extends Actor with Logging {

   
override def preStart() {

       
dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)

    }

   
/**

    *
The main event loop of the DAG scheduler.

   
*/

   
def receive = {

       
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties) =>

           
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions,
allowLocal, callSite,

                listener, properties)

       
case StageCancelled(stageId) =>

           
dagScheduler.handleStageCancellation(stageId)

       
case JobCancelled(jobId) =>

           
dagScheduler.handleJobCancellation(jobId)

       
case JobGroupCancelled(groupId) =>

           
dagScheduler.handleJobGroupCancelled(groupId)

       
case AllJobsCancelled =>

           
dagScheduler.doCancelAllJobs()

       
case ExecutorAdded(execId, host) =>

           
dagScheduler.handleExecutorAdded(execId, host)

       
case ExecutorLost(execId) =>

           
dagScheduler.handleExecutorLost(execId, fetchFailed = false)

       
case BeginEvent(task, taskInfo) =>

           
dagScheduler.handleBeginEvent(task, taskInfo)

       
case GettingResultEvent(taskInfo) =>

           
dagScheduler.handleGetTaskResult(taskInfo)

       
case completion @ CompletionEvent(task, reason, _, _, taskInfo,
taskMetrics) =>

           
dagScheduler.handleTaskCompletion(completion)

       
case TaskSetFailed(taskSet, reason) =>

           
dagScheduler.handleTaskSetFailed(taskSet, reason)

       
case ResubmitFailedStages =>

           
dagScheduler.resubmitFailedStages()

}

override def postStop() {

   
// Cancel any active jobs in postStop hook

   
dagScheduler.cleanUpAfterSchedulerStop()

}

时间: 2017-05-02

深入理解Spark:核心思想与源码分析. 3.7 创建和启动DAGScheduler的相关文章

《深入理解Spark:核心思想与源码分析》——3.9节启动测量系统MetricsSystem

3.9 启动测量系统MetricsSystemMetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D.MetricsSystem中有三个概念:Instance:指定了谁在使用测量系统:Source:指定了从哪里收集测量数据:Sink:指定了往哪里输出测量数据.Spark按照Instance的不同,区分为Master.Worker.Application.Driver和Executor.Spark目前提供的Sink有Consol

《深入理解Spark:核心思想与源码分析》——3.1节SparkContext概述

3.1 SparkContext概述 Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端.了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程. Spark Driver的初始化始终围绕着SparkContext的初始化.SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动.SparkContext初始化完毕,才能向Spark集群提交任务.在平坦的公路上,发动机只需以较低的转速.较低的功率

《深入理解Spark:核心思想与源码分析》——3.7节创建和启动DAGScheduler

3.7 创建和启动DAGSchedulerDAGScheduler主要用于在任务正式交给TaskSchedulerImpl提交之前做一些准备工作,包括:创建Job,将DAG中的RDD划分到不同的Stage,提交Stage,等等.创建DAG-Scheduler的代码如下. @volatile private[spark] var dagScheduler: DAGScheduler = _ dagScheduler = new DAGScheduler(this) DAGScheduler的数据结

《深入理解Spark:核心思想与源码分析》——2.3节Spark基本设计思想

2.3 Spark基本设计思想2.3.1 Spark模块设计 整个Spark主要由以下模块组成: Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application通过SparkContext提交).部署模式.存储体系.任务提交与执行.计算引擎等. Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询.此外,还为熟悉Hadoop的用户提供Hive SQL处理能力. Spark Streaming:提供流式计

《深入理解Spark:核心思想与源码分析》——第1章环境准备

第1章 环 境 准 备 凡事豫则立,不豫则废:言前定,则不跲:事前定,则不困. -<礼记·中庸> 本章导读 在深入了解一个系统的原理.实现细节之前,应当先准备好它的源码编译环境.运行环境.如果能在实际环境安装和运行Spark,显然能够提升读者对于Spark的一些感受,对系统能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型.部署模式等.当你通过一些途径知道了系统的原理之后,难道不会问问自己:"这是怎么做到的?"如果只是游走于系统使用.原理了解的层面,

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

《深入理解Spark:核心思想与源码分析》——1.4节Spark源码编译与调试

1.4 Spark源码编译与调试 1.下载Spark源码 首先,访问Spark官网http://spark.apache.org/,如图1-18所示. 2.构建Scala应用 使用cmd命令行进到Spark根目录,执行sbt命令.会下载和解析很多jar包,要等很长时间,笔者大概花了一个多小时才执行完. 3.使用sbt生成Eclipse工程文件 等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费很长时间,笔者本地大致花了40分钟.完成时的状况如图1-21

《深入理解Spark:核心思想与源码分析》——1.5节小结

1.5 小结 本章通过引导大家在Linux操作系统下搭建基本的执行环境,并且介绍spark-shell等脚本的执行,来帮助读者由浅入深地进行Spark源码的学习.由于目前多数开发工作都在Windows系统下进行,并且Eclipse有最广大的用户群,即便是一些开始使用IntelliJ的用户对Eclipse也不陌生,所以在Windows环境下搭建源码阅读环境时,选择这些最常用的工具,能降低读者的学习门槛,并且替大家节省时间.

《深入理解Spark:核心思想与源码分析》——2.2节Spark基础知识

2.2 Spark基础知识 1.版本变迁 经过4年多的发展,Spark目前的版本是1.4.1.我们简单看看它的版本发展过程. 1)Spark诞生于UCBerkeley的AMP实验室(2009). 2)Spark正式对外开源(2010年). 3)Spark 0.6.0版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对Standalone部署模式进行了简化. 4)Spark 0.6.2版本发布(2013-02-07),解决了一些bug,并增强了系统的可用性. 5)Spa

《深入理解Spark:核心思想与源码分析》——3.6节创建任务调度器TaskScheduler

3.6 创建任务调度器TaskScheduler TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度.TaskScheduler也可以看做任务调度的客户端.创建TaskScheduler的代码如下. private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) createTaskSchedu