Yige

Yige

Build

Livy系列-Livy Session詳解

Livy 系列 - Livy Session 詳解#

內容整理自:

  1. 簡書作者 - 牛肉圓粉不加蔥 Apache Livy 文集

概要#

Livy 共有兩種 job,分別是 session 和 batch。session 和 batch 的創建過程也很不相同,batch 的創建以對應的 spark app 啟動為終點;而 session 除了要啟動相應的 spark app,還要能支持共享 sparkContext 來接受一個個 statements 的提交及運行,我將 session 的創建分為兩個大步驟:

  • client 端:運行在 LivyServer 中,接受 request 直到啟動 spark app(注意,這裡雖然叫 client 端,但是運行在 LivyServer 中的)
  • server 端(driver 內部):session 對應的 spark app driver 的啟動

client 端#

整體流程#

image.png

  • 啟動 session 對應的 spark app
  • 與 driver 建立連接
  • Session 的創建與初始化

啟動 session 對應的 spark app#

核心方法為: ContextLauncher#startDriver, 可以分為兩個大步驟:

  • 啟動 spark app
  • 等待 SparkSubmit 退出

啟動 spark app#

session 對應的 spark app 的 mainClass 為 RSCDriverBootstrapper
image.png

調用startDriver()方法, new 一個 SparkLauncher 對象,進行了配置、資源、mainClass 等設置,然後調用 launch () 方法拿到了 SparkSubmit 進程的對應的 Process 對象 process.

等待 SparkSubmit 退出#

SparkLauncher#launch () 返回的進程是 SparkSubmit 進程,再返回 process 後,會 new 一個 ContextLauncher.ChildProcess 對象,在過程中會新啟動一個線程來一直等待 SparkSubmit 進程退出,該線程中的邏輯如下:

public void run() {
  try {
    // 線程阻塞直到這個進程退出
    int exitCode = child.waitFor();
    // 如果是非正常退出,表示 Spark App 啟動失敗,會拋異常
    if (exitCode != 0) {
      LOG.warn("Child process exited with code {}.", exitCode);
      fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
    }
  } catch (InterruptedException ie) {
    LOG.warn("Waiting thread interrupted, killing child process.");
    Thread.interrupted();
    child.destroy();
  } catch (Exception e) {
    LOG.warn("Exception while waiting for child process.", e);
  }
}

與 driver 建立連接#

session 最大的特點就是可以共享 SparkContext,讓用戶提交的多個代碼片段都能跑在一個 SparkContext 上 (聯想記憶到 IO 多路復用,http 長連接),這有兩個好處:

  • 大大加速任務的啟動速度, yarn 上啟動一個 app 比較耗時,而使用 session,除了啟動 session 也需要相當的耗時外,之後提交的代碼片段都將立即執行 (聯想到http長連接)
  • 共享 RDD、table:持久化的 RDD、table 都可以被之後的代碼片段使用,這在不同用戶需要在相同的 RDD、table 上做計算的場景非常有用 (聯想到 IO 多路復用,網絡復用)

由於 driver 可能被 yarn 調度到任何一個節點啟動,所以無法由 LivyServer 主動與 driver 建立連接,而是預先在 client 端建立好 RpcServer 等待 driver 來連接。所以是先在 client 端創建一個 RpcServer, 等待 driver 端連接,連接成功後 client 獲取到了 driver 端的
信息再去連接到 driver 端的 RpcServer。總結起來就是三步:

  • client 傳遞其 RpcServer 信息給 driver
    時序圖中的第 (5) 步:RSCClientFactory#createClient,在該調用中創建了一個 org.apache.livy.rsc.rpc.RpcServer對象賦值給成員 server。

  • driver 連接 client 並傳遞其 RpcServer 信息(在 RSCDriver#initializeServer 中實現)
    image.png

  • client 接收 driver rpcServer 地址信息並連接

Session 的創建與初始化#

在與 driver 建立連接之後,會使用 rscClient、livyConf 等信息來創建 InteractiveSession 對象並進行初始化,流程如下:
image.png

關鍵的步驟:

  • 將 session 信息存儲到 state store 中以便 livy server 掛掉後能進行 recovery 恢復
  • 向 driver 發送一個空的 PingJob 來確定 driver 的狀態是否 ok,若 PingJob 成功執行,則說明 driver 狀態 ok,將 session 置為 running 狀態;若出錯或失敗,則說明 driver 出了一些問題,則將 session 的狀態置為 error

成功完成 session 的創建及初始化後,會將 session 添加到 SessionManager 中進行統一管理,SessionManager 的主要職責包括:

  • 持有所有 sessions
  • 清理過期 session
  • 從 state store 中恢復 sessions

server 端(driver 內部)#

概要#

image.png

如圖所示,driver 內部的啟動流程可以分為以下五個步驟:

  • 創建 ReplDriver 實例
  • 初始化 server
  • 初始化 SparkContext
  • 創建 JobContextImpl 實例並執行 jobs
  • 等待退出

創建 ReplDriver 實例#

ReplDriver 是 InteractiveSession 對應的 Spark App driver,用來接收 livy server 的各種請求並進行處理。也是 RSCDriver 的子類,RSCDriver:

  • 持有等待 RSCClient 進行連接的 RpcServer server
  • 初始化 SparkContext
  • 處理各種請求:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus
  • 處理 add file 請求

初始化 server#

這一步在 RSCDriver#initializeServer () 中調用,用於連接 client 並告知 server 端 rpc 地址,client 獲知 server rpc 地址後會進行連接並發送請求

image.png

初始化 SparkContext#

分為 3 步:

  • 根據不同的 kind 創建不同類型的代碼解釋器

  • 創建 repl/Session,其主要職責是:

    1. 啟動 interpreter,並獲取 SparkContext
    2. 持有線程池來異步執行 statements(通過 interpreter 來執行)
    3. 持有線程池來異步取消 statements
    4. 管理一個 session 下所有的 statements
  • 調用 interpreter#start方法啟動 Session

如何執行代碼片段#

image.png

executeCodeFunc () 方法分析
即上圖中的第 9 步中的 executeCodeFunc,用來真正運行代碼片段的函數,流程如下
image.png

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。