Yige

Spark Advanced - Summary of Spark Tuning

Summary of Spark Tuning#

Content organized from:
Thirty Thousand Words Long Article | Spark Performance Optimization Practical Manual

Development Tuning#

  • Avoid creating duplicate RDDs

  • Reuse the same RDD as much as possible

  • Persist RDDs that are used multiple times

  • Try to avoid using shuffle-type operators

  • Prefer shuffle operators that pre-aggregate on the map side, such as reduceByKey or aggregateByKey instead of groupByKey

  • Use high-performance operators: mapPartitions instead of ordinary map, foreachPartition instead of foreach, coalesce after a filter that removes much of the data, and repartitionAndSortWithinPartitions instead of repartition followed by a sort

  • Broadcast large variables

  • Use Kryo to optimize serialization performance

  • Optimize data structures: prefer strings to objects, primitive types (int, long) to strings, and arrays to collections
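To make the map-side pre-aggregation point concrete, here is a minimal plain-Java sketch (no Spark dependency) that counts how many records would cross the shuffle with and without pre-aggregation. The class and method names are hypothetical, and the model is simplified: each inner list stands for one map-side partition of keys.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/** Plain-Java illustration (not Spark code) of why map-side pre-aggregation shrinks shuffle volume. */
final class MapSideCombineSketch {

    /** Without pre-aggregation (groupByKey style), every record crosses the shuffle. */
    static int recordsShuffledWithoutCombine(List<List<String>> partitions) {
        int total = 0;
        for (List<String> partition : partitions) {
            total += partition.size();
        }
        return total;
    }

    /** With pre-aggregation (reduceByKey style), each partition sends one record per distinct key. */
    static int recordsShuffledWithCombine(List<List<String>> partitions) {
        int total = 0;
        for (List<String> partition : partitions) {
            total += new HashSet<>(partition).size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("hello", "hello", "hello", "world"),
                Arrays.asList("hello", "world", "world", "world"));
        System.out.println(recordsShuffledWithoutCombine(partitions)); // 8
        System.out.println(recordsShuffledWithCombine(partitions));    // 4
    }
}
```

The more often a key repeats inside a partition, the larger the saving from pre-aggregation.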

Resource Tuning#

Spark running process: [figure not preserved]

Resource Parameter Tuning#

  • num-executors: Used to set how many Executor processes to use for executing Spark jobs in total
  • executor-memory: This parameter is used to set the memory for each Executor process. The size of Executor memory often directly determines the performance of Spark jobs and is directly related to common JVM OOM exceptions
  • executor-cores: This parameter is used to set the number of CPU cores for each Executor process. This parameter determines the ability of each Executor process to execute task threads in parallel
  • driver-memory: Used to set the memory for the Driver process. The memory for the Driver is usually not set, or setting around 1G should be sufficient. It is important to note that if using the collect operator to pull all data to the driver side, the Driver must have enough memory
  • spark.default.parallelism: Sets the default number of tasks for each stage. The official recommendation is 2~3 times num-executors * executor-cores. For example, if the Executors have 300 CPU cores in total, setting about 1000 tasks is acceptable
  • spark.storage.memoryFraction: Sets the proportion of Executor memory that persisted RDD data may occupy. The default is 0.6. Depending on the persistence strategy you choose, when memory is insufficient the data is either not persisted or written to disk. This is a legacy setting from before Spark 1.6; later versions use unified memory management
  • spark.shuffle.memoryFraction: Sets the proportion of Executor memory that a task may use for aggregation after it pulls the output of the previous stage's tasks during the shuffle process. The default is 0.2. Like spark.storage.memoryFraction, this is a legacy setting
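The 2~3 times rule for spark.default.parallelism is simple arithmetic. The sketch below works it through for 100 executors with 4 cores each; the class and method names are hypothetical and exist only for this illustration.

```java
/** Plain-Java arithmetic for the "2~3 times total cores" parallelism guideline. */
final class ParallelismSketch {

    /** Total task slots: each executor runs executorCores task threads in parallel. */
    static int totalCores(int numExecutors, int executorCores) {
        return numExecutors * executorCores;
    }

    /** Returns {lower, upper} bounds of the recommended parallelism range. */
    static int[] recommendedParallelismRange(int numExecutors, int executorCores) {
        int cores = totalCores(numExecutors, executorCores);
        return new int[] {2 * cores, 3 * cores};
    }

    public static void main(String[] args) {
        int[] range = recommendedParallelismRange(100, 4);
        System.out.println(totalCores(100, 4));           // 400
        System.out.println(range[0] + " to " + range[1]); // 800 to 1200
    }
}
```

With these values, a parallelism of 1000 falls inside the recommended range.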

Resource Parameter Reference Example#

./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
<application-jar> [application-arguments]

Data Skew Tuning#

  • Most tasks execute quickly, but a few tasks execute very slowly, and the progress of Spark depends on the longest-running task.
  • During the shuffle phase, if the amount of data corresponding to a certain key is particularly large, data skew will occur.

Tuning Ideas#

  • Identify which stage the data skew occurs in: As long as you see a shuffle-type operator in the Spark code or a SQL statement in Spark SQL that causes shuffle (such as a group by statement), you can determine that this point divides the previous and subsequent stages.
  • Use the Spark Web UI to view the running time and allocated data volume of each task in the reported stage, and then locate the specific code through the log exception stack.
  • Check the data distribution of the keys causing data skew and choose different solutions to resolve it:
    1. If data skew is caused by group by or join statements in Spark SQL, check the key distribution of the tables used in the SQL.
    2. If data skew is caused by executing shuffle operators on Spark RDD, you can add code to view key distribution in the Spark job, such as RDD.countByKey(). Then collect/take the counts of each key to the client for printing to see the key distribution.
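As a rough illustration of the key-distribution check, the plain-Java sketch below counts records per key and lists the most frequent keys. It only mimics what countByKey() followed by sorting on the client would give you; the class and method names are hypothetical and no Spark API is used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java stand-in for inspecting the key distribution of a pair RDD. */
final class KeyDistributionSketch {

    /** Counts how many records each key has. */
    static Map<String, Long> countByKey(List<String> keys) {
        return keys.stream().collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    /** Returns the n most frequent keys, largest count first. */
    static List<Map.Entry<String, Long>> topKeys(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hello", "hello", "hello", "hello", "world", "spark", "spark");
        for (Map.Entry<String, Long> entry : topKeys(countByKey(keys), 2)) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

A key whose count is far above the rest is a likely cause of the skew.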

Solutions for Data Skew#

Solution 1: Filter Out Few Keys Causing Skew#

If we determine that the few keys with particularly large data volumes are not particularly important to the execution and calculation results of the job, we can simply filter out those few keys. For example, in Spark SQL, you can use the where clause to filter out these keys or use the filter operator on RDD in Spark Core to filter out these keys.

Solution 2: Increase Parallelism of Shuffle Operations#

Implementation Idea:
When executing a shuffle operator on an RDD, pass the parallelism as an extra argument, such as reduceByKey(func, 1000), which sets the number of shuffle read tasks for that operator. For shuffle-type Spark SQL statements such as group by and join, set the parameter spark.sql.shuffle.partitions, which is the parallelism of shuffle read tasks. Its default value is 200, which is too small for many scenarios.

Implementation Principle:
Increasing the number of shuffle read tasks allows multiple keys originally assigned to one task to be distributed to multiple tasks, allowing each task to handle less data than before. For example, if there are originally 5 keys, each corresponding to 10 pieces of data, and these 5 keys are assigned to one task, then this task has to process 50 pieces of data. After increasing the shuffle read tasks, each task is assigned one key, meaning each task processes 10 pieces of data, thus naturally reducing the execution time of each task.
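The 5-key example can be reproduced with a small plain-Java simulation. It assumes keys are assigned to tasks by a non-negative hashCode modulo the partition count, which is how hash partitioning works; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation of how keys spread over shuffle read tasks under hash partitioning. */
final class ShuffleParallelismSketch {

    /** Non-negative key.hashCode() modulo the number of partitions. */
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    /** Largest number of records that any single shuffle read task receives. */
    static long maxRecordsPerTask(Map<Integer, Long> recordsPerKey, int numPartitions) {
        long[] load = new long[numPartitions];
        for (Map.Entry<Integer, Long> entry : recordsPerKey.entrySet()) {
            load[partitionFor(entry.getKey(), numPartitions)] += entry.getValue();
        }
        return Arrays.stream(load).max().orElse(0L);
    }

    public static void main(String[] args) {
        Map<Integer, Long> recordsPerKey = new HashMap<>();
        for (int key = 0; key < 5; key++) {
            recordsPerKey.put(key, 10L); // 5 keys with 10 records each
        }
        System.out.println(maxRecordsPerTask(recordsPerKey, 1)); // 50
        System.out.println(maxRecordsPerTask(recordsPerKey, 5)); // 10

        recordsPerKey.put(0, 1000L); // one hot key
        System.out.println(maxRecordsPerTask(recordsPerKey, 5)); // 1000
    }
}
```

The last line shows the limit of this solution: all records of one key still land in the same task, so a single very large key is not helped by more partitions.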

Solution 3: Two-Stage Aggregation (Local Aggregation + Global Aggregation)#

Implementation Idea:
Perform two-stage aggregation. The first stage is local aggregation: prefix each key with a random number, for example an integer below 10, so that identical keys become different. For example, (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then run an aggregation such as reduceByKey on the prefixed data, giving local results such as (1_hello, 2) (2_hello, 2). Finally, remove the prefix from each key to get (hello, 2) (hello, 2), and aggregate again globally to obtain the final result, (hello, 4).

Implementation Code:

// Step 1: Assign a random prefix to each key in the RDD.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(new PairFunction<Tuple2<Long,Long>, String, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
        Random random = new Random();
        int prefix = random.nextInt(10);
        return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
    }
});
// Step 2: Perform local aggregation on the keys with random prefixes.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});
// Step 3: Remove the random prefixes from each key in the RDD.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)throws Exception {
        long originalKey = Long.valueOf(tuple._1.split("_")[1]);
        return new Tuple2<Long, Long>(originalKey, tuple._2);
    }
});
// Step 4: Perform global aggregation on the RDD without random prefixes.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});

Advantages of the Solution:
For aggregation-type shuffle operations that cause data skew, the effect is very good. It can usually resolve data skew or at least significantly alleviate it, improving the performance of Spark jobs several times.

Disadvantages of the Solution:
It is only applicable to aggregation-type shuffle operations, and the applicable range is relatively narrow. If it is a join-type shuffle operation, other solutions must be used.

Solution 4: Convert Reduce Join to Map Join#

Implementation Idea:
Instead of using the join operator for connection operations, use Broadcast variables and map-type operators to implement join operations, thereby completely avoiding shuffle-type operations and completely preventing data skew from occurring.

Implementation Principle:
Ordinary joins go through the shuffle process, and once shuffle occurs, it means that data with the same key is pulled into a shuffle read task for joining, which is a reduce join. However, if one RDD is relatively small, you can use the full data of the small RDD as a broadcast variable + map operator to achieve the same effect as join, which is a map join. At this point, shuffle operations will not occur, and thus data skew will not occur.

Implementation Code:

// First, collect the data from the smaller RDD to the Driver.
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect();
// Then use Spark's broadcasting feature to convert the small RDD data into a broadcast variable, so that each Executor only has one copy of the RDD data.
// This can save memory space as much as possible and reduce the performance overhead of network transmission.
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// Perform map-type operations on the other RDD instead of join-type operations.
// The join key is a Long in both RDDs, so the result is keyed by Long as well.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Tuple2<String, Row>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {
        // In the operator function, use the broadcast variable to access the rdd1 data in the local Executor.
        List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
        // Convert rdd1 data into a Map for later join operations.
        // Note: this Map is rebuilt for every record; broadcasting a prebuilt Map would avoid that cost.
        Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
        for(Tuple2<Long, Row> data : rdd1Data) {
            rdd1DataMap.put(data._1, data._2);
        }
        // Get the key and value of the current RDD data.
        Long key = tuple._1;
        String value = tuple._2;
        // Get the data that can be joined from rdd1 data Map based on the key (null if there is no match).
        Row rdd1Value = rdd1DataMap.get(key);
        return new Tuple2<Long, Tuple2<String, Row>>(key, new Tuple2<String, Row>(value, rdd1Value));
    }
});
// Here it should be noted.
// The above approach is only applicable when the keys in rdd1 are unique and do not have duplicates.
// If there are multiple identical keys in rdd1, then flatMap-type operations must be used, and when joining, map cannot be used; instead, all data in rdd1 must be traversed for joining.
// Each piece of data in rdd2 may return multiple join results.
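
The duplicate-key case described in the note above can be sketched in plain Java (no Spark dependency). The small side is indexed as key -> list of values, and each row of the large side emits one result per match, which is what a flatMap-type operator would do. The class, method names, and the "key:largeValue:smallValue" output format are hypothetical; unlike the code above, rows without a match are dropped (inner join).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a map-side join when the small side contains duplicate keys. */
final class MapJoinSketch {

    /** Convenience constructor for a (key, value) row. */
    static Map.Entry<Long, String> row(long key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Indexes the small side by key, keeping every value of a key. */
    static Map<Long, List<String>> index(List<Map.Entry<Long, String>> smallSide) {
        Map<Long, List<String>> byKey = new HashMap<>();
        for (Map.Entry<Long, String> r : smallSide) {
            byKey.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        return byKey;
    }

    /** Emits "key:largeValue:smallValue" for every match; one large row may yield several results. */
    static List<String> join(List<Map.Entry<Long, String>> largeSide, Map<Long, List<String>> index) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, String> r : largeSide) {
            for (String match : index.getOrDefault(r.getKey(), Collections.emptyList())) {
                out.add(r.getKey() + ":" + r.getValue() + ":" + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> small = Arrays.asList(row(1, "x"), row(1, "y"), row(2, "z"));
        List<Map.Entry<Long, String>> large = Arrays.asList(row(1, "a"), row(3, "c"));
        System.out.println(join(large, index(small))); // [1:a:x, 1:a:y]
    }
}
```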

Solution 5: Sample Skewed Keys and Split Join Operations#

Applicable Scenario:
When joining two RDDs/Hive tables, if both data volumes are relatively large, you can check the key distribution of both RDDs/Hive tables. If data skew occurs because a few keys in one RDD/Hive table have a large amount of data while all keys in the other RDD/Hive table are distributed relatively evenly, then this solution is more suitable.
Implementation Idea:

  1. For the RDD containing a few keys with excessively large data volumes, sample a portion using the sample operator, then count the number of each key to determine which keys have the largest data volumes.
  2. Then, extract the data for these keys from the original RDD into a separate RDD and prefix each key with a random number in [0, n); the remaining keys, which do not cause skew, form another RDD.
  3. Next, from the other RDD to be joined, also extract the data for those skewed keys into a separate RDD and expand each piece of data into n pieces, prefixed in turn with 0 through n-1; the remaining keys, which do not cause skew, again form another RDD.
  4. Then, join the independent RDD with random prefixes and the independently expanded RDD, which will scatter the original identical keys into n parts, distributing them across multiple tasks for joining.
  5. The other two ordinary RDDs can be joined as usual.
  6. Finally, use the union operator to merge the results of the two joins, which is the final join result.
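
The six steps above can be sketched as a plain-Java simulation (no Spark dependency). Rows are {key, value} string arrays, the "sample" is simply every sampleEvery-th row, and all class, method, and parameter names are hypothetical. The point of the sketch is that the split, salted join plus the ordinary join produce the same rows as one plain join.

```java
import java.util.*;

/** Plain-Java simulation of splitting skewed keys out of a join and salting only that part. */
final class SplitSkewJoinSketch {

    /** Step 1: count keys in a sample (every sampleEvery-th row) and flag the frequent ones. */
    static Set<String> skewedKeys(List<String[]> rows, int sampleEvery, int minSampleCount) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < rows.size(); i += sampleEvery) {
            counts.merge(rows.get(i)[0], 1, Integer::sum);
        }
        Set<String> skewed = new HashSet<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() >= minSampleCount) skewed.add(e.getKey());
        }
        return skewed;
    }

    /** Ordinary inner join on row[0]; emits "key|leftValue|rightValue", sorted for comparison. */
    static List<String> join(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> rightByKey = new HashMap<>();
        for (String[] r : right) {
            rightByKey.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String rv : rightByKey.getOrDefault(l[0], Collections.emptyList())) {
                out.add(l[0] + "|" + l[1] + "|" + rv);
            }
        }
        Collections.sort(out);
        return out;
    }

    /** Steps 2-6: split both sides, salt and expand the skewed part, join both parts, union. */
    static List<String> splitJoin(List<String[]> left, List<String[]> right,
                                  Set<String> skewed, int n, long seed) {
        Random random = new Random(seed);
        List<String[]> saltedLeft = new ArrayList<>(), plainLeft = new ArrayList<>();
        for (String[] l : left) {
            if (skewed.contains(l[0])) {
                saltedLeft.add(new String[] {random.nextInt(n) + "_" + l[0], l[1]});
            } else {
                plainLeft.add(l);
            }
        }
        List<String[]> expandedRight = new ArrayList<>(), plainRight = new ArrayList<>();
        for (String[] r : right) {
            if (skewed.contains(r[0])) {
                for (int prefix = 0; prefix < n; prefix++) { // prefixes 0 .. n-1
                    expandedRight.add(new String[] {prefix + "_" + r[0], r[1]});
                }
            } else {
                plainRight.add(r);
            }
        }
        List<String> result = new ArrayList<>();
        for (String joined : join(saltedLeft, expandedRight)) {
            result.add(joined.substring(joined.indexOf('_') + 1)); // strip the "prefix_" salt
        }
        result.addAll(join(plainLeft, plainRight));
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        List<String[]> left = new ArrayList<>();
        for (int i = 0; i < 6; i++) left.add(new String[] {"hot", "l" + i});
        left.add(new String[] {"cold", "l6"});
        List<String[]> right = Arrays.asList(
                new String[] {"hot", "r1"}, new String[] {"cold", "r2"});
        Set<String> skewed = skewedKeys(left, 2, 2);
        System.out.println(skewed); // [hot]
        System.out.println(splitJoin(left, right, skewed, 3, 42L).equals(join(left, right))); // true
    }
}
```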

Implementation Principle:
For data skew caused by joins, if only a few keys are causing the skew, you can split those few keys into independent RDDs and scatter them into n parts with random prefixes for joining. At this point, the data corresponding to these few keys will not be concentrated on a few tasks but will be distributed across multiple tasks for joining.

Solution 6: Use Random Prefixes and Expand RDDs for Joining#

Applicable Scenario: If there are a large number of keys in the RDD causing data skew during join operations, splitting the keys is not meaningful. In this case, the last solution must be used to resolve the issue.
Implementation Idea:

  1. Similar to "Solution 5", first check the data distribution in the RDD/Hive table to identify the RDD/Hive table causing data skew, such as multiple keys corresponding to over 10,000 pieces of data.
  2. Then, assign a random prefix within n to each piece of data in that RDD.
  3. Simultaneously, expand the other normal RDD, turning each piece of data into n pieces, with the expanded pieces prefixed in turn with 0 through n-1.
  4. Finally, join the two processed RDDs.
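
A minimal plain-Java sketch of these steps (no Spark dependency) is shown below. Rows are {key, value} string arrays, and every class and method name is hypothetical. It checks two things: the salted join yields as many rows as the plain join, and the largest group of rows sharing one key shrinks after salting.

```java
import java.util.*;

/** Plain-Java simulation of salting one whole side of a join and expanding the other side. */
final class SaltedJoinSketch {

    /** Gives every left row a random prefix in [0, n). */
    static List<String[]> saltLeft(List<String[]> left, int n, Random random) {
        List<String[]> salted = new ArrayList<>();
        for (String[] row : left) {
            salted.add(new String[] {random.nextInt(n) + "_" + row[0], row[1]});
        }
        return salted;
    }

    /** Copies every right row n times, once per prefix 0 .. n-1. */
    static List<String[]> expandRight(List<String[]> right, int n) {
        List<String[]> expanded = new ArrayList<>();
        for (String[] row : right) {
            for (int prefix = 0; prefix < n; prefix++) {
                expanded.add(new String[] {prefix + "_" + row[0], row[1]});
            }
        }
        return expanded;
    }

    /** Number of rows an inner join on the key would produce. */
    static long joinedRowCount(List<String[]> left, List<String[]> right) {
        Map<String, Long> rightCounts = new HashMap<>();
        for (String[] row : right) rightCounts.merge(row[0], 1L, Long::sum);
        long total = 0;
        for (String[] row : left) total += rightCounts.getOrDefault(row[0], 0L);
        return total;
    }

    /** Largest number of rows sharing one key; all of them end up in the same task. */
    static long maxRowsPerKey(List<String[]> rows) {
        Map<String, Long> counts = new HashMap<>();
        for (String[] row : rows) counts.merge(row[0], 1L, Long::sum);
        long max = 0;
        for (long count : counts.values()) max = Math.max(max, count);
        return max;
    }

    public static void main(String[] args) {
        List<String[]> left = new ArrayList<>();
        for (int i = 0; i < 1000; i++) left.add(new String[] {"hot", "v" + i});
        List<String[]> right = new ArrayList<>();
        right.add(new String[] {"hot", "r"});

        List<String[]> salted = saltLeft(left, 10, new Random(1L));
        List<String[]> expanded = expandRight(right, 10);
        System.out.println(joinedRowCount(left, right));      // 1000
        System.out.println(joinedRowCount(salted, expanded)); // 1000
        System.out.println(maxRowsPerKey(left) + " rows on one key before salting");
        System.out.println(maxRowsPerKey(salted) + " rows on the largest key after salting");
    }
}
```

The expanded side grows n times, which is the memory cost this solution pays for spreading the keys.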

Implementation Principle:
By adding random prefixes to originally identical keys, they become different keys, allowing these processed "different keys" to be distributed across multiple tasks for processing, rather than having one task handle a large amount of the same key. The difference from "Solution 5" is that Solution 5 handles only the data of a few skewed keys, so only a small part of the data is expanded n times and the extra memory use is small; this solution targets a large number of skewed keys, cannot split them out for separate processing, and therefore must expand the entire RDD, which demands a lot of memory.

Advantages of the Solution: It can generally handle data skew caused by join-type operations, and the effect is relatively significant, with performance improvement being quite good.
Disadvantages of the Solution: This solution is more about alleviating data skew rather than completely avoiding it. Additionally, it requires expanding the entire RDD, which demands high memory resources.

Shuffle Tuning#

Reference: Spark Shuffle Principles and Related Tuning

Since Spark 1.2, the default shuffle manager has been SortShuffleManager instead of HashShuffleManager. Its main improvement over HashShuffleManager is that, although each task still produces many temporary disk files during the shuffle, it finally merges all of them into a single disk file, so each task leaves only one data file together with an index. When a shuffle read task of the next stage pulls its data, it only needs to read its own part of each disk file based on the index.

Optimization of HashShuffleManager#

Set spark.shuffle.consolidateFiles, default is false.
After enabling the consolidate mechanism, the concept of shuffleFileGroup will appear. The consolidate mechanism allows different tasks to reuse the same batch of disk files, which can effectively merge the disk files of multiple tasks to a certain extent, thereby significantly reducing the number of disk files and improving the performance of shuffle writes.
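
To get a feel for the difference in file counts, here is a rough plain-Java calculation. The formulas are the commonly cited simplifications and are an assumption of this sketch, not something stated in the text above: map tasks x reduce tasks for plain hash shuffle, total CPU cores x reduce tasks with consolidation, and one data file per map task for sort shuffle. The class and method names are hypothetical.

```java
/** Back-of-the-envelope shuffle file counts under simplified, assumed formulas. */
final class ShuffleFileCountSketch {

    /** Plain hash shuffle: every map task writes one file per reduce task. */
    static long hashShuffleFiles(long mapTasks, long reduceTasks) {
        return mapTasks * reduceTasks;
    }

    /** Consolidated hash shuffle: tasks running on the same core reuse one group of files. */
    static long consolidatedHashShuffleFiles(long totalCores, long reduceTasks) {
        return totalCores * reduceTasks;
    }

    /** Sort shuffle: each map task ends with one merged data file. */
    static long sortShuffleDataFiles(long mapTasks) {
        return mapTasks;
    }

    public static void main(String[] args) {
        long mapTasks = 1000, reduceTasks = 1000, totalCores = 100 * 4;
        System.out.println(hashShuffleFiles(mapTasks, reduceTasks));               // 1000000
        System.out.println(consolidatedHashShuffleFiles(totalCores, reduceTasks)); // 400000
        System.out.println(sortShuffleDataFiles(mapTasks));                        // 1000
    }
}
```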

SortShuffleManager#

The operating mechanism of SortShuffleManager is mainly divided into two types:

  • One is the ordinary operating mechanism
  • The other is the bypass operating mechanism. It is enabled when the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default is 200) and the shuffle operator does not aggregate on the map side (for example, it is not reduceByKey).

Related shuffle parameters:

  • spark.shuffle.file.buffer
    Default value: 32k
    Parameter Description: This parameter is used to set the buffer size of BufferedOutputStream for shuffle write tasks. Data will first be written to the buffer before being written to disk files, and will only overflow to disk when the buffer is full.

  • spark.reducer.maxSizeInFlight
    Default value: 48m
    Parameter Description: This parameter is used to set the buffer size for shuffle read tasks, and this buffer determines how much data can be pulled each time. If the available memory resources for the job are relatively sufficient, this parameter can be increased appropriately.

  • spark.shuffle.io.maxRetries
    Default value: 3
    Parameter Description: When shuffle read tasks pull their data from the nodes where shuffle write tasks are located, if the pull fails due to network anomalies, it will automatically retry. This parameter represents the maximum number of retries allowed. If the pull is still unsuccessful within the specified number of times, it may lead to job execution failure.

  • spark.shuffle.io.retryWait
    Default value: 5s
    Parameter Description: Represents the waiting interval for each retry to pull data. It is recommended to increase the interval duration (e.g., 60s) to enhance the stability of shuffle operations.

  • spark.shuffle.memoryFraction
    Default value: 0.2
    Parameter Description: This parameter represents the proportion of Executor memory allocated for aggregation operations by shuffle read tasks, with a default of 20%.

  • spark.shuffle.manager
    Default value: sort
    Parameter Description: This parameter is used to set the type of ShuffleManager. From Spark 1.5 there are three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2; Spark 1.2 and later versions default to SortShuffleManager. tungsten-sort is similar to sort but uses the off-heap memory management mechanism of the Tungsten project, which uses memory more efficiently.
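
Two of the settings above reduce to simple rules that can be written down in plain Java. The sketch assumes the bypass decision also requires that there is no map-side aggregation, which reflects how Spark's sort shuffle makes this choice, and it treats the retry budget as maxRetries x retryWait. The class and method names are hypothetical.

```java
/** Plain-Java sketch of the bypass condition and the total retry wait for shuffle fetches. */
final class ShuffleParamSketch {

    /** Bypass applies only without map-side aggregation and at or below the threshold. */
    static boolean usesBypass(int shuffleReadTasks, boolean mapSideAggregation, int bypassMergeThreshold) {
        return !mapSideAggregation && shuffleReadTasks <= bypassMergeThreshold;
    }

    /** Upper bound on the time spent waiting between retries before a fetch is given up. */
    static long maxTotalRetryWaitSeconds(int maxRetries, long retryWaitSeconds) {
        return maxRetries * retryWaitSeconds;
    }

    public static void main(String[] args) {
        System.out.println(usesBypass(200, false, 200)); // true
        System.out.println(usesBypass(201, false, 200)); // false
        System.out.println(usesBypass(100, true, 200));  // false
        System.out.println(maxTotalRetryWaitSeconds(3, 5));  // 15
        System.out.println(maxTotalRetryWaitSeconds(3, 60)); // 180
    }
}
```

With the defaults, a fetch is abandoned after roughly 15 seconds of waiting; raising spark.shuffle.io.retryWait to 60s stretches that to about 3 minutes, which gives a busy or garbage-collecting node more time to recover.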
