Yige

Yige

Build

Sparkの進階 - Sparkスケジューリングシステム

Spark スケジューリングシステム#

一、主要な作業フロー#

二つのコア:DAGSchedulerTaskScheduler
-w930

build operator DAG#

ユーザーが提出したジョブは、最初に一連のRDDに変換され、RDD 間の依存関係(Dependency)を通じて DAG を構築し、次に RDD で構成された DAG をスケジューリングシステムに提出します。

split graph into stage of tasks#

DAGSchedulerは、RDD で構成された DAG を受け取り、一連の RDD を異なるStageに分割します。Stage の異なるタイプ(ResultStageShuffleMapStage)に基づいて、未完了のPartitionに異なるタイプのタスク(ResultTaskShuffleMapTask)を作成します。各 Stage は、未完了の Partition の数に応じて、ゼロから複数のタスクを作成します。DAGScheduler は最後に、各 Stage のタスクをタスクセット(TaskSet)の形式でTaskSchedulerに提出して処理を続けます。

launch tasks via cluster manager#

クラスターマネージャー(cluster manager)を使用してリソースとタスクのスケジューリングを割り当て、失敗したタスクには一定の再試行とフォールトトレランスのメカニズムがあります。TaskSchedulerは、DAGSchedulerからTaskSetを受け取り、TaskSetManagerを作成して TaskSet を管理し、この TaskSetManager をスケジューリングプールに追加し、最後にタスクのスケジューリングをバックエンドインターフェース(SchedulerBackend)に処理させます。SchedulerBackend は最初に TaskScheduler を申請し、タスクスケジューリングアルゴリズム(FIFOFAIR)に従って、スケジューリングプール内のすべての TaskSetManager をソートし、次に最大ローカリティの原則に従って TaskSet にリソースを割り当て、最後に各割り当てられたノードで TaskSet 内のタスクを実行します。

execute tasks#

タスクを実行し、タスクの中間結果と最終結果をストレージシステムに保存します。

二、RDD の詳細#

RDD の簡単なまとめ#

参考文献:RDD まとめ

分区計算器Partitioner#

シャッフル依存関係が存在する場合、Partitionerを利用して上下流の RDD 間の分区依存関係を決定します。

abstract class Partitioner extends Serializable {
  # 分区数を取得するためのメソッド
  def numPartitions: Int
  # 入力のkeyを下流のRDDの特定の分区にマッピングするためのメソッド
  def getPartition(key: Any): Int
}

三、Stage#

公式の説明:

 /**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 */

Stage は、一連の並列計算で同じ依存関係を持つタスクの集合であり、実行フローの中でDAGSchedulerは一連の RDD を異なるStageに分割し、それらの間の依存関係を構築します。依存関係のない Stage は並行して実行され、依存関係のある Stage は順序通りに実行されます。Stage は、シャッフルを処理する必要があるShuffleMapStageと最下流のResultStageに分かれ、ResultStage は最後に実行される Stage であり、例えばcount()などのaction算子のタスクを実行します。

Job 内のすべての Stage の提出プロセスには、逆ドライブと正ドライブが含まれます。#

逆ドライブ#

逆ドライブとは、最下流の ResultStage から始まり、ResultStage がすべての親 Stage の実行を駆動することを指します。この駆動プロセスは、最上流の Stage に達するまで祖先方向に伝達されます。

正ドライブ#

正ドライブとは、前代の Stage が後代の Stage よりも先に Task を TaskScheduler に提出することを指し、「代代相伝」となり、最終的に ResultStage が最後の Task を TaskScheduler に提出します。

四、DAGScheduler#

イントロダクション#

DAGSchedulerは、計算を通じて DAG 内の一連の RDD を異なる Stage に分割し、次にこれらの Stage 間の関係を構築し、最後に各 Stage をPartitionに基づいて複数のタスクに分割し、タスクセット(TaskSet)の形式で下層のTaskSchedulerに提出します。
DAGScheduler が依存するいくつかのコンポーネント:DAGSchedulerEventProcessLoopJobListenerおよびActiveJob

JobListenerとJobWaiter#

JobListenerは、ジョブ内の各タスクの実行成功または失敗を監視するために使用され、JobWaiterは JobListener を継承して実装され、全体のジョブが完了するのを待ち、指定された処理関数を呼び出して返された結果を処理し、最終的にジョブの成功または失敗を確定します。

ソースコード実装に関するいくつかの考察:

  • Scala の非同期プログラミングFuture/Promiseを通じてタスクの状態を監視する実装が行われており、この部分はまだあまり理解していません。後で Scala を学んだ後に再確認します。
  • JobWaiter 内で定義されたcancel()メソッドは、実際には DAGScheduler のcancelJob()メソッドを呼び出してジョブの実行をキャンセルします。
  • JobWaiter 内で JobListener を継承して実装された taskSucceeded () メソッドでは、スレッドセーフを保証するためにsynchronizedを使用してオブジェクトにロックをかけています。RDD 内のstateLocksynchronizedでロックをかけています。

ActiveJob#

ActiveJobは、すでにアクティブなジョブを表し、DAGScheduler が受け取って処理するジョブです。

DAGSchedulerEventProcessLoop#

DAGSchedulerEventProcessLoopは、DAGScheduler内部のイベントループ処理器であり、実装原理は LiveListenerBus に似ています。

DAGScheduler とジョブの提出#

簡単なフローの概要、ここでは count 算子を実行する例を挙げます:

  • まず count 算子を実行すると、SparkContext の runJob () メソッドが呼び出されます。

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    
  • 次に、DAGScheduler の runJob () メソッドが呼び出されます。

    def runJob[T, U: ClassTag](...): Unit = {
        ....
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        ....
      }
    
  • DAGScheduler は submitJob () メソッドを実行してジョブを提出します。

    def runJob[T, U](...): Unit = {
        val start = System.nanoTime
        // submitJob()メソッドを実行
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("ジョブ %d 完了: %s, 時間 %f s".format(...))
          case scala.util.Failure(exception) =>
            logInfo("ジョブ %d 失敗: %s, 時間 %f s".format(...))
            ...
        }
    }
    
  • submitJob () は JobWaiter のインスタンスを生成し、ジョブの実行状況を監視し、DAGSchedulerEventProcessLoop に JobSubmitted イベントを送信します。

    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(....))
    
  • eventProcessLoop オブジェクトが JobSubmitted イベントを投げると、オブジェクト内のeventThreadスレッドインスタンスがイベントを処理し、イベントキューからイベントを取り出し、onReceive関数を呼び出してイベントを処理します。JobSubmitted イベントに一致すると、DAGScheduler のhandleJobSubmitted関数が呼び出され、jobid、rdd などのパラメータが渡されてジョブが処理されます。

  • handleJobSubmitted の実行プロセス

    private[scheduler] def handleJobSubmitted(...) {
        var finalStage: ResultStage = null
        try {
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
            ....
        }
        .....
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    
        .....
    
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    
    

    ソースコードの分析から、実行プロセスを以下のように要約できます。

    1. まず createResultStage メソッドを呼び出して ResultStage を作成します。ここから Stage の構築プロセスが始まります。詳細は次のセクションの Stage の構築プロセスを参照してください。

    2. ActiveJob を作成します。

    3. JobId と作成した ActiveJob の対応関係を jobIdToActiveJob に格納します。

    4. 作成した ActiveJob を activeJobs 集合に追加します。

    5. ResultStage の_activeJob 属性が作成した ActiveJob を保持します。

    6. 現在のジョブのすべての Stage に対応する StageInfo(すなわち配列 stageInfos)を取得します。

    7. listenerBus(LiveListenerBus)に SparkListenerJobStart イベントを投げ、これによりこのイベントに関心を持つすべてのリスナーが対応する操作を実行します。

    8. submitStage メソッドを呼び出して Stage を提出します。すべての親 Stage が計算を完了する必要があります。submitStage には三つの呼び出しロジックがあります:
      submitMissingTasks(stage,jobId.get):すべての親 Stage が完了している場合、Stage に含まれるタスクを提出します。

      submitStage(parent):親 Stage が未完了の場合、再帰的に提出します。

      abortStage:無効な Stage であり、直接停止します。

  • DAGScheduler がタスクの提出を完了した後、どの Partition が計算を必要とするかを判断し、Partition にタスクを生成し、タスクセットに封装して TaskScheduler に提出します。TaskScheduler が最終的にこれらのタスクをクラスタに提出し、これらのタスクの状態を監視します。

Stage の構築#

前述の handleJobSubmitted の実行プロセスで createResultStage メソッドが呼び出され、ResultStage が作成されます。

private def createResultStage(....): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

ソースコードの分析に基づく詳細な処理手順は以下の通りです:

  • getOrCreateParentStagesメソッドを呼び出して、すべての親 Stage のリストを取得します。親 Stage は主に広い依存関係に対応する Stage です。

  • getOrCreateParentStages の処理手順:

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
    }
    
    1. DAGScheduler のgetShuffleDependenciesメソッドを呼び出して、現在の RDD に与えられたすべての ShuffleDependency のシーケンスを取得し、依存する非 Shuffle の RDD を一つずつ訪問して、すべての非 Shuffle の RDD の ShuffleDependency 依存を取得します。
    2. DAGScheduler のgetOrCreateShuffleMapStageメソッドを呼び出して、各 ShuffleDependency に対して対応する ShuffleMapStage を取得または作成し、得られた ShuffleMapStage のリストを返します。
  • getOrCreateShuffleMapStage メソッドの処理手順:

  • ああ

  • Stage の識別子 ID を生成し、ResultStage を作成します。

  • ResultStage を stageIdToStage に登録します。

  • updateJobIdStageIdMaps メソッドを呼び出して、ジョブの識別子と ResultStage およびそのすべての祖先のマッピング関係を更新します。

ResultStage の提出#

まだ計算されていないタスクの提出#

タスク実行結果の処理#

参考リンク#

参考リンク#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。