Hadoop Series - MapReduce#
The core idea of MapReduce is divide and conquer: a large computation is automatically split into a Map phase and a Reduce phase.
Process Overview#
input --> map --> shuffle --> reduce --> output
Map Side Process#
- Map: Custom Map function to process data
- Partition: Decides which reduce task receives each map output record. The total number of partitions equals the number of reducers. By default the key is hashed and the result is taken modulo the number of reduce tasks, so every record with the same key goes to the same reducer (the default is HashPartitioner; a custom one can be set through job.setPartitionerClass(MyPartition.class)).
- Sort: Records are sorted first by the partition number in <key,value,partition> and then by key. Each small spill file is therefore laid out partition by partition, with keys ordered within each partition.
- Combine: Performs local aggregation ahead of time to reduce the amount of data transferred from the Map side to the Reduce side. It only runs if the developer sets a combiner in the program through job.setCombinerClass(myCombine.class). It may run at two points: (1) after the map output has been sorted by partition and before it is written to a spill file; (2) during the merge process, if the map output is large and the number of spill files reaches the configured minimum (default 3, configurable through the property min.num.spills.for.combine).
- Merge: When the map output is large, each spill produces a spill file, so a MapTask can end up with several of them. After the MapTask has processed all of its data, these intermediate files are merged so that the MapTask ultimately outputs a single intermediate data file.
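The Partition and Combine steps above can be sketched in plain Java without any Hadoop dependency. The formula in partitionFor follows the same idea as the default HashPartitioner (clear the sign bit of the key's hash, then take the modulus with the number of reduce tasks). The class name MapSideSketch, the partitionAndCombine helper, and the word-count data are hypothetical and only illustrate the idea; a real job would use Hadoop's Partitioner and Reducer classes instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free sketch of the map-side Partition and Combine steps.
class MapSideSketch {

    // Clear the sign bit of the key's hash so the result is never negative,
    // then take the modulus with the number of reduce tasks.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Word-count style combine: sum the counts of equal keys inside each
    // partition before anything is sent to the reduce side.
    // The outer list is indexed by partition; TreeMap keeps keys sorted.
    static List<Map<String, Integer>> partitionAndCombine(List<String> words, int numReduceTasks) {
        List<Map<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new TreeMap<>());
        }
        for (String word : words) {
            int p = partitionFor(word, numReduceTasks);
            partitions.get(p).merge(word, 1, Integer::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        // Six input records shrink to three combined records across two partitions.
        System.out.println(partitionAndCombine(words, 2));
    }
}
```

Because the partition depends only on the key, every occurrence of a key lands in the same partition, which is what lets the combiner (and later the reducer) see all values for that key together.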
Reduce Side Process#
- Copy: The Reducer node downloads the data belonging to its partition from each mapper node, and data from the same partition will land on the same node.
- Merge: The Reducer merges the data pulled from different Mapper nodes into one file.
- Reduce: Apply the user-defined reduce function to the merged data, for example to compute statistics per key.
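The reduce-side steps above can be illustrated with a small Hadoop-free sketch. It assumes a word-count job: each map in segments stands for the data one reducer copied from one mapper, merge groups the values by key, and reduce sums them. The class and method names are hypothetical, and grouping through a TreeMap is a simplification of what Hadoop actually does when it merges sorted segments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the reduce side: merge copied segments, then reduce.
class ReduceSideSketch {

    // Merge: collect the values of each key across all copied segments.
    // A TreeMap keeps the merged keys in sorted order.
    static TreeMap<String, List<Integer>> merge(List<Map<String, Integer>> segments) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> segment : segments) {
            for (Map.Entry<String, Integer> e : segment.entrySet()) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        return grouped;
    }

    // Reduce: here the "statistic" is simply the sum of the values per key.
    static TreeMap<String, Integer> reduce(TreeMap<String, List<Integer>> grouped) {
        TreeMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            result.put(e.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> fromMapper1 = new TreeMap<>();
        fromMapper1.put("a", 3);
        fromMapper1.put("c", 1);
        Map<String, Integer> fromMapper2 = new TreeMap<>();
        fromMapper2.put("a", 2);
        fromMapper2.put("c", 4);
        System.out.println(reduce(merge(Arrays.asList(fromMapper1, fromMapper2))));
    }
}
```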
Submitting MR Job to Yarn Process#
1. The client starts a RunJar process and asks the ResourceManager (RM) to execute a job.
2. RM returns the path for submitting job resources (the job staging-dir) and the job ID it generated for this job.
3. The client submits the job-related resources to the corresponding shared file system path (/yarn-staging/jobID).
4. The client reports the submission results to RM.
5. RM adds the job to the task queue.
6. NM obtains new tasks from RM's task queue through heartbeat connection with RM.
7. RM allocates running resource containers to NM.
8. RM starts the MRAppMaster process in the container.
9. The newly created MRAppMaster decides which NodeManagers (NM) run the map tasks (i.e., YarnChild processes) and the reduce tasks.
10. The NM running the map and reduce tasks retrieves job-related resources from the shared file system, including jar files, configuration files, etc., and then runs the map and reduce tasks.
11. After the job execution is complete, MRAppMaster unregisters itself from RM, releasing resources.
Shuffle Mechanism#
The entire process from Map output to Reduce input can be broadly referred to as Shuffle. Shuffle spans both the Map and Reduce sides, including the Partitioning and Spill processes on the Map side, and the copy and Merge processes on the Reduce side.
Circular Memory Buffer#
The last step of the map() method outputs the intermediate results through OutputCollector.collect(key,value) or context.write(key,value). Inside the collect(key,value) method, Partitioner.getPartition(K2 key, V2 value, int numPartitions) is called to obtain the partition number for the output key/value pair (the partition number can be thought of as identifying the node where a Reduce Task will run). The <key,value,partition> triple is then stored temporarily in the MapOutputBuffer's internal circular data buffer in memory. This buffer has a default size of 100MB, adjustable via the parameter io.sort.mb.
The data stored in the MapOutputBuffer uses two index structures, involving three circular memory buffers:
- kvoffsets buffer: Also known as the offset index array. It stores the offset of each key/value record's entry in the position index kvindices. When the usage of kvoffsets exceeds mapreduce.map.sort.spill.percent (default 80%), a SpillThread is triggered and a Spill phase begins.
- kvindices buffer: Also known as the position index array. It stores the starting position of each key and value in the data buffer kvbuffer.
- kvbuffer, the data buffer: Stores the actual key/value data. By default, this buffer can use up to 95% of io.sort.mb. When the usage of kvbuffer exceeds mapreduce.map.sort.spill.percent (default 80%), a SpillThread is likewise triggered and a Spill phase begins.
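To make the numbers above concrete, the following sketch works out the spill threshold for kvbuffer from the defaults quoted in the text (100MB for io.sort.mb, up to 95% of it for kvbuffer, spill at 80% usage). It assumes the 80% applies to kvbuffer's own capacity; the class and method names are hypothetical, and integer percentages are used to avoid floating-point rounding.

```java
// Hypothetical arithmetic sketch; not Hadoop's actual MapOutputBuffer code.
class SpillThresholdSketch {

    // Capacity of kvbuffer in bytes: a percentage of io.sort.mb.
    static long kvbufferCapacity(long sortMb, int dataPercent) {
        return sortMb * 1024 * 1024 * dataPercent / 100;
    }

    // Usage level (in bytes) above which a spill is triggered.
    static long spillThreshold(long capacityBytes, int spillPercent) {
        return capacityBytes * spillPercent / 100;
    }

    // True once the buffer usage exceeds the spill threshold.
    static boolean shouldSpill(long usedBytes, long capacityBytes, int spillPercent) {
        return usedBytes > spillThreshold(capacityBytes, spillPercent);
    }

    public static void main(String[] args) {
        long capacity = kvbufferCapacity(100, 95);   // 95% of 100MB
        long threshold = spillThreshold(capacity, 80);
        System.out.println("kvbuffer capacity (bytes): " + capacity);
        System.out.println("spill threshold (bytes):   " + threshold);
    }
}
```

Under these assumptions a 100MB sort buffer starts spilling when kvbuffer holds about 76MB of key/value data, leaving the remaining space for the map thread to keep writing while the spill runs.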
Spill#
When either the kvoffsets buffer or kvbuffer, the data buffer, in the MapOutputBuffer reaches its threshold, a spill operation is triggered. This writes the data from the memory buffer to the local disk, sorting it first by partition and then by key during the write process.
- This spill operation is performed by another separate thread, which does not affect the thread writing map results to the buffer.
- Before writing data to disk, a sorting operation must be performed on the data to be written to disk, first sorting by the partition number in <key,value,partition>, and then sorting by key. This is the sort operation, and the final small overflow files are partitioned, ensuring that keys are ordered within the same partition.
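The sort order used before a spill (partition number first, then key) can be expressed as a two-level comparator. The sketch below is Hadoop-free; the Record class is a hypothetical stand-in for the <key,value,partition> triple, and the real implementation sorts index entries rather than objects like these.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the ordering applied to buffered records before a spill.
class SpillSortSketch {

    // Stand-in for one <key,value,partition> triple in the buffer.
    static final class Record {
        final String key;
        final int value;
        final int partition;

        Record(String key, int value, int partition) {
            this.key = key;
            this.value = value;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return partition + ":" + key + "=" + value;
        }
    }

    // Compare by partition number first, then by key within a partition.
    static final Comparator<Record> SPILL_ORDER =
            Comparator.comparingInt((Record r) -> r.partition).thenComparing(r -> r.key);

    // Return a sorted copy, leaving the input list untouched.
    static List<Record> sortForSpill(List<Record> buffered) {
        List<Record> sorted = new ArrayList<>(buffered);
        sorted.sort(SPILL_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        List<Record> buffered = Arrays.asList(
                new Record("b", 1, 1), new Record("a", 1, 1),
                new Record("c", 1, 0), new Record("a", 1, 0));
        System.out.println(sortForSpill(buffered));
    }
}
```

After sorting, all records of partition 0 come before those of partition 1, and keys are ordered inside each partition, which is the layout each spill file needs.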
Copy Phase#
By default, once the number of completed Map Tasks in a MapReduce job exceeds 5% of the total number of Map Tasks, the JobTracker begins scheduling Reduce Tasks. Each Reduce Task then starts mapred.reduce.parallel.copies (default 5) MapOutputCopier threads to copy its share of the data from the nodes of the completed Map Tasks. The copied data is first held in a memory buffer, and once the usage of that buffer reaches a certain threshold, it is written to disk.
Supplement
The size of this memory buffer is not set through io.sort.mb, as the map-side buffer is, but through another parameter: mapred.job.shuffle.input.buffer.percent (default 0.7). It means that shuffle data in the reduce task's memory may use at most 0.7 × maxHeap of the reduce task. The maximum heap of a reduce task is typically set through mapred.child.java.opts; for example, with -Xmx1024m the reduce task uses 70% of its heap to cache data in memory. If the reduce task's heap is enlarged for business reasons, the cache grows with it, which is why this parameter is a percentage rather than a fixed value.
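The 0.7 × maxHeap rule above is easy to check with a few lines of arithmetic. The sketch assumes the usable heap equals the -Xmx value exactly (a running JVM typically reports slightly less), and the class and method names are hypothetical.

```java
// Hypothetical arithmetic sketch of the reduce-side shuffle buffer size.
class ShuffleBufferSketch {

    // Bytes of reduce-task heap that shuffle data may occupy.
    static long shuffleBufferBytes(long maxHeapBytes, double inputBufferPercent) {
        return (long) (maxHeapBytes * inputBufferPercent);
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024 * 1024;           // -Xmx1024m
        long twoGiB = 2 * oneGiB;                    // -Xmx2048m
        // The buffer scales with the heap because the setting is a fraction.
        System.out.println(shuffleBufferBytes(oneGiB, 0.7) / (1024 * 1024) + " MB");
        System.out.println(shuffleBufferBytes(twoGiB, 0.7) / (1024 * 1024) + " MB");
    }
}
```

With -Xmx1024m this gives roughly 716 MB of shuffle cache, and doubling the heap doubles the cache without touching the parameter.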
Merge Process#
Copied data is first placed in the memory buffer, whose size is more flexible than the map-side buffer because it is derived from the JVM heap size setting. Since the reduce function does not run during the Shuffle phase, most of the memory can be allocated to Shuffle.
Merging takes three forms:
- Memory to memory
- Memory to disk
- Disk to disk
By default, the first form is not enabled. When the amount of data in memory reaches a certain threshold, memory-to-disk merging starts. (Merging is needed because the reduce side does not sort the data as it copies from multiple map sides; it simply loads the data into memory, so a merge is required when the threshold is reached and the data is written to disk.) Memory-to-disk merging keeps running until there is no more data from the map side, after which the third form, disk-to-disk merging, produces the final file.
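Whatever the form, the merge itself combines several sorted runs into one. A minimal k-way merge sketch with a min-heap is shown below, assuming each input run is already sorted by key (as each map-side spill is within a partition). The lists stand in for in-memory segments or on-disk files; the class name is hypothetical and this is not Hadoop's own merger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge of sorted runs using a min-heap.
class KWayMergeSketch {

    // Each heap entry is {runIndex, positionInRun}; entries are ordered by
    // the key they currently point at.
    static List<String> merge(List<List<String>> runs) {
        PriorityQueue<int[]> heap = new PriorityQueue<int[]>(
                (a, b) -> runs.get(a[0]).get(a[1]).compareTo(runs.get(b[0]).get(b[1])));
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            List<String> run = runs.get(head[0]);
            merged.add(run.get(head[1]));
            // Advance within the same run if it still has elements.
            if (head[1] + 1 < run.size()) {
                heap.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
                Arrays.asList("a", "d"), Arrays.asList("b", "c"));
        System.out.println(merge(runs));
    }
}
```

Only one element per run sits in the heap at a time, which is why this style of merge suits data that is too large to hold in memory all at once.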
MR Tuning Thoughts#
- Data skew: Data is aggregated at the Reduce side, and if one ReduceTask receives too much data, the efficiency of the entire program becomes very low.
- Improper settings for the number of Map and Reduce tasks.
- Long Map run time, leading to prolonged Reduce waiting.
- Too many small files: Metadata is allocated for every file regardless of its size, so too many small files waste resources and storage space.
- Many large files in MR that cannot be split, causing continuous spills during the shuffle phase.
- Multiple small spill files that require multi-level merging.