Yige

Spark Advanced - Spark Scheduling System

Spark Scheduling System#

I. Main Workflow#

Two cores: DAGScheduler and TaskScheduler

build operator DAG#

A Job submitted by the user is first converted into a series of RDDs. A DAG is built from the dependencies (Dependency) between those RDDs, and this DAG of RDDs is then submitted to the scheduling system.

split graph into stage of tasks#

DAGScheduler is responsible for receiving the DAG composed of RDDs, dividing a series of RDDs into different Stages. Depending on the type of Stage (ResultStage and ShuffleMapStage), it creates different types of Tasks (ResultTask and ShuffleMapTask) for the unfinished Partitions in the Stage. Each Stage will create zero to multiple Tasks based on the number of unfinished Partitions. Finally, DAGScheduler submits the Tasks in each Stage to TaskScheduler for further processing in the form of a task set (TaskSet).
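As a rough illustration of "one Task per unfinished Partition, with the Task type chosen by the Stage type", here is a minimal sketch. All classes in it are toy stand-ins invented for this example; they only borrow the names of Spark's classes and do not reproduce their real constructors.

```scala
// Toy stand-ins for Spark's Stage and Task hierarchies (assumptions, not the real classes).
object TaskCreationSketch {
  sealed trait Stage { def id: Int; def numPartitions: Int }
  final case class ShuffleMapStage(id: Int, numPartitions: Int) extends Stage
  final case class ResultStage(id: Int, numPartitions: Int) extends Stage

  sealed trait Task { def stageId: Int; def partition: Int }
  final case class ShuffleMapTask(stageId: Int, partition: Int) extends Task
  final case class ResultTask(stageId: Int, partition: Int) extends Task

  final case class TaskSet(stageId: Int, tasks: Seq[Task])

  // Create one task for every partition that has not finished yet.
  // The task type follows the stage type.
  def createTaskSet(stage: Stage, finished: Set[Int]): TaskSet = {
    val missing = (0 until stage.numPartitions).filterNot(finished.contains)
    val tasks: Seq[Task] = stage match {
      case s: ShuffleMapStage => missing.map(p => ShuffleMapTask(s.id, p))
      case s: ResultStage     => missing.map(p => ResultTask(s.id, p))
    }
    TaskSet(stage.id, tasks)
  }
}
```

A Stage whose Partitions have all finished yields an empty TaskSet here, which corresponds to the "zero to multiple Tasks" case in the text.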

launch tasks via cluster manager#

The cluster manager allocates resources and schedules tasks, with retry and fault-tolerance mechanisms for failed tasks. TaskScheduler receives the TaskSet from DAGScheduler, creates a TaskSetManager to manage it, adds this TaskSetManager to the scheduling pool, and finally delegates the scheduling of Tasks to the backend interface (SchedulerBackend). SchedulerBackend first offers the available resources to TaskScheduler, which sorts all TaskSetManagers in the scheduling pool according to the Task scheduling algorithm (FIFO or FAIR), allocates resources to each TaskSet following the maximum-locality principle, and finally runs the Tasks in the TaskSet on the allocated nodes.
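The sorting step can be sketched for the FIFO case. This is a simplified model, assuming (as far as I understand Spark's FIFO algorithm) that task sets are ordered by job priority first and by Stage ID second. ToyTaskSetManager is a hypothetical stand-in, and neither the FAIR algorithm nor locality is modeled.

```scala
object FifoOrderSketch {
  // Hypothetical stand-in for a TaskSetManager; priority plays the role of the job id.
  final case class ToyTaskSetManager(name: String, priority: Int, stageId: Int)

  // FIFO: the earlier job goes first; within one job, the lower stage id goes first.
  def fifoSort(pool: Seq[ToyTaskSetManager]): Seq[ToyTaskSetManager] =
    pool.sortBy(m => (m.priority, m.stageId))
}
```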

execute tasks#

Execute tasks and store the intermediate results and final results in the storage system.

II. Detailed Explanation of RDD#

Simple Summary of RDD#

Refer to RDD Summary

Partitioner#

When there is a shuffle dependency, use Partitioner to determine the partition dependency between upstream and downstream RDDs.

abstract class Partitioner extends Serializable {
  // Used to get the number of partitions
  def numPartitions: Int
  // Used to map the input key to a certain partition of the downstream RDD
  def getPartition(key: Any): Int
}
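To make the contract concrete, here is a minimal sketch of a hash-based implementation. The Partitioner class is redeclared locally so the example runs without Spark, and ToyHashPartitioner is a hypothetical name; the logic (null keys to partition 0, otherwise a non-negative hash modulo) is one common choice, not necessarily what any particular Spark partitioner does.

```scala
object PartitionerSketch {
  // Local copy of the abstract class above, so the sketch compiles without Spark.
  abstract class Partitioner extends Serializable {
    def numPartitions: Int
    def getPartition(key: Any): Int
  }

  // Hypothetical hash-based partitioner: null keys go to partition 0,
  // other keys go to hashCode modulo numPartitions, adjusted to be non-negative.
  class ToyHashPartitioner(partitions: Int) extends Partitioner {
    require(partitions > 0, "number of partitions must be positive")

    def numPartitions: Int = partitions

    def getPartition(key: Any): Int = key match {
      case null => 0
      case k =>
        val raw = k.hashCode % numPartitions
        if (raw < 0) raw + numPartitions else raw
    }
  }
}
```

Every key with the same hash lands in the same downstream partition, which is what lets a shuffle bring equal keys together.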

III. Stage#

Official explanation:

 /**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 */

A Stage is a set of parallel tasks that share the same shuffle dependencies. During execution, DAGScheduler divides a series of RDDs into Stages and builds the dependencies between them, so that independent Stages can run in parallel while dependent Stages run in order. There are two kinds of Stage: ShuffleMapStage, whose output feeds a Shuffle, and the downstream ResultStage. The ResultStage is the last Stage executed and runs the action operator itself, such as count().

The submission process of all Stages in a Job includes reverse driving and forward submission.#

Reverse Driving#

Reverse driving starts from the downstream ResultStage, which drives the execution of all of its parent Stages. This driving process is passed on towards the ancestors until it reaches the most upstream Stages.

Forward Submission#

Forward submission means that the predecessor Stage submits Tasks to TaskScheduler before the successor Stage, "passing down through generations," until the last ResultStage submits its Task to TaskScheduler.
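The two directions can be seen in a small recursive sketch. It uses a toy stage type (ToyStage, a name invented here) and ignores everything event-driven in the real scheduler, such as waiting for parent Stages to actually finish; it only shows the order in which Stages would hand their Tasks over.

```scala
import scala.collection.mutable

object StageSubmissionSketch {
  // Toy stage: only an id and its parent stages; not Spark's Stage class.
  final case class ToyStage(id: Int, parents: List[ToyStage])

  // Returns stage ids in the order in which their tasks would be submitted.
  def submissionOrder(resultStage: ToyStage): List[Int] = {
    val submitted = mutable.LinkedHashSet.empty[Int]

    def submit(stage: ToyStage): Unit =
      if (!submitted.contains(stage.id)) {
        // Reverse driving: visit the parents first, walking towards the ancestors.
        stage.parents.sortBy(_.id).foreach(submit)
        // Forward submission: a stage is recorded only after all of its parents.
        submitted += stage.id
      }

    submit(resultStage)
    submitted.toList
  }
}
```

The call starts at the ResultStage, yet the ResultStage is always last in the returned list.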

IV. DAGScheduler#

Introduction#

DAGScheduler divides a series of RDDs into different Stages through computation, then builds the relationships between these Stages, and finally splits each Stage into multiple Tasks based on Partitions, submitting them to the underlying TaskScheduler in the form of a Task set (i.e., TaskSet). Some components that DAGScheduler depends on include: DAGSchedulerEventProcessLoop, JobListener, and ActiveJob.

JobListener and JobWaiter#

JobListener is used to listen for the success or failure of each Task in the job, while JobWaiter, which inherits from JobListener, is used to wait for the entire Job to complete, then calls a given processing function to handle the return results and ultimately determine the success or failure of the job.

Some thoughts on the source code implementation:

  • Task status detection and listening are implemented with Scala's asynchronous Future/Promise. I do not fully understand this part yet and will revisit it after studying Scala further.
  • The cancel() method defined in JobWaiter to cancel Job execution actually calls the cancelJob() method of DAGScheduler.
  • The taskSucceeded() method implemented in JobWaiter, which inherits JobListener, uses synchronized to lock the object for thread safety, reminiscent of stateLock in RDD, which also uses synchronized locking.
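The Future/Promise idea mentioned above can be sketched as follows. This is a simplified model written for this note, not Spark's source: ToyJobListener and ToyJobWaiter are hypothetical names, and the method signatures are assumptions. The waiter counts finished tasks, and once every task has reported success it completes a Promise, which makes the corresponding Future visible as completed to whoever is waiting on the job.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

object JobWaiterSketch {
  // Simplified listener interface; names follow the text, signatures are assumptions.
  trait ToyJobListener {
    def taskSucceeded(index: Int, result: Any): Unit
    def jobFailed(exception: Exception): Unit
  }

  class ToyJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
      extends ToyJobListener {
    private val finishedTasks = new AtomicInteger(0)

    // A job with no tasks counts as complete immediately.
    private val jobPromise: Promise[Unit] =
      if (totalTasks == 0) Promise.successful(()) else Promise[Unit]()

    // Callers wait on this future; it completes when the job succeeds or fails.
    def completionFuture: Future[Unit] = jobPromise.future

    override def taskSucceeded(index: Int, result: Any): Unit = {
      // Serialize calls into the handler, which may not be thread-safe.
      synchronized {
        resultHandler(index, result.asInstanceOf[T])
      }
      if (finishedTasks.incrementAndGet() == totalTasks) {
        jobPromise.success(())
      }
    }

    override def jobFailed(exception: Exception): Unit =
      jobPromise.tryFailure(exception)
  }
}
```

The Promise is the write side and is held privately by the waiter, while the Future is the read-only side handed to the caller; that separation is why the waiter can decide success or failure in exactly one place.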

ActiveJob#

ActiveJob represents an activated Job, i.e., a Job that DAGScheduler is processing.

DAGSchedulerEventProcessLoop#

DAGSchedulerEventProcessLoop is the internal event loop processor of DAGScheduler, and its implementation principle is similar to LiveListenerBus.
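The general shape of such an event loop is a blocking queue drained by one dedicated thread. The sketch below is a minimal version under that assumption; ToyEventLoop is a hypothetical class, and error handling and lifecycle hooks are left out.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

object EventLoopSketch {
  abstract class ToyEventLoop[E](name: String) {
    private val eventQueue = new LinkedBlockingDeque[E]()
    private val stopped = new AtomicBoolean(false)

    // Single daemon thread that takes events off the queue and dispatches them.
    private val eventThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit =
        try {
          while (!stopped.get) {
            val event = eventQueue.take() // blocks until an event is available
            onReceive(event)
          }
        } catch {
          case _: InterruptedException => // stop() was called; exit quietly
        }
    }

    def start(): Unit = eventThread.start()

    // Producers only enqueue; they never run the handler themselves.
    def post(event: E): Unit = eventQueue.put(event)

    def stop(): Unit =
      if (stopped.compareAndSet(false, true)) eventThread.interrupt()

    // Invoked on the event thread for every posted event.
    protected def onReceive(event: E): Unit
  }
}
```

Because all events are handled on one thread, the handler does not need its own locking for state that only it touches.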

Submission of DAGScheduler and Job#

A brief overview of the process, taking the execution of the count operator as an example:

  • First, the count operator calls the runJob() method of SparkContext.

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    
  • Then it continues to call the runJob() method of DAGScheduler.

    def runJob[T, U: ClassTag](...): Unit = {
      ....
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      ....
    }
    
  • DAGScheduler submits the Job by executing the submitJob() method.

    def runJob[T, U](...): Unit = {
        val start = System.nanoTime
        // Execute submitJob() method
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format(...))
          case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format(...))
            ...
        }
    }
    
  • submitJob() generates an instance of JobWaiter to listen to the execution status of the Job and sends a JobSubmitted event to DAGSchedulerEventProcessLoop.

    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(....))
    
  • After the JobSubmitted event is posted to eventProcessLoop, it is processed by the eventThread inside that object, which continuously takes events from the event queue and passes them to the onReceive function. When onReceive matches a JobSubmitted event, it calls DAGScheduler's handleJobSubmitted function, passing in jobId, rdd, and the other parameters, to process the Job.

  • The execution process of handleJobSubmitted is as follows:

    private[scheduler] def handleJobSubmitted(...) {
        var finalStage: ResultStage = null
        try {
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
            ....
        }
        .....
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    
        .....
    
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    
    

    From the source code analysis, the execution process can be summarized as follows:

    1. First, call the createResultStage method to create the ResultStage. This is where the Stage construction process begins; see the next section for details.

    2. Create ActiveJob.

    3. Place the mapping relationship between JobId and the newly created ActiveJob into jobIdToActiveJob.

    4. Add the newly created ActiveJob to the activeJobs collection.

    5. Make the ResultStage's _activeJob attribute hold the newly created ActiveJob.

    6. Get the StageInfo for all Stages corresponding to the current Job (i.e., the array stageInfos).

    7. Post the SparkListenerJobStart event to listenerBus (LiveListenerBus), which triggers all listeners interested in this event to perform corresponding actions.

    8. Call the submitStage method to submit the Stage, ensuring that all parent Stages are completed before submission.
      The submitStage method has three branches:

      submitMissingTasks(stage, jobId.get): if all parent Stages have completed, submit the Tasks contained in the Stage.

      submitStage(parent): if some parent Stages have not completed, submit them recursively and put the current Stage in the waiting set (waitingStages).

      abortStage: if the Stage is invalid, that is, no active Job is associated with it, abort it directly.

  • When DAGScheduler submits a Stage, it determines which Partitions still need to be computed, generates a Task for each of them, and packages the Tasks into a TaskSet that it submits to TaskScheduler. TaskScheduler ultimately submits these Tasks to the cluster, and DAGScheduler listens for their status.

Building Stages#

Earlier, we saw that the handleJobSubmitted execution process calls the createResultStage method to create the ResultStage.

private def createResultStage(....): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

Based on the source code analysis, the detailed processing steps are as follows:

  • Call the getOrCreateParentStages method to get the list of all parent Stages, where parent Stages are mainly the Stages corresponding to wide dependencies.

  • The processing steps of getOrCreateParentStages are:

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
    }
    
    1. Call the DAGScheduler's getShuffleDependencies method to obtain the ShuffleDependencies that are direct parents of the given RDD. The method walks back through the RDD's non-shuffle (narrow) dependencies one by one and collects the first ShuffleDependency found on each path; it does not look past a ShuffleDependency to more distant ancestors.
    2. Call the DAGScheduler's getOrCreateShuffleMapStage method to get or create the corresponding ShuffleMapStage for each ShuffleDependency and return the list of obtained ShuffleMapStages.
  • The processing steps of getOrCreateShuffleMapStage:

  • Generate the identity id of the Stage and create the ResultStage.

  • Register the ResultStage in stageIdToStage.

  • Call the updateJobIdStageIdMaps method to update the mapping relationship between the Job's identity and the ResultStage and all its ancestors.
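The traversal done by getShuffleDependencies in the steps above can be sketched on a toy dependency graph. The types below (ToyRDD, NarrowDep, ShuffleDep) are invented for this example and only mimic the shape of Spark's RDD and Dependency classes. The sketch walks through narrow dependencies with an explicit stack and stops at every shuffle dependency it meets.

```scala
import scala.collection.mutable

object ShuffleDependencySketch {
  // Toy model: an RDD is a name plus its dependencies; not Spark's RDD API.
  sealed trait ToyDependency { def rdd: ToyRDD }
  final case class NarrowDep(rdd: ToyRDD) extends ToyDependency
  final case class ShuffleDep(rdd: ToyRDD) extends ToyDependency
  final case class ToyRDD(name: String, deps: List[ToyDependency] = Nil)

  // Collect the nearest shuffle dependencies, walking only through narrow ones.
  def nearestShuffleDeps(root: ToyRDD): List[ShuffleDep] = {
    val found = mutable.LinkedHashSet.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[ToyRDD]
    val waiting = mutable.ListBuffer(root) // used as a stack

    while (waiting.nonEmpty) {
      val current = waiting.remove(waiting.length - 1)
      if (visited.add(current)) {
        current.deps.foreach {
          case s: ShuffleDep     => found += s        // stage boundary: stop here
          case NarrowDep(parent) => waiting += parent // same stage: keep walking
        }
      }
    }
    found.toList
  }
}
```

Each collected shuffle dependency would then be turned into one parent ShuffleMapStage, which is why the parents of a Stage correspond to wide dependencies.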

Submitting ResultStage#

Submitting Tasks that have not been computed#

Handling Task Execution Results#
