深入理解Spark:核心思想与源码分析. 3.6 创建任务调度器TaskScheduler

3.6 创建任务调度器TaskScheduler

TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度。TaskScheduler也可以看做任务调度的客户端。创建TaskScheduler的代码如下。

private[spark] var (schedulerBackend,
taskScheduler) =

   
SparkContext.createTaskScheduler(this, master)

createTaskScheduler方法会根据master的配置匹配部署模式,创建TaskSchedulerImpl,并生成不同的SchedulerBackend。本章为了使读者更容易理解Spark的初始化流程,故以local模式为例,其余模式将在第7章详解。master匹配local模式的代码如下。

master match {

   
case "local" =>

       
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES,
isLocal = true)

       
val backend = new LocalBackend(scheduler, 1)

       
scheduler.initialize(backend)

       
(backend, scheduler)

3.6.1 创建TaskSchedulerImpl

TaskSchedulerImpl的构造过程如下:

1)从SparkConf中读取配置信息,包括每个任务分配的CPU数、调度模式(调度模式有FAIR和FIFO两种,默认为FIFO,可以修改属性spark.scheduler.mode来改变)等。

2)创建TaskResultGetter,它的作用是通过线程池(Executors.newFixedThreadPool创建的,默认4个线程,线程名字以task-result-getter开头,线程工厂默认是Executors.default-ThreadFactory)对Worker上的Executor发送的Task的执行结果进行处理。

TaskSchedulerImpl的实现见代码清单3-29。

代码清单3-29 TaskSchedulerImpl的实现

var dagScheduler: DAGScheduler = null

var backend: SchedulerBackend = null

val mapOutputTracker =
SparkEnv.get.mapOutputTracker

var schedulableBuilder: SchedulableBuilder
= null

var rootPool: Pool = null

// default scheduler is FIFO

private val schedulingModeConf =
conf.get("spark.scheduler.mode", "FIFO")

val schedulingMode: SchedulingMode = try {

   
SchedulingMode.withName(schedulingModeConf.toUpperCase)

} catch {

   
case e: java.util.NoSuchElementException =>

       
throw new SparkException(s"Unrecognized spark.scheduler.mode:
$scheduling-ModeConf")

}

 

// This is a var so that we can reset it
for testing purposes.

private[spark] var taskResultGetter = new
TaskResultGetter(sc.env, this)

TaskSchedulerImpl的调度模式有FAIR和FIFO两种。任务的最终调度实际都是落实到接口SchedulerBackend的具体实现上的。为方便分析,我们先来看看local模式中SchedulerBackend的实现LocalBackend。LocalBackend依赖于LocalActor与ActorSystem进行消息通信。LocalBackend的实现参见代码清单3-30。

代码清单3-30 LocalBackend的实现

private[spark] class
LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)

   
extends SchedulerBackend with ExecutorBackend {

 

   
private val appId = "local-" + System.currentTimeMillis

   
var localActor: ActorRef = null

 

   
override def start() {

       
localActor = SparkEnv.get.actorSystem.actorOf(

           
Props(new LocalActor(scheduler, this, totalCores)),

           
"LocalBackendActor")

    }

 

   
override def stop() {

       
localActor ! StopExecutor

    }

 

   
override def reviveOffers() {

       
localActor ! ReviveOffers

    }

 

   
override def defaultParallelism() =

       
scheduler.conf.getInt("spark.default.parallelism", totalCores)

 

   
override def killTask(taskId: Long, executorId: String, interruptThread:
Boolean) {

       
localActor ! KillTask(taskId, interruptThread)

    }

 

   
override def statusUpdate(taskId: Long, state: TaskState,
serializedData: ByteBuffer) {

       
localActor ! StatusUpdate(taskId, state, serializedData)

    }

 

    override
def applicationId(): String = appId

}

3.6.2 TaskSchedulerImpl的初始化

创建完TaskSchedulerImpl和LocalBackend后,对TaskSchedulerImpl调用方法initialize进行初始化。以默认的FIFO调度为例,TaskSchedulerImpl的初始化过程如下:

1)使TaskSchedulerImpl持有LocalBackend的引用。

2)创建Pool,Pool中缓存了调度队列、调度算法及TaskSetManager集合等信息。

3)创建FIFOSchedulableBuilder,FIFOSchedulableBuilder用来操作Pool中的调度队列。

initialize方法的实现见代码清单3-31。

代码清单3-31 TaskSchedulerImpl的初始化

def initialize(backend: SchedulerBackend) {

   
this.backend = backend

   
rootPool = new Pool("", schedulingMode, 0, 0)

   
schedulableBuilder = {

       
schedulingMode match {

           
case SchedulingMode.FIFO =>

                new
FIFOSchedulableBuilder(rootPool)

           
case SchedulingMode.FAIR =>

                new FairSchedulableBuilder(rootPool, conf)

       
}

    }

   
schedulableBuilder.buildPools()

}

时间: 2017-05-02
Tags: 线程, 源码, spark

深入理解Spark:核心思想与源码分析. 3.6 创建任务调度器TaskScheduler的相关文章

《深入理解Spark:核心思想与源码分析》——3.6节创建任务调度器TaskScheduler

3.6 创建任务调度器TaskScheduler TaskScheduler也是SparkContext的重要组成部分,负责任务的提交,并且请求集群管理器对任务调度.TaskScheduler也可以看做任务调度的客户端.创建TaskScheduler的代码如下. private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) createTaskSchedu

《深入理解Spark:核心思想与源码分析》——3.8节TaskScheduler的启动

3.8 TaskScheduler的启动3.6节介绍了任务调度器TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码如下.taskScheduler.start()TaskScheduler在启动的时候,实际调用了backend的start方法. override def start() { backend.start() } 以LocalBackend为例,启动LocalBackend时向actorSystem注册了LocalActor,见代码清单3-3

《深入理解Spark:核心思想与源码分析》——2.3节Spark基本设计思想

2.3 Spark基本设计思想2.3.1 Spark模块设计 整个Spark主要由以下模块组成: Spark Core:Spark的核心功能实现,包括:SparkContext的初始化(Driver Application通过SparkContext提交).部署模式.存储体系.任务提交与执行.计算引擎等. Spark SQL:提供SQL处理能力,便于熟悉关系型数据库操作的工程师进行交互查询.此外,还为熟悉Hadoop的用户提供Hive SQL处理能力. Spark Streaming:提供流式计

《深入理解Spark:核心思想与源码分析》——第1章环境准备

第1章 环 境 准 备 凡事豫则立,不豫则废:言前定,则不跲:事前定,则不困. -<礼记·中庸> 本章导读 在深入了解一个系统的原理.实现细节之前,应当先准备好它的源码编译环境.运行环境.如果能在实际环境安装和运行Spark,显然能够提升读者对于Spark的一些感受,对系统能有个大体的印象,有经验的技术人员甚至能够猜出一些Spark采用的编程模型.部署模式等.当你通过一些途径知道了系统的原理之后,难道不会问问自己:"这是怎么做到的?"如果只是游走于系统使用.原理了解的层面,

《深入理解Spark:核心思想与源码分析》——第3章SparkContext的初始化

第3章 SparkContext的初始化 道生一, 一生二, 二生三, 三生万物. -<道德经> 本章导读 SparkContext的初始化是Driver应用程序提交执行的前提,本章内容以local模式为主,并按照代码执行顺序讲解,这将有助于首次接触Spark的读者理解源码.读者朋友如果能边跟踪代码,边学习本章内容,也许是快速理解SparkContext初始化过程的便捷途径.已经熟练使用Spark的开发人员可以选择跳过本章内容. 本章将在介绍SparkContext初始化过程的同时,向读者介绍

《深入理解Spark:核心思想与源码分析》——1.4节Spark源码编译与调试

1.4 Spark源码编译与调试 1.下载Spark源码 首先,访问Spark官网http://spark.apache.org/,如图1-18所示. 2.构建Scala应用 使用cmd命令行进到Spark根目录,执行sbt命令.会下载和解析很多jar包,要等很长时间,笔者大概花了一个多小时才执行完. 3.使用sbt生成Eclipse工程文件 等sbt提示符(>)出现后,输入Eclipse命令,开始生成Eclipse工程文件,也需要花费很长时间,笔者本地大致花了40分钟.完成时的状况如图1-21

《深入理解Spark:核心思想与源码分析》——3.1节SparkContext概述

3.1 SparkContext概述 Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端.了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程. Spark Driver的初始化始终围绕着SparkContext的初始化.SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动.SparkContext初始化完毕,才能向Spark集群提交任务.在平坦的公路上,发动机只需以较低的转速.较低的功率

《深入理解Spark:核心思想与源码分析》——1.5节小结

1.5 小结 本章通过引导大家在Linux操作系统下搭建基本的执行环境,并且介绍spark-shell等脚本的执行,来帮助读者由浅入深地进行Spark源码的学习.由于目前多数开发工作都在Windows系统下进行,并且Eclipse有最广大的用户群,即便是一些开始使用IntelliJ的用户对Eclipse也不陌生,所以在Windows环境下搭建源码阅读环境时,选择这些最常用的工具,能降低读者的学习门槛,并且替大家节省时间.

《深入理解Spark:核心思想与源码分析》——2.2节Spark基础知识

2.2 Spark基础知识 1.版本变迁 经过4年多的发展,Spark目前的版本是1.4.1.我们简单看看它的版本发展过程. 1)Spark诞生于UCBerkeley的AMP实验室(2009). 2)Spark正式对外开源(2010年). 3)Spark 0.6.0版本发布(2012-10-15),进行了大范围的性能改进,增加了一些新特性,并对Standalone部署模式进行了简化. 4)Spark 0.6.2版本发布(2013-02-07),解决了一些bug,并增强了系统的可用性. 5)Spa

《深入理解Spark:核心思想与源码分析》——3.2节创建执行环境SparkEnv

3.2 创建执行环境SparkEnv SparkEnv是Spark的执行环境对象,其中包括众多与Executor执行相关的对象.由于在local模式下Driver会创建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend进程中也会创建Executor,所以SparkEnv存在于Driver或者CoarseGrainedExecutorBackend进程中.创建SparkEnv 主要使用Sp