Livy Series - Detailed Explanation of Livy Session#
Content organized from:
- Author on JianShu - Beef Ball Noodles Without Scallions Apache Livy Collection
Overview#
Livy has two types of jobs, namely session and batch. The creation processes of session and batch are quite different; the creation of batch ends with the startup of the corresponding Spark app, while session not only needs to start the corresponding Spark app but also supports sharing SparkContext to accept and run individual statements. I divide the creation of session into two major steps:
- client side: runs in LivyServer, accepts requests until the Spark app is started (note that although it is called client side, it runs in LivyServer)
- server side (inside driver): the startup of the Spark app driver corresponding to the session
Client Side#
Overall Process#
- Start the Spark app corresponding to the session
- Establish a connection with the driver
- Create and initialize the session
Start the Spark App Corresponding to the Session#
The core method is ContextLauncher#startDriver, which can be divided into two major steps:
- Start the Spark app
- Wait for SparkSubmit to exit
Start the Spark App#
The mainClass of the Spark app corresponding to the session is RSCDriverBootstrapper. The startDriver() method creates a new SparkLauncher object, configures resources, mainClass, etc., and then calls the launch() method to obtain the Process object of the SparkSubmit process.
Wait for SparkSubmit to Exit#
The process returned by SparkLauncher#launch() is the SparkSubmit process. After returning the process, a new ContextLauncher.ChildProcess object will be created, which will start a new thread to wait for the SparkSubmit process to exit. The logic in that thread is as follows:
    public void run() {
      try {
        // The thread blocks until this process exits
        int exitCode = child.waitFor();
        // If it exits abnormally, it indicates that the Spark App failed to start, an exception will be thrown
        if (exitCode != 0) {
          LOG.warn("Child process exited with code {}.", exitCode);
          fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
        }
      } catch (InterruptedException ie) {
        LOG.warn("Waiting thread interrupted, killing child process.");
        Thread.interrupted();
        child.destroy();
      } catch (Exception e) {
        LOG.warn("Exception while waiting for child process.", e);
      }
    }
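The waiting pattern above can be tried without Spark. The following is a minimal sketch, not Livy's code: the class `ChildWatcherSketch`, the `watch` method, and the `fakeProcess` stand-in are all hypothetical names, and a `CompletableFuture` replaces Livy's `fail(...)` callback as an assumption made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CompletableFuture;

class ChildWatcherSketch {

    /** Waits for the child on a daemon thread and reports how it exited. */
    static CompletableFuture<Integer> watch(Process child) {
        CompletableFuture<Integer> exit = new CompletableFuture<>();
        Thread monitor = new Thread(() -> {
            try {
                int exitCode = child.waitFor(); // blocks until the child exits
                if (exitCode != 0) {
                    // Non-zero exit: surface it as a failure to whoever holds the future
                    exit.completeExceptionally(new IOException(
                        String.format("Child process exited with code %d.", exitCode)));
                } else {
                    exit.complete(exitCode);
                }
            } catch (InterruptedException ie) {
                child.destroy(); // stop the child if the waiting thread is interrupted
                exit.completeExceptionally(ie);
            }
        }, "child-watcher");
        monitor.setDaemon(true);
        monitor.start();
        return exit;
    }

    /** Hypothetical stand-in for the SparkSubmit process so the sketch runs without Spark. */
    static Process fakeProcess(int exitCode) {
        return new Process() {
            @Override public OutputStream getOutputStream() { return new ByteArrayOutputStream(); }
            @Override public InputStream getInputStream() { return new ByteArrayInputStream(new byte[0]); }
            @Override public InputStream getErrorStream() { return new ByteArrayInputStream(new byte[0]); }
            @Override public int waitFor() { return exitCode; }
            @Override public int exitValue() { return exitCode; }
            @Override public void destroy() { }
        };
    }

    public static void main(String[] args) {
        System.out.println(watch(fakeProcess(0)).join());
        System.out.println(watch(fakeProcess(1)).handle((code, err) -> err != null).join());
    }
}
```

Using a separate thread keeps the caller free to continue while the child process runs; the caller only learns about a failed launch when the future completes exceptionally.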
Establish Connection with Driver#
The biggest feature of the session is that it shares a SparkContext, so multiple code snippets submitted by users run on a single SparkContext (think of IO multiplexing, HTTP long connections). This has two benefits:
- Much faster task startup: starting an app on YARN is relatively time-consuming. With a session, only the startup of the session itself takes considerable time; subsequently submitted code snippets execute immediately (think of HTTP long connections)
- Shared RDDs and tables: persisted RDDs and tables can be used by subsequent code snippets, which is very useful in scenarios where different users need to perform calculations on the same RDDs and tables (think of IO multiplexing, network reuse)
Since the driver may be scheduled to start on any node by YARN, LivyServer cannot actively establish a connection with the driver. Instead, it pre-establishes an RpcServer on the client side to wait for the driver to connect. Therefore, the process is to first create an RpcServer on the client side, wait for the driver to connect, and after a successful connection, the client obtains the driver-side information and then connects to the driver-side RpcServer. In summary, there are three steps:
- The client passes its RpcServer information to the driver. This is step (5) in the sequence diagram, RSCClientFactory#createClient; in this call, an org.apache.livy.rsc.rpc.RpcServer object is created and assigned to the member server.
- The driver connects to the client and passes its RpcServer information (implemented in RSCDriver#initializeServer)
- The client receives the driver rpcServer address information and connects
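The three steps above can be sketched with plain loopback sockets. This is only an illustration of the "listen first, let the driver call back" idea, not Livy's RPC layer: the class `ReverseConnectSketch`, its method names, the `"ping"` message, and the wire format (a single int for the port) are all assumptions, and the driver is simulated by a thread in the same JVM.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class ReverseConnectSketch {
    private static final InetAddress LOOPBACK = InetAddress.getLoopbackAddress();

    /** Runs the handshake and returns the message the driver-side server received. */
    static String handshake() {
        // Step 1: the client opens its own server first and hands the port to the driver
        try (ServerSocket clientServer = new ServerSocket(0, 1, LOOPBACK)) {
            clientServer.setSoTimeout(5000);
            int clientPort = clientServer.getLocalPort();
            String[] received = new String[1];
            Thread driver = new Thread(() -> runDriver(clientPort, received), "fake-driver");
            driver.start();

            // Step 2: the driver connects back and reports the port of its own server
            int driverPort = acceptDriverPort(clientServer);

            // Step 3: the client connects to the driver-side server and sends a request
            try (Socket toDriver = new Socket(LOOPBACK, driverPort);
                 DataOutputStream out = new DataOutputStream(toDriver.getOutputStream())) {
                out.writeUTF("ping");
            }
            driver.join(5000);
            return received[0];
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("handshake failed", e);
        }
    }

    private static int acceptDriverPort(ServerSocket clientServer) throws IOException {
        try (Socket fromDriver = clientServer.accept();
             DataInputStream in = new DataInputStream(fromDriver.getInputStream())) {
            return in.readInt();
        }
    }

    /** Simulated driver: starts its server, announces it to the client, then serves one request. */
    private static void runDriver(int clientPort, String[] received) {
        try (ServerSocket driverServer = new ServerSocket(0, 1, LOOPBACK)) {
            driverServer.setSoTimeout(5000);
            try (Socket toClient = new Socket(LOOPBACK, clientPort);
                 DataOutputStream out = new DataOutputStream(toClient.getOutputStream())) {
                out.writeInt(driverServer.getLocalPort());
            }
            try (Socket fromClient = driverServer.accept();
                 DataInputStream in = new DataInputStream(fromClient.getInputStream())) {
                received[0] = in.readUTF();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(handshake());
    }
}
```

The point of the design is that only the client's address has to be known in advance; the driver's address is discovered at runtime, which is what makes it workable when YARN picks the node.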
Creation and Initialization of Session#
After establishing a connection with the driver, an InteractiveSession object is created and initialized using rscClient, livyConf, and other information. The process is as follows:
Key Steps:
- Store session information in the state store for recovery in case the Livy server crashes
- Send an empty PingJob to the driver to determine if the driver's status is okay. If the PingJob executes successfully, it indicates that the driver's status is okay, and the session is set to running; if there is an error or failure, it indicates that there is a problem with the driver, and the session's status is set to error
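The PingJob check in the second step amounts to mapping the outcome of one asynchronous job onto a session state. A minimal sketch, assuming the ping is exposed as a `CompletableFuture` (the class `PingStateSketch`, the `State` enum, and `stateAfterPing` are hypothetical names, not Livy's API):

```java
import java.util.concurrent.CompletableFuture;

class PingStateSketch {
    enum State { STARTING, RUNNING, ERROR }

    /** Maps the outcome of the ping onto the session state without blocking the caller. */
    static CompletableFuture<State> stateAfterPing(CompletableFuture<Void> ping) {
        // handle() runs for both outcomes: err is null on success, non-null on failure
        return ping.handle((ok, err) -> err == null ? State.RUNNING : State.ERROR);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> healthy = CompletableFuture.completedFuture(null);
        System.out.println(stateAfterPing(healthy).join());

        CompletableFuture<Void> broken = new CompletableFuture<>();
        broken.completeExceptionally(new RuntimeException("driver unreachable"));
        System.out.println(stateAfterPing(broken).join());
    }
}
```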
After successfully completing the creation and initialization of the session, the session will be added to the SessionManager for unified management. The main responsibilities of the SessionManager include:
- Holding all sessions
- Cleaning up expired sessions
- Recovering sessions from the state store
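The first two responsibilities can be illustrated with a small in-memory registry. This is a sketch under stated assumptions, not the real SessionManager: the class `SessionManagerSketch`, its methods, and the idle-timeout rule for "expired" are hypothetical, and recovery from the state store is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SessionManagerSketch {
    /** Hypothetical session record: an id plus the time of its last activity. */
    static final class Session {
        final int id;
        volatile long lastActivityMillis;

        Session(int id, long lastActivityMillis) {
            this.id = id;
            this.lastActivityMillis = lastActivityMillis;
        }
    }

    private final Map<Integer, Session> sessions = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    SessionManagerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Holds the session so later requests can look it up by id. */
    void register(int id, long nowMillis) {
        sessions.put(id, new Session(id, nowMillis));
    }

    Session get(int id) {
        return sessions.get(id);
    }

    /** Removes sessions idle for longer than the timeout and returns how many were removed. */
    int collectGarbage(long nowMillis) {
        int removed = 0;
        for (Session s : sessions.values()) {
            boolean expired = nowMillis - s.lastActivityMillis > timeoutMillis;
            if (expired && sessions.remove(s.id, s)) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        SessionManagerSketch manager = new SessionManagerSketch(1000);
        manager.register(0, 0L);
        manager.register(1, 900L);
        System.out.println(manager.collectGarbage(1500L));
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside `collectGarbage`, keeps the expiry rule easy to test.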
Server Side (Inside Driver)#
Overview#
As shown in the figure, the startup process inside the driver can be divided into the following five steps:
- Create ReplDriver instance
- Initialize server
- Initialize SparkContext
- Create JobContextImpl instance and execute jobs
- Wait for exit
Create ReplDriver Instance#
ReplDriver is the Spark app driver corresponding to the InteractiveSession; it receives and processes the various requests sent by the Livy server. It is a subclass of RSCDriver, which:
- Holds the RpcServer server waiting for RSCClient to connect
- Initializes SparkContext
- Processes various requests: CancelJob, EndSession, JobRequest, BypassJobRequest, SyncJobRequest, GetBypassJobStatus
- Handles add file requests
Initialize Server#
This step is performed in RSCDriver#initializeServer(). It connects to the client and tells it the server-side RPC address. Once the client knows that address, it connects and sends requests.
Initialize SparkContext#
Divided into 3 steps:
- Create different types of code interpreters based on different kinds
- Create repl/Session, whose main responsibilities are:
  - Start the interpreter and obtain SparkContext
  - Hold a thread pool to asynchronously execute statements (executed through the interpreter)
  - Hold a thread pool to asynchronously cancel statements
  - Manage all statements under a session
- Call the interpreter#start method to start the Session
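The responsibilities listed for repl/Session (one pool to execute statements, one pool to cancel them, and a registry of all statements) can be sketched as follows. This is not Livy's implementation: the class `ReplSessionSketch` and its methods are hypothetical, the interpreter is reduced to a plain `Function<String, String>`, and the use of single-threaded pools (so statements run in submission order) is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ReplSessionSketch {
    private final Function<String, String> interpreter; // hypothetical stand-in for a real interpreter
    private final ExecutorService executePool = Executors.newSingleThreadExecutor();
    private final ExecutorService cancelPool = Executors.newSingleThreadExecutor();
    private final Map<Integer, Future<String>> statements = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    ReplSessionSketch(Function<String, String> interpreter) {
        this.interpreter = interpreter;
    }

    /** Queues a statement for asynchronous execution and returns its id immediately. */
    int execute(String code) {
        int id = nextId.getAndIncrement();
        Callable<String> task = () -> interpreter.apply(code);
        statements.put(id, executePool.submit(task));
        return id;
    }

    /** Requests cancellation on a separate pool so it is not stuck behind running statements. */
    void cancel(int id) {
        cancelPool.execute(() -> {
            Future<String> statement = statements.get(id);
            if (statement != null) {
                statement.cancel(true);
            }
        });
    }

    /** Blocks (up to five seconds) until the statement has produced its output. */
    String result(int id) {
        try {
            return statements.get(id).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("statement " + id + " did not finish", e);
        }
    }

    void close() {
        executePool.shutdownNow();
        cancelPool.shutdownNow();
    }

    public static void main(String[] args) {
        ReplSessionSketch session = new ReplSessionSketch(code -> "ran: " + code);
        int id = session.execute("1 + 1");
        System.out.println(session.result(id));
        session.close();
    }
}
```

Keeping cancellation on its own pool matters because a cancel request queued behind a long-running statement on the same thread would never get a chance to run.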
How to Execute Code Snippets#
Analysis of executeCodeFunc() Method
The executeCodeFunc in step 9 of the above figure is the function that actually runs the code snippets. The process is as follows: