Yige

Yige

Build

Spark進階-Spark調度系統

Spark 調度系統#

一、主要工作流程#

兩個核心:DAGSchedulerTaskScheduler
-w930

build operator DAG#

用戶提交的 Job 將首先被轉換為一系列RDD並通過 RDD 之間的依賴關係 (Dependency) 構建 DAG, 然後將 RDD 構成的 DAG 提交到調度系統

split graph into stage of tasks#

DAGScheduler負責接收由 RDD 構成的 DAG,將一系列 RDD 劃分到不同的Stage。根據 Stage 的不同類型 (ResultStage和 ShuffleMapStage),給 Stage 中未完成的 Partition 創建不同類型的 Task (ResultTask和 ShuffleMapTask)。每個 Stage 將因為未完成 Partition 的多少,創建零到多個 Task。DAGScheduler 最後將每個 Stage 中的 Task 以任務集合 (TaskSet) 的形式提交給TaskSchduler繼續處理

launch tasks via cluster manager#

使用集群管理器 (cluster manager) 分配資源與任務調度,對於失敗的任務還會有一定的重試與容錯機制。TaskScheduler負責從DAGScheduler接收TaskSet,創建TaskSetManager對 TaskSet 進行管理,並將此 TaskSetManager 添加到調度池中,最後將對 Task 的調度交給後端接口 (SchedulerBackend) 處理。SchedulerBackend 首先申請 TaskSheduler,按照 Task 調度算法 (FIFO和FAIR),對調度池中所有 TaskSetManager 進行排序,然後對 TaskSet 按照最大本地性原則分配資源,最後在各個分配的節點上運行 TaskSet 中的 Task

execute tasks#

執行任務,並將任務中間結果和最終結果存入存儲體系

二、RDD 詳解#

RDD 簡單總結#

參考RDD 總結

分區計算器Partitioner#

當存在 shuffle 依賴關係時,利用Partitioner來確定上下游 RDD 之間的分區依賴關係

abstract class Partitioner extends Serializable {
  # 用於獲取分區數量
  def numPartitions: Int
  # 用於將輸入的key映射到下游RDD的某一個分區
  def getPartition(key: Any): Int
}

三、Stage#

官方解釋:

 /**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 */

Stage 是一系列並行計算且具有相同依賴的 task 的集合,在執行流程中DAGScheduler會將一系列 RDD 劃分到不同的Stage並構建它們之間的依賴關係,使不存在依賴關係的 Stage 並行執行,保證依賴關係的 Stage 順序執行。Stage 分為需要處理 Shuffle 的ShuffleMapStage和最下游的ResultStage,ResultStage 是最後執行的 Stage,比如執行count()action算子的任務

Job 中所有 Stage 的提交過程包括反向驅動與正向提交#

反向驅動#

所謂反向驅動,就是從最下游的 ResultStage 開始,由 ResultStage 驅動所有父 Stage 的執行,這個驅動過程不斷向祖先方向傳遞,直到最上游的 Stage 為止

正向提交#

正向提交,就是前代 Stage 先於後代 Stage 將 Task 提交給 TaskScheduler,“代代相傳”,直到 ResultStage 最後一個將 Task 提交給 TaskScheduler

四、DAGScheduler#

介紹#

DAGScheduler通過計算將 DAG 中一系列 RDD 劃分到不同的 Stage,然後構建這些 Stage 之間的關係,最後將每個 Stage 按照Partition切分為多個 Task,並以 Task 集合 (即TaskSet) 的形式提交給底層的TaskScheduler
DAGScheduler 依賴的一些組件: DAGSchedulerEventProcessLoop, JobListenerActiveJob

JobListener與JobWaiter#

JobListener用於監聽作業中每個 Task 執行成功或失敗,JobWaiter繼承實現了 JobListener,用於等待整個 Job 執行完畢,然後調用給定的處理函數對返回結果進行處理並最終確定作業的成功或失敗

源碼實現的一些思考:

  • 通過 Scala 的異步編程 Future/Promise 實現了任務狀態的檢測監聽,這塊還不太理解,等後面系統學習一下 Scala 再回顧一下
  • JobWaiter 中的定義了一個cancel()方法取消 Job 執行實際上調用了 DAGScheduler 的cancelJob()方法
  • JobWaiter 中繼承 JobListener 實現的 taskSucceeded () 方法中為了保證線程安全用了synchronized給對象加鎖,聯想到 RDD 中的stateLock,也用了 synchronized 加鎖

ActiveJob#

ActiveJob表示已經激活的 Job,即 DAGScheduler 接收處理的 Job

DAGSchedulerEventProcessLoop#

DAGSchedulerEventProcessLoopDAGScheduler內部的事件循環處理器,實現原理類似於 LiveListenerBus

DAGScheduler 與 Job 的提交#

簡要流程概括,這裡以執行 count 算子為例:

  • 首先執行 count 算子會創建調用 SparkContext 的 runJob () 方法

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    
  • 然後接著傳遞調用 DAGScheduler 的 runJob () 方法

    def runJob[T, U: ClassTag](...): Unit = {
        ....
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        ....
      }
    
  • DAGScheduler 通過執行 submitJob () 方法提交 Job

    def runJob[T, U](...): Unit = {
        val start = System.nanoTime
        // 執行submitJob()方法
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format(...))
          case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format(...))
            ...
        }
    }
    
  • submitJob () 生成一個 JobWaiter 的實例監聽 Job 的執行情況,向 DAGSchedulerEventProcessLoop 發送 JobSubmitted 事件

    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(....))
    
  • 當 eventProcessLoop 對象投遞了 JobSubmitted 事件之後,對象內的eventThread線程實例對事件進行處理,不斷從事件隊列中取出事件,調用onReceive函數處理事件,當匹配到 JobSubmitted 事件後,調用 DAGScheduler 的handleJobSubmitted函數並傳入 jobid、rdd 等參數來處理 Job

  • handleJobSubmitted 執行過程

    private[scheduler] def handleJobSubmitted(...) {
        var finalStage: ResultStage = null
        try {
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
            ....
        }
        .....
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    
        .....
    
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    
    

由源碼分析可以概括執行過程如下

  1. 首先調用 createResultStage 方法創建 ResultStage,這裡就開始了 Stage 的構建過程,詳見下節 Stage 的構建過程

  2. 創建 ActiveJob

  3. 將 JobId 與剛創建的 ActiveJob 的對應關係放入 jobIdToActiveJob 中

  4. 將剛創建的 ActiveJob 放入 activeJobs 集合中

  5. 使 ResultStage 的_activeJob 屬性持有剛創建的 ActiveJob

  6. 獲取當前 Job 的所有 Stage 對應的 StageInfo (即數組 stageInfos)

  7. 向 listenerBus (LiveListenerBus) 投遞 SparkListenerJobStart 事件,進而引發所有關注此事件的監聽器執行相應的操作

  8. 調用 submitStage 方法提交 Stage, 所有 parent Stage 都計算完成,才能提交
    submitStage 三種調用邏輯:
    submitMissingTasks(stage,jobId.get):如果所有 parent stage 已經完成,則提交 stage 所包含的 task

    `submitStage(parent)`:有parent stage未完成,則遞歸提交
    
    `abortStage`:無效stage,直接停止
    
  • DAGScheduler 完成任務提交後,在判斷哪些 Partition 需要計算,就會為 Partition 生成 Task,然後封裝成 TaskSet,提交至 TaskScheduler。等待 TaskScheduler 最終向集群提交這些 Task,監聽這些 Task 的狀態

構建 Stage#

前面我們看到 handleJobSubmitted 執行過程中調用 createResultStage 方法創建 ResultStage

private def createResultStage(....): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

根據源碼分析,詳細的處理步驟如下:

  • 調用getOrCreateParentStages方法獲取所有父 Stage 的列表,父 Stage 主要是寬依賴對應的 Stage

  • getOrCreateParentStages 處理步驟:

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
    }
    
    1. 調用 DAGScheduler 的getShuffleDependencies方法獲取當前給到的 RDD 的所有 ShuffleDependency 的序列,逐個訪問其依賴的非 Shuffle 的 RDD,獲取所有非 Shuffle 的 RDD 的 ShuffleDependency 依賴
    2. 調用 DAGScheduler 的getOrCreateShuffleMapStage方法為每一個 ShuffleDependency 獲取或者創建對應的 ShuffleMapStage,並返回得到的 ShuffleMapStage 列表
  • getOrCreateShuffleMapStage 方法處理步驟:

  • 生成 Stage 的身份標識 id,並創建 ResultStage

  • 將 ResultStage 註冊到 stageIdToStage 中

  • 調用 updateJobIdStageIdMaps 方法更新 Job 的身份標識與 ResultStage 及其所有祖先的映射關係

提交 ResultStage#

提交還未計算的 Task#

Task 執行結果的處理#

參考鏈接#

參考鏈接#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。