Goal
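This article explains the difference between the two Hive configurations below, which control whether a common join can be converted to a map join:
- hive.auto.convert.join.noconditionaltask.size
- hive.mapjoin.smalltable.filesize
Env
All tests are done in Hive 1.2.0. The 3 tables below are created in advance: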
| Table Name | Table Size |
| t30m | 30MB |
| t60m | 60MB |
| t90m | 90MB |
Concept
- hive.auto.convert.join
- hive.auto.convert.join.noconditionaltask
- hive.auto.convert.join.noconditionaltask.size
- hive.mapjoin.smalltable.filesize
Hands-on Experience
2-way join
Let's first explain this behavior using the simplest case, a 2-way join:
select count(*) from t30m a,t60m b where a.col0=b.col1 ;
a. Default configuration
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
Stage-6 is a root stage , consists of Stage-1
Stage-1
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-6
Conditional Operator
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col0 (type: string)
sort order: +
Map-reduce partition columns: col0 (type: string)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col1 is not null (type: boolean)
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col1 (type: string)
sort order: +
Map-reduce partition columns: col1 (type: string)
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11
Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col11) (type: boolean)
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Stage-6 (Conditional Operator) shows up because the sum of the (n-1) input file sizes (30MB) > hive.auto.convert.join.noconditionaltask.size (10MB).
The only branch under it is the common join Stage-1, because the smaller table size (30MB) > hive.mapjoin.smalltable.filesize (25MB).
As a result, if we execute this query, the common join is the only option and will be used.
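The two comparisons above can be sketched in Python. This is only a simplified model of the rule as this article describes it, not Hive's actual planner code: the function name plan_two_way_join and the returned labels are hypothetical, the strict "less than" comparison is an assumption, and the byte size is the "Data size" value reported for table a in the explain plan.

```python
def plan_two_way_join(small_size, noconditional_size, smalltable_filesize):
    """Simplified model of the plan shape for a 2-way join.

    small_size is the size in bytes of the smaller input. For a 2-way join
    the "sum of (n-1) inputs" is just this one table.
    """
    if small_size < noconditional_size:
        # No Conditional Operator: the map join is planned directly.
        return ["map join"]
    # Otherwise a Conditional Operator is planned. The common join is always
    # a branch; the map join is offered only if the small table fits.
    branches = ["common join"]
    if small_size < smalltable_filesize:
        branches.insert(0, "map join")
    return ["conditional"] + branches


# Default configuration; t30m (30277632 bytes) is the smaller input.
print(plan_two_way_join(30277632, 10000000, 25000000))
# prints ['conditional', 'common join']
```

With the default values, both comparisons fail, so the model returns a Conditional Operator whose only branch is the common join, which matches the plan above.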
b. Only increase hive.mapjoin.smalltable.filesize to a large enough value
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=31000000;
Explain plan:
STAGE DEPENDENCIES:
Stage-6 is a root stage , consists of Stage-7, Stage-1
Stage-7 has a backup stage: Stage-1
Stage-5 depends on stages: Stage-7
Stage-2 depends on stages: Stage-1, Stage-5
Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-6
Conditional Operator
Stage: Stage-7
Map Reduce Local Work
Alias -> Map Local Tables:
a
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
a
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 col0 (type: string)
1 col1 (type: string)
Stage: Stage-5
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col1 is not null (type: boolean)
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11
Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col11) (type: boolean)
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col0 (type: string)
sort order: +
Map-reduce partition columns: col0 (type: string)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col1 is not null (type: boolean)
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col1 (type: string)
sort order: +
Map-reduce partition columns: col1 (type: string)
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11
Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col11) (type: boolean)
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Since we only increased hive.mapjoin.smalltable.filesize, Stage-6 (Conditional Operator) is still there, and the map join Stage-5 now shows up together with the common join Stage-1.
If we execute this query, Hive chooses between the common join and the map join. Here the map join is selected:
Stage-7 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
2016-01-28 18:43:11 Starting to launch local task to process map join; maximum memory = 477102080
c. Only increase hive.auto.convert.join.noconditionaltask.size to a large enough value
set hive.auto.convert.join.noconditionaltask.size=31000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
Stage-5 is a root stage
Stage-2 depends on stages: Stage-5
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-5
Map Reduce Local Work
Alias -> Map Local Tables:
a
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
a
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 col0 (type: string)
1 col1 (type: string)
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col1 is not null (type: boolean)
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11
Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (_col0 = _col11) (type: boolean)
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Although hive.mapjoin.smalltable.filesize is left at its default, which is fairly small, hive.auto.convert.join.noconditionaltask.size is large enough to avoid the "Conditional Operator" stage, so the map join (Stage-5 and Stage-2) is the only choice:
Total jobs = 1
2016-01-28 18:45:41 Starting to launch local task to process map join; maximum memory = 477102080
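The value 31000000 used here can be sanity-checked with a little arithmetic. The sketch below is only a simplified model of the "sum of (n-1) inputs" rule described in this article. The helper names are hypothetical, and it assumes that the (n-1) inputs are the n-1 smallest ones, that is, every input except the largest.

```python
def sum_of_small_inputs(input_sizes):
    """Sum of the n-1 smallest input sizes (everything but the largest)."""
    return sum(sorted(input_sizes)[:-1])


def skips_conditional_operator(input_sizes, noconditional_size):
    """True if, under this model, the join is planned as a map join directly."""
    return sum_of_small_inputs(input_sizes) < noconditional_size


# "Data size" values of t30m and t60m taken from the explain plan.
two_way = [30277632, 60555264]
print(sum_of_small_inputs(two_way))                   # 30277632
print(skips_conditional_operator(two_way, 10000000))  # False
print(skips_conditional_operator(two_way, 31000000))  # True
```

Any threshold above 30277632 bytes would do for this query; 31000000 is simply a round number just above it.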
3-way join
Take the join below as an example:
select count(*) from t30m a,t60m b,t90m c where a.col0=b.col1 and b.col2=c.col3;
a. Default configuration
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
Stage-11 is a root stage , consists of Stage-1
Stage-1
Stage-8 depends on stages: Stage-1 , consists of Stage-12, Stage-13, Stage-2
Stage-12 has a backup stage: Stage-2
Stage-6 depends on stages: Stage-12
Stage-3 depends on stages: Stage-2, Stage-6, Stage-7
Stage-13 has a backup stage: Stage-2
Stage-7 depends on stages: Stage-13
Stage-2
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-11
Conditional Operator
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col0 (type: string)
sort order: +
Map-reduce partition columns: col0 (type: string)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (col1 is not null and col2 is not null) (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col1 (type: string)
sort order: +
Map-reduce partition columns: col1 (type: string)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
value expressions: col2 (type: string)
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11, _col12
Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-8
Conditional Operator
Stage: Stage-12
Map Reduce Local Work
Alias -> Map Local Tables:
c
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
c
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 _col12 (type: string)
1 col3 (type: string)
Stage: Stage-6
Map Reduce
Map Operator Tree:
TableScan
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-13
Map Reduce Local Work
Alias -> Map Local Tables:
$INTNAME
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$INTNAME
TableScan
HashTable Sink Operator
keys:
0 _col12 (type: string)
1 col3 (type: string)
Stage: Stage-7
Map Reduce
Map Operator Tree:
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col12 (type: string)
sort order: +
Map-reduce partition columns: _col12 (type: string)
Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col11 (type: string)
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col3 (type: string)
sort order: +
Map-reduce partition columns: col3 (type: string)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Stage-11 and Stage-8 (Conditional Operator) show up because the sum of the (n-1) input file sizes (30MB+60MB) > hive.auto.convert.join.noconditionaltask.size (10MB).
There are 2 steps here:
Step 1: (a joins b);
Step 2: (Output_of_Step1 joins c)
For step 1, it is the same as the 2-way join: there is only the common join Stage-1, because the smaller table size (30MB) > hive.mapjoin.smalltable.filesize (25MB).
For step 2, there are 3 options in explain plan:
Option 1: Common Join -- Stage-2
Option 2: Map Join(Put Output_of_Step1 in memory) -- Stage-13 and Stage-7
Option 3: Map Join(Put c in memory) -- Stage-12 and Stage-6
Since the Output_of_Step1 < hive.mapjoin.smalltable.filesize, Option 2 is chosen when we actually execute this query:
Stage-1 is selected by condition resolver.
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
2016-01-28 18:53:01 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-7: number of mappers: 1; number of reducers: 0
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 1
MapReduce Jobs Launched:
Stage-Stage-1: Map: 3 Reduce: 1 Cumulative CPU: 25.66 sec MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Stage-Stage-7: Map: 1 Cumulative CPU: 5.07 sec MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Stage-Stage-3: Map: 1 Reduce: 1 Cumulative CPU: 2.27 sec MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
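The choice among the three options for step 2 can be sketched as follows. This is a simplified model of the behavior described above, not Hive's actual condition resolver. The name choose_option is hypothetical, the size used for Output_of_Step1 is the planner's estimate for the Stage-1 join output (16652697 bytes) rather than a measured file size, and preferring the smallest in-memory side when several fit is an assumption.

```python
def choose_option(map_join_options, common_join_stage, smalltable_filesize):
    """Pick the stage to run for one join step.

    map_join_options maps the entry stage of each map join option to the
    size in bytes of the input that option would load into memory.
    """
    fitting = {stage: size for stage, size in map_join_options.items()
               if size < smalltable_filesize}
    if not fitting:
        # Nothing fits in memory: fall back to the backup common join.
        return common_join_stage
    # Assumption: prefer the option with the smallest in-memory input.
    return min(fitting, key=fitting.get)


step2 = {
    "Stage-13": 16652697,  # Option 2: Output_of_Step1 in memory (estimate)
    "Stage-12": 90832896,  # Option 3: table c in memory
}
print(choose_option(step2, "Stage-2", 25000000))  # Stage-13
```

Under this model only Output_of_Step1 is below the 25000000 threshold, so Stage-13 is picked, in line with the condition resolver output shown above.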
b. Only increase hive.mapjoin.smalltable.filesize to a large enough value
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=31000000;
Explain plan:
STAGE DEPENDENCIES:
Stage-11 is a root stage , consists of Stage-14, Stage-1
Stage-14 has a backup stage: Stage-1
Stage-10 depends on stages: Stage-14
Stage-8 depends on stages: Stage-1, Stage-10 , consists of Stage-12, Stage-13, Stage-2
Stage-12 has a backup stage: Stage-2
Stage-6 depends on stages: Stage-12
Stage-3 depends on stages: Stage-2, Stage-6, Stage-7
Stage-13 has a backup stage: Stage-2
Stage-7 depends on stages: Stage-13
Stage-2
Stage-1
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-11
Conditional Operator
Stage: Stage-14
Map Reduce Local Work
Alias -> Map Local Tables:
a
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
a
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 col0 (type: string)
1 col1 (type: string)
Stage: Stage-10
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (col1 is not null and col2 is not null) (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11, _col12
Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-8
Conditional Operator
Stage: Stage-12
Map Reduce Local Work
Alias -> Map Local Tables:
c
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
c
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 _col12 (type: string)
1 col3 (type: string)
Stage: Stage-6
Map Reduce
Map Operator Tree:
TableScan
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-13
Map Reduce Local Work
Alias -> Map Local Tables:
$INTNAME
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$INTNAME
TableScan
HashTable Sink Operator
keys:
0 _col12 (type: string)
1 col3 (type: string)
Stage: Stage-7
Map Reduce
Map Operator Tree:
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col12 (type: string)
sort order: +
Map-reduce partition columns: _col12 (type: string)
Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col11 (type: string)
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col3 (type: string)
sort order: +
Map-reduce partition columns: col3 (type: string)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col0 (type: string)
sort order: +
Map-reduce partition columns: col0 (type: string)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (col1 is not null and col2 is not null) (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: col1 (type: string)
sort order: +
Map-reduce partition columns: col1 (type: string)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
value expressions: col2 (type: string)
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11, _col12
Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
There are 2 steps here:
Step 1: (a joins b);
Step 2: (Output_of_Step1 joins c)
For step 1, there are 2 options in explain plan:
Option 1: Common Join -- Stage-1
Option 2: Map Join(Put a in memory) -- Stage-14 and Stage-10
For step 2, there are 3 options in explain plan:
Option 1: Common Join -- Stage-2
Option 2: Map Join(Put Output_of_Step1 in memory) -- Stage-13 and Stage-7
Option 3: Map Join(Put c in memory) -- Stage-12 and Stage-6
If we actually execute this query, 2 map joins are chosen, simply because hive.mapjoin.smalltable.filesize has been increased to a large enough value:
Stage-14 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
2016-01-28 19:29:37 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-10: number of mappers: 1; number of reducers: 0
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
2016-01-28 19:30:04 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-7: number of mappers: 1; number of reducers: 0
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 1
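When the plan has several Conditional Operators, it can help to pull the resolver decisions out of the console output. The snippet below is a small sketch for doing that. The helper name resolver_choices is hypothetical, and the pattern only assumes the "is selected / is filtered out by condition resolver" wording seen in the output above.

```python
import re

# Matches lines such as "Stage-14 is selected by condition resolver."
RESOLVER_LINE = re.compile(
    r"(Stage-\d+) is (selected|filtered out) by condition resolver"
)


def resolver_choices(log_text):
    """Return (selected, filtered) stage names in the order they appear."""
    selected, filtered = [], []
    for stage, verdict in RESOLVER_LINE.findall(log_text):
        (selected if verdict == "selected" else filtered).append(stage)
    return selected, filtered


log_text = """Stage-14 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver."""

selected, filtered = resolver_choices(log_text)
print(selected)  # ['Stage-14', 'Stage-13']
print(filtered)  # ['Stage-1', 'Stage-12', 'Stage-2']
```

Read against the options listed above, Stage-14 corresponds to Option 2 of step 1 (a in memory) and Stage-13 to Option 2 of step 2 (Output_of_Step1 in memory).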
c. Only increase hive.auto.convert.join.noconditionaltask.size to a large enough value
set hive.auto.convert.join.noconditionaltask.size=95000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
Stage-9 is a root stage
Stage-7 depends on stages: Stage-9
Stage-8 depends on stages: Stage-7
Stage-3 depends on stages: Stage-8
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-9
Map Reduce Local Work
Alias -> Map Local Tables:
a
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
a
TableScan
alias: a
Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col0 is not null (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 col0 (type: string)
1 col1 (type: string)
Stage: Stage-7
Map Reduce
Map Operator Tree:
TableScan
alias: b
Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (col1 is not null and col2 is not null) (type: boolean)
Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 col0 (type: string)
1 col1 (type: string)
outputColumnNames: _col0, _col11, _col12
Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work
Stage: Stage-8
Map Reduce Local Work
Alias -> Map Local Tables:
c
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
c
TableScan
alias: c
Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: col3 is not null (type: boolean)
Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 _col12 (type: string)
1 col3 (type: string)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col12 (type: string)
1 col3 (type: string)
outputColumnNames: _col0, _col11, _col12, _col23
Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Select Operator
Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
No matter how small hive.mapjoin.smalltable.filesize is, as long as hive.auto.convert.join.noconditionaltask.size (95MB) > the sum of the n-1 table sizes (30MB+60MB), there is no "Conditional Operator" at all. Both steps are map joins -- Stage-7 (step 1) and Stage-3 (step 2).
The explain plan output is also much easier to read and understand now.
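The size comparison can be checked with the numbers printed in the plan statistics. The snippet below is a minimal Python sketch of that arithmetic only. The function name skips_conditional_task is hypothetical, and the strict "<" follows the wording of this article rather than a verified reading of Hive's boundary behavior.

```python
def skips_conditional_task(table_sizes, noconditionaltask_size):
    """Return True when the n-1 smaller tables fit under the threshold.

    Assumption: the largest table is the one streamed, so only the
    remaining n-1 tables count against the threshold.
    """
    sizes = sorted(table_sizes)
    sum_of_smaller = sum(sizes[:-1])  # all tables except the largest one
    return sum_of_smaller < noconditionaltask_size


# Data sizes of a, b and c as reported by the TableScan statistics.
plan_sizes = [30277632, 60555264, 90832896]
print(skips_conditional_task(plan_sizes, 95000000))  # setting used in case c
print(skips_conditional_task(plan_sizes, 10000000))  # setting used in case a
```

Here 30277632 + 60555264 = 90832896 bytes, which is below 95000000 but far above 10000000, so only the first setting removes the "Conditional Operator".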
Conclusion
1. Normally a "Conditional Operator" exists, and Hive checks hive.mapjoin.smalltable.filesize to decide whether a common join or a map join should be chosen.
2. When hive.auto.convert.join.noconditionaltask.size > the sum of the sizes of n-1 of the tables/partitions in an n-way join, a map join will be chosen regardless of hive.mapjoin.smalltable.filesize.
Be aware that if you decide to increase hive.auto.convert.join.noconditionaltask.size to force an n-way join to use map join, you should know the potential size of each join output.
The output of (30MB joins 60MB) could be 0 bytes, or it could be several GBs. It all depends on the cardinality and the join condition.
I think that is probably why hive.auto.convert.join.noconditionaltask.size is only 10MB by default, so that people will not abuse this configuration.
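To see why the join output size is so hard to predict from the input sizes alone, here is a small Python sketch that counts the rows an inner equi-join produces. The helper inner_join_row_count and the toy key lists are made up for illustration; they are not taken from the test tables.

```python
from collections import Counter


def inner_join_row_count(left_keys, right_keys):
    """Count the rows produced by an inner equi-join on the given keys."""
    left = Counter(left_keys)
    right = Counter(right_keys)
    # Each left row with key k matches every right row with the same key k.
    return sum(count * right[key] for key, count in left.items())


# No key in common: the join output is empty.
print(inner_join_row_count(["x"] * 1000, ["y"] * 2000))

# Every row shares one key: the output is the full cross product.
print(inner_join_row_count(["x"] * 1000, ["x"] * 2000))
```

Both inputs have the same number of rows in the two calls, yet the first join returns nothing while the second returns 1000 * 2000 rows. An intermediate result like the second one may no longer fit in memory for the next map join step.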
Shouldn't all the plans created be the same for a particular query? That is, does the Hive driver create a permutation of all possible ways in which the query could be run? Like the plan for the second query, which draws out plans for both a common join and a map join. Of course, it's only during execution that a certain path in the query plan (map join or common join) will be executed. The actual selection of the path could be based on hive.auto.convert.join.noconditionaltask.size and hive.mapjoin.smalltable.filesize.
Also, I'm a little confused about the differences between the above properties. Could you elaborate a little? I couldn't get any help online. It's the same copy-pasted thing everywhere.
Thanks
Abhilash
If you think like a query planner, of course, all plans should be taken into consideration.
However, the explain plan output may not print all possible plans, depending on the values of parameters such as hive.auto.convert.join.noconditionaltask.size and hive.mapjoin.smalltable.filesize.
Look at plan a in the 2-way join: the map join plan is not printed simply because the smaller table size (30MB) > hive.mapjoin.smalltable.filesize.
Thanks for your prompt reply.
So why is it that in the 2-way join example (with hive.auto.convert.join.noconditionaltask.size=10000000 and hive.mapjoin.smalltable.filesize=31000000) the query plan output shows both stage 1 and stage 5? Shouldn't the plan show only stage 5?
Thanks,
Abhilash
Also, when we talk of the "conditional task", do we mean checking the size of the small table or checking the size of the n-1 tables/partitions? The reason why I'm confused by these 2 parameters is that I do not understand the need for hive.auto.convert.join.noconditionaltask.size when hive.mapjoin.smalltable.filesize already serves the purpose in an n-way join.
~Abhilash
Check my latest reply below.
If you do not want the "conditional task", the only way is to make sure hive.auto.convert.join.noconditionaltask.size is large enough.
Check the code in ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java:
// If sizes of at least n-1 tables in a n-way join is known, and their sum is smaller than
// the threshold size, convert the join into map-join and don't create a conditional task
boolean convertJoinMapJoin = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
Then:
MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition);
newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
replaceTask(currTask, newTask, physicalContext);
After that, the code checks each individual table size against hive.mapjoin.smalltable.filesize, and adds each possible map join task to the "conditional task".
The reason why they did not remove the "common join" is probably that the statistics may not be accurate at planning time. Hive wants to make the final decision at runtime according to the size of each table (or intermediate result).
Great post, thanks.