Symptom:
After a MapReduce job with a customized FileOutputCommitter is migrated from MRv1 to MRv2, some reducer output files are not moved into the final output directory.
Take a sample WordCount job with a customized FileOutputCommitter as an example.
In this specific case, only the 1st reducer's output file is moved into the final output directory.
All other reducers' output files are still sitting in the temporary location:
/hao/wordfinal/part_00000/part-r-00000
/hao/wordfinal/part_00001/_temporary/1/task_1554837836642_0091_r_000001/part-r-00001
/hao/wordfinal/part_00002/_temporary/1/task_1554837836642_0091_r_000001/part-r-00002
The expected final output files are:
/hao/wordfinal/part_00000/part-r-00000
/hao/wordfinal/part_00001/part-r-00001
/hao/wordfinal/part_00002/part-r-00002
Note: The same job worked fine in MRv1 before the migration to MRv2.
Env:
The MapReduce job is migrated from:
MapR 5.x with Hadoop 0.20.2 (MRv1)
to:
MapR 6.1 with Hadoop 2.7.0 (MRv2)
Root Cause:
This WordCount job uses a customized FileOutputCommitter by overriding the output format class:
job.setOutputFormatClass(myOutputFormat.class);
Inside "myOutputFormat.java", the customized logic sets a different output directory for each reducer:
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    // Create the committer once, rooted at the per-task output directory.
    if (this.myCommitter == null) {
        Path output = new Path(getOutputDir(context));
        this.myCommitter = new FileOutputCommitter(output, context);
    }
    return this.myCommitter;
}

protected static String getOutputDir(TaskAttemptContext context) {
    // The sub-directory name is derived from the numeric task ID of the caller.
    int taskID = context.getTaskAttemptID().getTaskID().getId();
    String outputBaseDir = getOutputPath(context) + "/part_" + NUMBER_FORMAT.format(taskID);
    // String outputBaseDir = getOutputPath(context) + "/part_static" ;
    return outputBaseDir;
}
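To make the directory naming concrete, here is a minimal standalone sketch of the same string concatenation, without any Hadoop classes. The class name PartDirNaming and the method outputDir are hypothetical, and the definition of NUMBER_FORMAT is an assumption (five zero-padded digits, no grouping), inferred only from directory names such as part_00001 in this article.

```java
import java.text.NumberFormat;
import java.util.Locale;

// Hypothetical helper for illustration; not part of Hadoop or of the original job.
class PartDirNaming {
    // Assumption: five zero-padded digits without grouping separators,
    // which matches directory names such as part_00001.
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(Locale.ROOT);
    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    // Mirrors the concatenation done in getOutputDir().
    // Synchronized because NumberFormat instances are not thread-safe.
    static synchronized String outputDir(String basePath, int taskId) {
        return basePath + "/part_" + NUMBER_FORMAT.format(taskId);
    }

    public static void main(String[] args) {
        // Print the directory each of three reducers would resolve.
        for (int taskId = 0; taskId < 3; taskId++) {
            System.out.println(outputDir("/hao/wordfinal", taskId));
        }
    }
}
```

The point to notice is that the result depends only on the numeric task ID of whoever calls it, so any caller whose task ID is 0 resolves part_00000.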
In MRv1's world:
This job works as expected.
$ hadoop fs -ls -R /hao/wordfinal
drwxr-xr-x - mapr mapr 1 2019-04-24 14:15 /hao/wordfinal/_logs
drwxr-xr-x - mapr mapr 1 2019-04-24 14:15 /hao/wordfinal/_logs/history
-rwxr-xr-x 3 mapr mapr 18461 2019-04-24 14:15 /hao/wordfinal/_logs/history/s1.poc.com_1556138521585_job_201904241341_0004_mapr_wordcount.jar
drwxr-xr-x - mapr mapr 2 2019-04-24 14:15 /hao/wordfinal/part_00000
drwxr-xr-x - mapr mapr 0 2019-04-24 14:15 /hao/wordfinal/part_00000/_temporary
-rwxr-xr-x 3 mapr mapr 26 2019-04-24 14:15 /hao/wordfinal/part_00000/part-r-00000
drwxr-xr-x - mapr mapr 2 2019-04-24 14:15 /hao/wordfinal/part_00001
-rwxr-xr-x 3 mapr mapr 0 2019-04-24 14:15 /hao/wordfinal/part_00001/_SUCCESS
-rwxr-xr-x 3 mapr mapr 18 2019-04-24 14:15 /hao/wordfinal/part_00001/part-r-00001
drwxr-xr-x - mapr mapr 2 2019-04-24 14:15 /hao/wordfinal/part_00002
drwxr-xr-x - mapr mapr 0 2019-04-24 14:15 /hao/wordfinal/part_00002/_temporary
-rwxr-xr-x 3 mapr mapr 8 2019-04-24 14:15 /hao/wordfinal/part_00002/part-r-00002
This is because, according to the reducer logs, all 3 reducer outputs are moved by the 3 reducers themselves:
# grep -R hao *
attempt_201904241341_0004_r_000000_0/syslog:2019-04-24 14:15:13,124 INFO output.FileOutputCommitter [main]: Saved output of task 'attempt_201904241341_0004_r_000000_0' to /hao/wordfinal/part_00000
attempt_201904241341_0004_r_000000_0/syslog:2019-04-24 14:15:18,257 INFO output.FileOutputCommitter [main]: Saved output of task 'attempt_201904241341_0004_r_000002_0' to /hao/wordfinal/part_00002
attempt_201904241341_0004_r_000001_0/syslog:2019-04-24 14:15:16,376 INFO output.FileOutputCommitter [main]: Saved output of task 'attempt_201904241341_0004_r_000001_0' to /hao/wordfinal/part_00001
In MRv2's world:
"The functionality of JobTracker in 1.x i.e resource management and job scheduling/monitoring are divided into separate daemons. - global ResourceManager (RM) and per-application ApplicationMaster (AM)."
So in MRv2, not only the reducers but also the AM calls some methods inside FileOutputCommitter (such as "mergePaths()" in this case).
As described in my previous article <What is the difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2>:
When mapreduce.fileoutputcommitter.algorithm.version=1 (the default value):
The AM does mergePaths() at the end, after all reducers complete.
If the MR job has many reducers, the AM first waits for all reducers to finish and then uses a single thread to merge the output files.
This is where the issue lies.
In the customized code above, the output directory for each reducer is defined as "part_$taskID".
So the developer expects the 3 reducers to write to 3 different sub-directories:
part_00000
part_00001
part_00002
Each of the 3 reducers first writes its output into a temporary location as below:
part_00000/_temporary/1/task_1554837836642_0091_r_000001/part-r-00000
part_00001/_temporary/1/task_1554837836642_0091_r_000001/part-r-00001
part_00002/_temporary/1/task_1554837836642_0091_r_000001/part-r-00002
After all reducers complete, the AM calls "mergePaths()" inside FileOutputCommitter to move the above files into the final output directory.
However, the task ID derived from the AM's task attempt ID is also 0, so the AM thinks the output directory should be part_00000.
That is why only the output file under part_00000 is moved by the AM into the final output directory.
After this job completes, the result is the problematic situation below:
part_00000/part-r-00000
part_00001/_temporary/1/task_1554837836642_0091_r_000001/part-r-00001
part_00002/_temporary/1/task_1554837836642_0091_r_000001/part-r-00002
Troubleshooting:
When this kind of issue happens in MRv2, first reduce the original job to a minimal reproduction, such as the WordCount job in this case.
To find out "who is doing what", enable DEBUG logging for the Mapper, Reducer and AM:
-Dyarn.app.mapreduce.am.log.level=DEBUG -Dmapreduce.map.log.level=DEBUG -Dmapreduce.reduce.log.level=DEBUG
From the AM DEBUG log, we can tell which reducers' temporary output is moved by the AM and which is not.
E.g.:
2019-04-18 11:36:28,234 DEBUG [CommitterEvent Processor #1] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Merging data from MapRFileStatus{path=null; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to maprfs:/hao/wordfinal/part-r-00000
From the above DEBUG log, we know that only the 1st reducer's output is merged by the AM.
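For a longer AM log, scanning by eye gets tedious. The following is a small sketch that pulls the merge destinations out of log lines shaped like the sample above. The class name MergeLogScan and the method mergeTargets are hypothetical, and the regular expression assumes the "Merging data from ... to ..." wording seen in this one sample line; other Hadoop versions or distributions may log a different message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; it only parses text and uses no Hadoop classes.
class MergeLogScan {
    // Assumption: the message format matches the sample DEBUG line in this article.
    // The greedy ".*" makes the capture start after the last " to " on the line.
    private static final Pattern MERGE =
        Pattern.compile("FileOutputCommitter: Merging data from .* to (\\S+)\\s*$");

    // Returns the destination path of every merge message found in the given lines.
    static List<String> mergeTargets(List<String> logLines) {
        List<String> targets = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = MERGE.matcher(line);
            if (m.find()) {
                targets.add(m.group(1));
            }
        }
        return targets;
    }
}
```

Feeding it the lines of the AM log gives one entry per merge; reducers whose paths never appear in the result were not merged by the AM.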
Solution:
In MRv2's world, the parameter "mapreduce.fileoutputcommitter.algorithm.version" is introduced per MAPREDUCE-4815.
Setting "mapreduce.fileoutputcommitter.algorithm.version=2" in MRv2 gives this specific job the same behavior it had in MRv1, which is:
The reducers, instead of the AM, call "mergePaths()" inside FileOutputCommitter.
So for this specific customized job in MRv2, if we set "-Dmapreduce.fileoutputcommitter.algorithm.version=2", the job output is moved by the reducers.
As a result, below is the expected output file location:
# hadoop fs -ls -R /hao/wordfinal
-rwxr-xr-x 3 mapr mapr 0 2019-04-23 11:55 /hao/wordfinal/_SUCCESS
drwxr-xr-x - mapr mapr 2 2019-04-23 11:55 /hao/wordfinal/part_00000
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary/1
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary/1/_temporary
drwxr-xr-x - mapr mapr 0 2019-04-23 11:55 /hao/wordfinal/part_00000/_temporary/1/_temporary/attempt_1554837836642_0127_r_000000_0
-rwxr-xr-x 3 mapr mapr 0 2019-04-23 11:55 /hao/wordfinal/part_00000/part-r-00000
drwxr-xr-x - mapr mapr 2 2019-04-23 11:55 /hao/wordfinal/part_00001
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary/1
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary/1/_temporary
drwxr-xr-x - mapr mapr 0 2019-04-23 11:55 /hao/wordfinal/part_00001/_temporary/1/_temporary/attempt_1554837836642_0127_r_000001_0
-rwxr-xr-x 3 mapr mapr 0 2019-04-23 11:55 /hao/wordfinal/part_00001/part-r-00001
drwxr-xr-x - mapr mapr 2 2019-04-23 11:55 /hao/wordfinal/part_00002
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary/1
drwxr-xr-x - mapr mapr 1 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary/1/_temporary
drwxr-xr-x - mapr mapr 0 2019-04-23 11:55 /hao/wordfinal/part_00002/_temporary/1/_temporary/attempt_1554837836642_0127_r_000002_0
-rwxr-xr-x 3 mapr mapr 51 2019-04-23 11:55 /hao/wordfinal/part_00002/part-r-00002
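The difference between the two algorithm versions for this particular job can be pictured with a toy model. This is only a simplified sketch of the behavior described in this article, not Hadoop's actual commit code: the class CommitModel and its methods are hypothetical, and the model assumes, as explained under Root Cause, that the AM resolves its output directory with task ID 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Toy model only: no Hadoop classes are used and all names are hypothetical.
class CommitModel {
    // Same naming idea as the customized getOutputDir(): part_ plus a 5-digit task ID.
    static String partDir(int taskId) {
        return String.format(Locale.ROOT, "part_%05d", taskId);
    }

    // Returns the part directories whose output reaches its final location.
    static List<String> committedDirs(int reducers, int algorithmVersion) {
        List<String> committed = new ArrayList<>();
        if (algorithmVersion == 1) {
            // Version 1 in this model: a single merge by the AM, which
            // resolves the directory with task ID 0 (assumption from the article).
            int amTaskId = 0;
            if (reducers > 0) {
                committed.add(partDir(amTaskId));
            }
        } else {
            // Version 2 in this model: every reducer moves its own output,
            // using its own task ID to resolve the directory.
            for (int taskId = 0; taskId < reducers; taskId++) {
                committed.add(partDir(taskId));
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println("version 1: " + committedDirs(3, 1));
        System.out.println("version 2: " + committedDirs(3, 2));
    }
}
```

With 3 reducers, the model yields only part_00000 for version 1 and all three directories for version 2, which matches the two listings in this article.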
Reference:
How to customize FileOutputCommitter for MapReduce job by overwriting Output Format Class
What is the difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2