Livy シリーズ - Livy Session の詳細#
内容整理自:
- 簡書著者 - 牛肉丸粉不加葱 Apache Livy 文集
概要#
Livy には 2 種類のジョブがあり、それぞれセッションとバッチです。セッションとバッチの作成プロセスは非常に異なり、バッチの作成は対応する Spark アプリの起動を終点とします。一方、セッションは対応する Spark アプリを起動するだけでなく、共有 SparkContext をサポートして、個々のステートメントの提出と実行を受け入れる必要があります。私はセッションの作成を 2 つの大きなステップに分けます:
クライアント側
:LivyServer 内で実行され、Spark アプリを起動するまでリクエストを受け付けます(注意、ここではクライアント側と呼ばれていますが、LivyServer 内で実行されています)サーバー側(ドライバー内部)
:セッションに対応する Spark アプリのドライバーの起動
クライアント側#
全体の流れ#
- セッションに対応する Spark アプリを起動
- ドライバーとの接続を確立
- セッションの作成と初期化
セッションに対応する Spark アプリの起動#
核心的なメソッドは:ContextLauncher#startDriver
で、2 つの大きなステップに分けられます:
- Spark アプリを起動
- SparkSubmit の終了を待つ
Spark アプリの起動#
セッションに対応する Spark アプリの mainClass はRSCDriverBootstrapper
です。
startDriver()
メソッドを呼び出し、新しい SparkLauncher オブジェクトを作成し、設定、リソース、mainClass などの設定を行い、その後 launch () メソッドを呼び出して SparkSubmit プロセスの対応する Process オブジェクト process を取得します。
SparkSubmit の終了を待つ#
SparkLauncher#launch () が返すプロセスは SparkSubmit プロセスで、process を返した後、新しい ContextLauncher.ChildProcess オブジェクトを作成します。このプロセス内で新しいスレッドが起動され、SparkSubmit プロセスの終了を待ち続けます。このスレッド内のロジックは以下の通りです:
public void run() {
try {
// スレッドはこのプロセスが終了するまでブロックされます
int exitCode = child.waitFor();
// 異常終了の場合、Sparkアプリの起動に失敗したことを示し、例外をスローします
if (exitCode != 0) {
LOG.warn("子プロセスがコード {} で終了しました。", exitCode);
fail(new IOException(String.format("子プロセスがコード %d で終了しました。", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("待機スレッドが中断されました。子プロセスを終了します。");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("子プロセスの待機中に例外が発生しました。", e);
}
}
ドライバーとの接続を確立#
セッションの最大の特徴は、SparkContext を共有できることです。これにより、ユーザーが提出した複数のコードスニペットが 1 つの SparkContext 上で実行できるようになります(IO の多重化、HTTP の長接続を連想)。これには 2 つの利点があります:
- タスクの起動速度が大幅に向上します。Yarn 上でアプリを起動するのは時間がかかりますが、セッションを使用することで、セッションの起動に相応の時間がかかることを除けば、その後提出されたコードスニペットはすぐに実行されます(HTTP の長接続を連想)。
- RDD、テーブルの共有:永続化された RDD、テーブルはその後のコードスニペットで使用でき、異なるユーザーが同じ RDD、テーブル上で計算を行う必要があるシナリオでは非常に便利です(IO の多重化、ネットワークの再利用を連想)。
ドライバーは Yarn によって任意のノードにスケジュールされる可能性があるため、LivyServer がドライバーに積極的に接続することはできません。そのため、事前にクライアント側で RpcServer を作成し、ドライバーの接続を待機します。したがって、最初にクライアント側で RpcServer を作成し、ドライバー側の接続を待ち、接続が成功した後、クライアントはドライバー側の情報を取得してドライバー側の RpcServer に接続します。まとめると、3 つのステップになります:
-
クライアントがその RpcServer 情報をドライバーに渡します。
時系列図の第 (5) ステップ:RSCClientFactory#createClient
、この呼び出しでorg.apache.livy.rsc.rpc.RpcServer
オブジェクトを作成し、メンバー server に割り当てます。 -
ドライバーがクライアントに接続し、その RpcServer 情報を渡します(
RSCDriver#initializeServer
で実装)。
-
クライアントがドライバーの rpcServer アドレス情報を受け取り、接続します。
セッションの作成と初期化#
ドライバーとの接続が確立された後、rscClient、livyConf などの情報を使用して InteractiveSession オブジェクトを作成し、初期化します。プロセスは以下の通りです:
重要なステップ:
- セッション情報を state store に保存し、livy サーバーがクラッシュした後にリカバリーできるようにします。
- ドライバーに空の PingJob を送信して、ドライバーの状態が正常かどうかを確認します。PingJob が正常に実行されれば、ドライバーの状態は正常であり、セッションを running 状態に設定します。エラーや失敗があれば、ドライバーに問題が発生したことを示し、セッションの状態を error に設定します。
セッションの作成と初期化が成功裏に完了した後、セッションはSessionManager
に追加され、統一管理されます。SessionManager の主な責任は以下の通りです:
- すべてのセッションを保持
- 期限切れのセッションをクリーンアップ
- state store からセッションを復元
サーバー側(ドライバー内部)#
概要#
図のように、ドライバー内部の起動プロセスは以下の 5 つのステップに分けられます:
- ReplDriver インスタンスを作成
- サーバーを初期化
- SparkContext を初期化
- JobContextImpl インスタンスを作成し、ジョブを実行
- 終了を待つ
ReplDriver インスタンスの作成#
ReplDriver は InteractiveSession に対応する Spark アプリのドライバーで、Livy サーバーからのさまざまなリクエストを受け取り、処理します。また、RSCDriver のサブクラスでもあります。RSCDriver は:
- RSCClient の接続を待つ RpcServer サーバーを保持
- SparkContext を初期化
- 様々なリクエストを処理:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus
- add file リクエストを処理
サーバーの初期化#
このステップは RSCDriver#initializeServer () で呼び出され、クライアントに接続し、サーバー側の rpc アドレスを通知します。クライアントはサーバーの rpc アドレスを知った後、接続しリクエストを送信します。
SparkContext の初期化#
3 つのステップに分かれます:
-
異なる kind に応じて異なるタイプのコードインタプリタを作成
-
repl/Session を作成し、その主な責任は:
- インタプリタを起動し、SparkContext を取得
- ステートメントを非同期で実行するためのスレッドプールを保持(インタプリタを通じて実行)
- ステートメントを非同期でキャンセルするためのスレッドプールを保持
- 1 つのセッション内のすべてのステートメントを管理
-
interpreter#start
メソッドを呼び出してセッションを起動します。
コードスニペットの実行方法#
executeCodeFunc () メソッドの分析
上図の第 9 ステップにある executeCodeFunc は、実際にコードスニペットを実行する関数で、プロセスは以下の通りです。