Wednesday, May 24, 2017

Hive on Tez : How to control the number of Mappers and Reducers

Goal:

How to control the number of Mappers and Reducers in Hive on Tez.

Env:

Hive 2.1
Tez 0.8

Solution:

1. # of Mappers

Which Tez parameters control this?
  • tez.grouping.max-size (default 1073741824, which is 1GB)
  • tez.grouping.min-size (default 52428800, which is 50MB)
  • tez.grouping.split-count (not set by default)
Which log helps debug the # of Mappers?
The DAG syslog in the DAG Application Master container directory.
Search for "grouper.TezSplitGrouper", for example:
# grep grouper.TezSplitGrouper syslog_dag_1475192050844_0026_1
2017-05-23 15:00:50,285 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Grouping splits in Tez
2017-05-23 15:00:50,288 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired numSplits: 59 lengthPerGroup: 97890789 numLocations: 4 numSplitsPerLocation: 40 numSplitsInGroup: 2 totalLength: 5775556608 numOriginalSplits: 161 . Grouping by length: true count: false nodeLocalOnly: false
2017-05-23 15:00:50,291 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Doing rack local after iteration: 18 splitsProcessed: 139 numFullGroupsInRound: 0 totalGroups: 68 lengthPerGroup: 73418096 numSplitsInGroup: 1
2017-05-23 15:00:50,291 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Allowing small groups after iteration: 19 splitsProcessed: 139 numFullGroupsInRound: 0 totalGroups: 68
2017-05-23 15:00:50,291 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Number of splits desired: 59 created: 69 splitsProcessed: 161
This means that this Hive on Tez query ends up spawning 69 Mappers ("created: 69"), slightly more than the 59 splits that were desired.

If we set both tez.grouping.max-size and tez.grouping.min-size to 1073741824 (1GB), here is the result:
# grep grouper.TezSplitGrouper syslog_dag_1475192050844_0030_1
2017-05-23 17:16:11,851 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Grouping splits in Tez
2017-05-23 17:16:11,852 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired splits: 6 too large.  Desired splitLength: 97890789 Min splitLength: 1073741824 New desired splits: 6 Final desired splits: 6 All splits have localhost: false Total length: 5775556608 Original splits: 161
2017-05-23 17:16:11,854 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired numSplits: 6 lengthPerGroup: 962592768 numLocations: 4 numSplitsPerLocation: 40 numSplitsInGroup: 26 totalLength: 5775556608 numOriginalSplits: 161 . Grouping by length: true count: false nodeLocalOnly: false
2017-05-23 17:16:11,856 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Doing rack local after iteration: 3 splitsProcessed: 135 numFullGroupsInRound: 0 totalGroups: 6 lengthPerGroup: 721944576 numSplitsInGroup: 19
2017-05-23 17:16:11,856 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Allowing small groups after iteration: 4 splitsProcessed: 135 numFullGroupsInRound: 0 totalGroups: 6
2017-05-23 17:16:11,856 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Number of splits desired: 6 created: 7 splitsProcessed: 161
This time only 7 Mappers are spawned: because of "Min splitLength: 1073741824", the desired number of splits drops to 6, and 7 groups are finally created.

If we set tez.grouping.split-count=13, here is the result:
# grep grouper.TezSplitGrouper syslog_dag_1475192050844_0039_1
2017-05-24 16:27:05,523 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Grouping splits in Tez
2017-05-24 16:27:05,523 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired numSplits overridden by config to: 13
2017-05-24 16:27:05,526 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired numSplits: 13 lengthPerGroup: 444273585 numLocations: 4 numSplitsPerLocation: 40 numSplitsInGroup: 12 totalLength: 5775556608 numOriginalSplits: 161 . Grouping by length: true count: false nodeLocalOnly: false
2017-05-24 16:27:05,528 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Doing rack local after iteration: 5 splitsProcessed: 156 numFullGroupsInRound: 0 totalGroups: 14 lengthPerGroup: 333205184 numSplitsInGroup: 9
2017-05-24 16:27:05,528 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Allowing small groups after iteration: 6 splitsProcessed: 156 numFullGroupsInRound: 0 totalGroups: 14
2017-05-24 16:27:05,528 [INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Number of splits desired: 13 created: 15 splitsProcessed: 161
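This time 15 Mappers are spawned, close to the requested 13 ("Desired numSplits overridden by config to: 13"). As in the earlier runs, the configured value acts as a target rather than an exact count.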

By the way, the test query is "select count(*) from passwords".
Here "Original splits: 161" means the table has 161 files in total:
# ls  passwords|wc -l
161
Here "Total length: 5775556608" means the table size is about 5.4GB:
# du -b passwords
5775556769 passwords

The detailed algorithm is in tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java.
This article also explains the logic: https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works
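
To make the three log excerpts above easier to compare, here is a minimal Python sketch of how the "Desired numSplits" value seems to be adjusted before the actual grouping starts. It is a simplification reconstructed from the log lines, not a copy of TezSplitGrouper.java: the function and argument names are made up for illustration, the initial value of 59 is simply taken from the first log, and the locality-aware grouping that turns 59 desired splits into 69 created groups is not modelled at all.

```python
def desired_splits(total_length, initial_desired,
                   min_size=52428800, max_size=1073741824, split_count=0):
    """Approximate the 'Desired numSplits' value seen in the grouper log.

    All names here are hypothetical; only the default values come from
    the tez.grouping.* parameters listed in this post.
    """
    if split_count > 0:
        # tez.grouping.split-count, when set, overrides the other two settings.
        return split_count
    length_per_group = total_length // initial_desired
    if length_per_group > max_size:
        # Groups would exceed tez.grouping.max-size: ask for more splits.
        return total_length // max_size + 1
    if length_per_group < min_size:
        # Groups would fall below tez.grouping.min-size: ask for fewer splits.
        return total_length // min_size + 1
    return initial_desired


total = 5775556608  # "totalLength" from the logs
print(desired_splits(total, 59))                          # default settings
print(desired_splits(total, 59, min_size=1073741824,
                     max_size=1073741824))                # min = max = 1GB
print(desired_splits(total, 59, split_count=13))          # split-count = 13
```

With the numbers from this post the three calls give 59, 6 and 13, which match the "Desired numSplits" values in the logs. The number of Mappers actually created (69, 7 and 15) is a little higher each time.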

2. # of Reducers

As with Hive on MR queries, the parameters below control the # of Reducers (the last one is specific to Tez):
  • hive.exec.reducers.bytes.per.reducer (default 256000000)
  • hive.exec.reducers.max (default 1009)
  • hive.tez.auto.reducer.parallelism (default false)
Take the query below as an example, and focus on "Reducer 2", which is the join:
hive> select count(*) from passwords a, passwords b where a.col0=b.col1;
Query ID = mapr_20170524140623_bc36636e-c295-4e75-b7ac-fe066320dce1
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1475192050844_0034)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     69         69        0        0       0       0
Map 4 .......... container     SUCCEEDED     69         69        0        0       0       0
Reducer 2 ...... container     SUCCEEDED     45         45        0        0       1       0
Reducer 3 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 04/04  [==========================>>] 100%  ELAPSED TIME: 164.32 s
----------------------------------------------------------------------------------------------
OK
0
"Reducer 2" spawns 45 Reducers.

If we double hive.exec.reducers.bytes.per.reducer to 512000000, "Reducer 2" spawns only about half as many Reducers: 23 this time.
hive> set hive.exec.reducers.bytes.per.reducer=512000000;
hive> select count(*) from passwords a, passwords b where a.col0=b.col1;
Query ID = mapr_20170524142206_d07caa6a-0061-43a6-b5e9-4f67880cf118
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.


Status: Running (Executing on YARN cluster with App id application_1475192050844_0035)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     69         69        0        0       0       0
Map 4 .......... container     SUCCEEDED     69         69        0        0       0       0
Reducer 2 ...... container     SUCCEEDED     23         23        0        0       0       0
Reducer 3 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 04/04  [==========================>>] 100%  ELAPSED TIME: 179.62 s
----------------------------------------------------------------------------------------------
OK
0

Of course, we can also put a hard upper limit on the # of Reducers by setting hive.exec.reducers.max=10:
hive> set hive.exec.reducers.max=10;
hive> select count(*) from passwords a, passwords b where a.col0=b.col1;
Query ID = mapr_20170524142736_4367dee2-b695-4162-ad47-99d7ff2311bc
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1475192050844_0035)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     69         69        0        0       0       0
Map 4 .......... container     SUCCEEDED     69         69        0        0       0       0
Reducer 2 ...... container     SUCCEEDED     10         10        0        0       1       0
Reducer 3 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 04/04  [==========================>>] 100%  ELAPSED TIME: 153.35 s
----------------------------------------------------------------------------------------------
OK
0
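
The three runs above are consistent with a simple estimate: divide the estimated input size of the reduce stage by hive.exec.reducers.bytes.per.reducer, round up, and cap the result at hive.exec.reducers.max. The Python sketch below only illustrates that arithmetic. The function name is made up, and the data size of 11500000000 bytes is a hypothetical value picked so that the results line up with the output above; the post does not show the size Hive actually estimated.

```python
import math


def estimate_reducers(data_size, bytes_per_reducer=256000000, max_reducers=1009):
    """Rough estimate of the # of Reducers (hypothetical helper, not a Hive API)."""
    # Never estimate on less than one reducer's worth of data.
    size = max(data_size, bytes_per_reducer)
    reducers = math.ceil(size / bytes_per_reducer)
    # hive.exec.reducers.max is a hard upper limit.
    return max(1, min(max_reducers, reducers))


data_size = 11_500_000_000  # hypothetical estimate, NOT taken from the post

print(estimate_reducers(data_size))                               # defaults
print(estimate_reducers(data_size, bytes_per_reducer=512000000))  # doubled
print(estimate_reducers(data_size, max_reducers=10))              # hard limit
```

Under that assumed data size the sketch prints 45, 23 and 10, the same counts "Reducer 2" shows in the three runs.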

Another feature is controlled by hive.tez.auto.reducer.parallelism. The Hive configuration documentation describes it as follows:
"Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as necessary."
hive> set hive.tez.auto.reducer.parallelism = true;
hive> select count(*) from passwords a, passwords b where a.col0=b.col1;
Query ID = mapr_20170524143541_18b3c2b6-75a8-4fbb-8a0d-2cf354fd7a72
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1475192050844_0036)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED     69         69        0        0       0       0
Map 4 .......... container     SUCCEEDED     69         69        0        0       0       0
Reducer 2 ...... container     SUCCEEDED     12         12        0        0       1       0
Reducer 3 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 04/04  [==========================>>] 100%  ELAPSED TIME: 143.69 s
----------------------------------------------------------------------------------------------
OK
0
From the DAG syslog file, we can see that "Reducer 2" initially planned for 90 Reducers, and that this was then reduced to 12 at runtime:
[root@s3 container_e02_1475192050844_0036_01_000001]# grep "Reducer 2" syslog_dag_1475192050844_0036_1 |grep parallelism
2017-05-24 14:36:36,831 [INFO] [App Shared Pool - #0] |vertexmanager.ShuffleVertexManager|: Reducing auto parallelism for vertex: Reducer 2 from 90 to 12
2017-05-24 14:36:36,837 [INFO] [App Shared Pool - #0] |impl.VertexImpl|: Resetting vertex location hints due to change in parallelism for vertex: vertex_1475192050844_0036_1_02 [Reducer 2]
2017-05-24 14:36:36,841 [INFO] [App Shared Pool - #0] |impl.VertexImpl|: Vertex vertex_1475192050844_0036_1_02 [Reducer 2] parallelism set to 12 from 90
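
One possible reading of the "from 90 to 12" line: with auto reducer parallelism on, Hive hands Tez a range around its own estimate instead of a single number. The sketch below assumes that the range is derived from hive.tez.min.partition.factor and hive.tez.max.partition.factor and that their defaults (0.25 and 2.0) were in effect. Neither setting is mentioned in this post, so treat both as assumptions; the function name is made up as well.

```python
def auto_parallelism_bounds(estimate, min_factor=0.25, max_factor=2.0,
                            max_reducers=1009):
    """Range of Reducers Tez may pick from (illustrative sketch only).

    min_factor / max_factor stand for hive.tez.min.partition.factor and
    hive.tez.max.partition.factor; assuming their defaults is a guess
    about this cluster, not something shown in the logs.
    """
    lower = min(max_reducers, max(1, int(estimate * min_factor)))
    upper = min(max_reducers, int(estimate * max_factor))
    return lower, upper


# 45 is the static estimate for "Reducer 2" from the first run above.
print(auto_parallelism_bounds(45))
```

For an estimate of 45 this gives a range of 11 to 90. That would explain why the vertex starts at 90 and why the runtime value of 12 is allowed, but it has not been checked against the Hive 2.1 source.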

