Yige

Yige

Build

Livyシリーズ - Livyセッションの詳細解説

Livy シリーズ - Livy Session の詳細#

内容整理自:

  1. 簡書著者 - 牛肉丸粉不加葱 Apache Livy 文集

概要#

Livy には 2 種類のジョブがあり、それぞれセッションとバッチです。セッションとバッチの作成プロセスは非常に異なり、バッチの作成は対応する Spark アプリの起動を終点とします。一方、セッションは対応する Spark アプリを起動するだけでなく、共有 SparkContext をサポートして、個々のステートメントの提出と実行を受け入れる必要があります。私はセッションの作成を 2 つの大きなステップに分けます:

  • クライアント側:LivyServer 内で実行され、Spark アプリを起動するまでリクエストを受け付けます(注意、ここではクライアント側と呼ばれていますが、LivyServer 内で実行されています)
  • サーバー側(ドライバー内部):セッションに対応する Spark アプリのドライバーの起動

クライアント側#

全体の流れ#

image.png

  • セッションに対応する Spark アプリを起動
  • ドライバーとの接続を確立
  • セッションの作成と初期化

セッションに対応する Spark アプリの起動#

核心的なメソッドは:ContextLauncher#startDriverで、2 つの大きなステップに分けられます:

  • Spark アプリを起動
  • SparkSubmit の終了を待つ

Spark アプリの起動#

セッションに対応する Spark アプリの mainClass はRSCDriverBootstrapperです。
image.png

startDriver()メソッドを呼び出し、新しい SparkLauncher オブジェクトを作成し、設定、リソース、mainClass などの設定を行い、その後 launch () メソッドを呼び出して SparkSubmit プロセスの対応する Process オブジェクト process を取得します。

SparkSubmit の終了を待つ#

SparkLauncher#launch () が返すプロセスは SparkSubmit プロセスで、process を返した後、新しい ContextLauncher.ChildProcess オブジェクトを作成します。このプロセス内で新しいスレッドが起動され、SparkSubmit プロセスの終了を待ち続けます。このスレッド内のロジックは以下の通りです:

public void run() {
  try {
    // スレッドはこのプロセスが終了するまでブロックされます
    int exitCode = child.waitFor();
    // 異常終了の場合、Sparkアプリの起動に失敗したことを示し、例外をスローします
    if (exitCode != 0) {
      LOG.warn("子プロセスがコード {} で終了しました。", exitCode);
      fail(new IOException(String.format("子プロセスがコード %d で終了しました。", exitCode)));
    }
  } catch (InterruptedException ie) {
    LOG.warn("待機スレッドが中断されました。子プロセスを終了します。");
    Thread.interrupted();
    child.destroy();
  } catch (Exception e) {
    LOG.warn("子プロセスの待機中に例外が発生しました。", e);
  }
}

ドライバーとの接続を確立#

セッションの最大の特徴は、SparkContext を共有できることです。これにより、ユーザーが提出した複数のコードスニペットが 1 つの SparkContext 上で実行できるようになります(IO の多重化、HTTP の長接続を連想)。これには 2 つの利点があります:

  • タスクの起動速度が大幅に向上します。Yarn 上でアプリを起動するのは時間がかかりますが、セッションを使用することで、セッションの起動に相応の時間がかかることを除けば、その後提出されたコードスニペットはすぐに実行されます(HTTP の長接続を連想)。
  • RDD、テーブルの共有:永続化された RDD、テーブルはその後のコードスニペットで使用でき、異なるユーザーが同じ RDD、テーブル上で計算を行う必要があるシナリオでは非常に便利です(IO の多重化、ネットワークの再利用を連想)。

ドライバーは Yarn によって任意のノードにスケジュールされる可能性があるため、LivyServer がドライバーに積極的に接続することはできません。そのため、事前にクライアント側で RpcServer を作成し、ドライバーの接続を待機します。したがって、最初にクライアント側で RpcServer を作成し、ドライバー側の接続を待ち、接続が成功した後、クライアントはドライバー側の情報を取得してドライバー側の RpcServer に接続します。まとめると、3 つのステップになります:

  • クライアントがその RpcServer 情報をドライバーに渡します。
    時系列図の第 (5) ステップ:RSCClientFactory#createClient、この呼び出しでorg.apache.livy.rsc.rpc.RpcServerオブジェクトを作成し、メンバー server に割り当てます。

  • ドライバーがクライアントに接続し、その RpcServer 情報を渡します(RSCDriver#initializeServerで実装)。
    image.png

  • クライアントがドライバーの rpcServer アドレス情報を受け取り、接続します。

セッションの作成と初期化#

ドライバーとの接続が確立された後、rscClient、livyConf などの情報を使用して InteractiveSession オブジェクトを作成し、初期化します。プロセスは以下の通りです:
image.png

重要なステップ:

  • セッション情報を state store に保存し、livy サーバーがクラッシュした後にリカバリーできるようにします。
  • ドライバーに空の PingJob を送信して、ドライバーの状態が正常かどうかを確認します。PingJob が正常に実行されれば、ドライバーの状態は正常であり、セッションを running 状態に設定します。エラーや失敗があれば、ドライバーに問題が発生したことを示し、セッションの状態を error に設定します。

セッションの作成と初期化が成功裏に完了した後、セッションはSessionManagerに追加され、統一管理されます。SessionManager の主な責任は以下の通りです:

  • すべてのセッションを保持
  • 期限切れのセッションをクリーンアップ
  • state store からセッションを復元

サーバー側(ドライバー内部)#

概要#

image.png

図のように、ドライバー内部の起動プロセスは以下の 5 つのステップに分けられます:

  • ReplDriver インスタンスを作成
  • サーバーを初期化
  • SparkContext を初期化
  • JobContextImpl インスタンスを作成し、ジョブを実行
  • 終了を待つ

ReplDriver インスタンスの作成#

ReplDriver は InteractiveSession に対応する Spark アプリのドライバーで、Livy サーバーからのさまざまなリクエストを受け取り、処理します。また、RSCDriver のサブクラスでもあります。RSCDriver は:

  • RSCClient の接続を待つ RpcServer サーバーを保持
  • SparkContext を初期化
  • 様々なリクエストを処理:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus
  • add file リクエストを処理

サーバーの初期化#

このステップは RSCDriver#initializeServer () で呼び出され、クライアントに接続し、サーバー側の rpc アドレスを通知します。クライアントはサーバーの rpc アドレスを知った後、接続しリクエストを送信します。

image.png

SparkContext の初期化#

3 つのステップに分かれます:

  • 異なる kind に応じて異なるタイプのコードインタプリタを作成

  • repl/Session を作成し、その主な責任は:

    1. インタプリタを起動し、SparkContext を取得
    2. ステートメントを非同期で実行するためのスレッドプールを保持(インタプリタを通じて実行)
    3. ステートメントを非同期でキャンセルするためのスレッドプールを保持
    4. 1 つのセッション内のすべてのステートメントを管理
  • interpreter#startメソッドを呼び出してセッションを起動します。

コードスニペットの実行方法#

image.png

executeCodeFunc () メソッドの分析
上図の第 9 ステップにある executeCodeFunc は、実際にコードスニペットを実行する関数で、プロセスは以下の通りです。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。