Goal
This article explains the difference between the 2 Hive configurations below, which control whether a common join can be converted to a map join:
- hive.auto.convert.join.noconditionaltask.size
- hive.mapjoin.smalltable.filesize
Env
All tests were done in Hive 1.2.0. The 3 tables below were created in advance:
Table Name | Table Size |
t30m | 30MB |
t60m | 60MB |
t90m | 90MB |
Concept
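- hive.auto.convert.join: whether Hive converts a common join into a map join based on the input file size.
- hive.auto.convert.join.noconditionaltask: whether Hive converts the join directly into a map join, without a conditional task, when the sum of the sizes of n-1 of the tables/partitions in an n-way join is smaller than hive.auto.convert.join.noconditionaltask.size.
- hive.auto.convert.join.noconditionaltask.size: the size threshold, in bytes, used by the check above (default 10000000).
- hive.mapjoin.smalltable.filesize: the size threshold, in bytes, for the input files of the small table; below it, Hive tries to convert the common join into a map join (default 25000000).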
Hands-on Experience
2-way join
Let's first explain this behavior using the simplest 2-way join:
select count(*) from t30m a,t60m b where a.col0=b.col1 ;
a. Default configuration
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-6 is a root stage , consists of Stage-1
  Stage-1
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-6
    Conditional Operator

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11
          Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (_col0 = _col11) (type: boolean)
            Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Stage-6 (Conditional Operator) shows up because the sum of the (n-1) input file sizes (30MB) > hive.auto.convert.join.noconditionaltask.size (10MB).
Only the common join (Stage-1) is available because the smaller table (30MB) is larger than hive.mapjoin.smalltable.filesize (25MB).
As a result, if we execute this query, a common join is certain to be used.
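The size check behind the Conditional Operator can be sketched in a few lines of Python. This is a simplified model of the rule described above, not Hive's own code: the function names are made up for illustration, the byte counts are the "Data size" values from the explain plan (Hive itself compares input file sizes, which may differ), and the strict ">" comparison simply follows the wording above.

```python
# Simplified model (assumption, not Hive source code) of the size check above.
DEFAULT_NOCONDITIONALTASK_SIZE = 10_000_000  # hive.auto.convert.join.noconditionaltask.size


def small_side_bytes(table_sizes):
    """Sum of the n-1 smallest inputs, i.e. every table except the largest."""
    ordered = sorted(table_sizes)
    return sum(ordered[:-1])


def has_conditional_operator(table_sizes, noconditionaltask_size):
    """True when the n-1 smaller inputs do not fit under the threshold."""
    return small_side_bytes(table_sizes) > noconditionaltask_size


# "Data size" of t30m and t60m, taken from the explain plan.
sizes = [30_277_632, 60_555_264]
print(small_side_bytes(sizes))                                          # 30277632
print(has_conditional_operator(sizes, DEFAULT_NOCONDITIONALTASK_SIZE))  # True
```

With the default threshold the check is true, which matches the Stage-6 Conditional Operator in the plan.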
b. Only increase hive.mapjoin.smalltable.filesize to a large enough value
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=31000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-6 is a root stage , consists of Stage-7, Stage-1
  Stage-7 has a backup stage: Stage-1
  Stage-5 depends on stages: Stage-7
  Stage-2 depends on stages: Stage-1, Stage-5
  Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-6
    Conditional Operator

  Stage: Stage-7
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)

  Stage: Stage-5
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11
                Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: (_col0 = _col11) (type: boolean)
                  Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11
          Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (_col0 = _col11) (type: boolean)
            Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Since we only increased hive.mapjoin.smalltable.filesize, the Stage-6 (Conditional Operator) is still there.
The map join (Stage-5) now shows up alongside the common join (Stage-1).
If we execute this query, Hive chooses between the common join and the map join at run time:
Stage-7 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
2016-01-28 18:43:11 Starting to launch local task to process map join; maximum memory = 477102080
c. Only increase hive.auto.convert.join.noconditionaltask.size to a large enough value
set hive.auto.convert.join.noconditionaltask.size=31000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-5 is a root stage
  Stage-2 depends on stages: Stage-5
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-5
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11
                Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: (_col0 = _col11) (type: boolean)
                  Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      Reduce Output Operator
                        sort order:
                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                        value expressions: _col0 (type: bigint)
      Local Work:
        Map Reduce Local Work
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Although hive.mapjoin.smalltable.filesize is at its default and fairly small, hive.auto.convert.join.noconditionaltask.size is large enough to avoid the "Conditional Operator" stage and make the map join (local work in Stage-5, Map Join Operator in Stage-2) the only choice.
Total jobs = 1
2016-01-28 18:45:41 Starting to launch local task to process map join; maximum memory = 477102080
3-way join
Take the join below as an example:
select count(*) from t30m a,t60m b,t90m c where a.col0=b.col1 and b.col2=c.col3;
a. Default configuration
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-11 is a root stage , consists of Stage-1
  Stage-1
  Stage-8 depends on stages: Stage-1 , consists of Stage-12, Stage-13, Stage-2
  Stage-12 has a backup stage: Stage-2
  Stage-6 depends on stages: Stage-12
  Stage-3 depends on stages: Stage-2, Stage-6, Stage-7
  Stage-13 has a backup stage: Stage-2
  Stage-7 depends on stages: Stage-13
  Stage-2
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-11
    Conditional Operator

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
                value expressions: col2 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11, _col12
          Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-8
    Conditional Operator

  Stage: Stage-12
    Map Reduce Local Work
      Alias -> Map Local Tables:
        c
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        c
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)

  Stage: Stage-6
    Map Reduce
      Map Operator Tree:
          TableScan
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)
              outputColumnNames: _col0, _col11, _col12, _col23
              Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Group By Operator
                    aggregations: count()
                    mode: hash
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-13
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $INTNAME
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $INTNAME
          TableScan
            HashTable Sink Operator
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)

  Stage: Stage-7
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)
                outputColumnNames: _col0, _col11, _col12, _col23
                Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col12 (type: string)
              sort order: +
              Map-reduce partition columns: _col12 (type: string)
              Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: string), _col11 (type: string)
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col3 (type: string)
                sort order: +
                Map-reduce partition columns: col3 (type: string)
                Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 _col12 (type: string)
            1 col3 (type: string)
          outputColumnNames: _col0, _col11, _col12, _col23
          Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
            Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Stage-11 and Stage-8 (Conditional Operator) show up because the sum of the (n-1) input file sizes (30MB+60MB) > hive.auto.convert.join.noconditionaltask.size (10MB).
There are 2 steps here:
Step 1: (a joins b);
Step 2: (Output_of_Step1 joins c)
Step 1 is the same as the 2-way join: only the common join (Stage-1) is available because the smaller table (30MB) is larger than hive.mapjoin.smalltable.filesize (25MB).
For step 2, there are 3 options in explain plan:
Option 1: Common Join -- Stage-2
Option 2: Map Join(Put Output_of_Step1 in memory) -- Stage-13 and Stage-7
Option 3: Map Join(Put c in memory) -- Stage-12 and Stage-6
Since Output_of_Step1 is smaller than hive.mapjoin.smalltable.filesize, Option 2 is chosen when we actually execute this query:
Stage-1 is selected by condition resolver.
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
2016-01-28 18:53:01 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-7: number of mappers: 1; number of reducers: 0
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 1
MapReduce Jobs Launched:
Stage-Stage-1: Map: 3 Reduce: 1 Cumulative CPU: 25.66 sec MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Stage-Stage-7: Map: 1 Cumulative CPU: 5.07 sec MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Stage-Stage-3: Map: 1 Reduce: 1 Cumulative CPU: 2.27 sec MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
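When a plan has several conditional stages, it can help to pull the resolver decisions out of the job log. The following Python sketch does this with a regular expression; the helper name is hypothetical, and the sample text is the four resolver messages from the run above.

```python
import re

# Resolver messages copied from the job log of the 3-way join (default settings).
LOG = """\
Stage-1 is selected by condition resolver.
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
"""

PATTERN = re.compile(r"(Stage-\d+) is (selected|filtered out) by condition resolver")


def resolver_decisions(log_text):
    """Return (selected, filtered_out) lists of stage names in log order."""
    selected, filtered = [], []
    for stage, verdict in PATTERN.findall(log_text):
        (selected if verdict == "selected" else filtered).append(stage)
    return selected, filtered


print(resolver_decisions(LOG))
# (['Stage-1', 'Stage-13'], ['Stage-12', 'Stage-2'])
```

Stage-13 in the selected list corresponds to Option 2 (Output_of_Step1 in memory), and Stage-12 and Stage-2 are the two options that were discarded.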
b. Only increase hive.mapjoin.smalltable.filesize to a large enough value
set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=31000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-11 is a root stage , consists of Stage-14, Stage-1
  Stage-14 has a backup stage: Stage-1
  Stage-10 depends on stages: Stage-14
  Stage-8 depends on stages: Stage-1, Stage-10 , consists of Stage-12, Stage-13, Stage-2
  Stage-12 has a backup stage: Stage-2
  Stage-6 depends on stages: Stage-12
  Stage-3 depends on stages: Stage-2, Stage-6, Stage-7
  Stage-13 has a backup stage: Stage-2
  Stage-7 depends on stages: Stage-13
  Stage-2
  Stage-1
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-11
    Conditional Operator

  Stage: Stage-14
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)

  Stage: Stage-10
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11, _col12
                Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-8
    Conditional Operator

  Stage: Stage-12
    Map Reduce Local Work
      Alias -> Map Local Tables:
        c
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        c
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)

  Stage: Stage-6
    Map Reduce
      Map Operator Tree:
          TableScan
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)
              outputColumnNames: _col0, _col11, _col12, _col23
              Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Group By Operator
                    aggregations: count()
                    mode: hash
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-13
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $INTNAME
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $INTNAME
          TableScan
            HashTable Sink Operator
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)

  Stage: Stage-7
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)
                outputColumnNames: _col0, _col11, _col12, _col23
                Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col12 (type: string)
              sort order: +
              Map-reduce partition columns: _col12 (type: string)
              Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: string), _col11 (type: string)
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col3 (type: string)
                sort order: +
                Map-reduce partition columns: col3 (type: string)
                Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 _col12 (type: string)
            1 col3 (type: string)
          outputColumnNames: _col0, _col11, _col12, _col23
          Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
            Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
                value expressions: col2 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11, _col12
          Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

There are 2 steps here:
Step 1: (a joins b);
Step 2: (Output_of_Step1 joins c)
For step 1, there are 2 options in explain plan:
Option 1: Common Join -- Stage-1
Option 2: Map Join(Put a in memory) -- Stage-14 and Stage-10
For step 2, there are 3 options in explain plan:
Option 1: Common Join -- Stage-2
Option 2: Map Join(Put Output_of_Step1 in memory) -- Stage-13 and Stage-7
Option 3: Map Join(Put c in memory) -- Stage-12 and Stage-6
If we actually execute this query, 2 map joins are chosen, simply because hive.mapjoin.smalltable.filesize was increased to a large enough value:
Stage-14 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
2016-01-28 19:29:37 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-10: number of mappers: 1; number of reducers: 0
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
2016-01-28 19:30:04 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-7: number of mappers: 1; number of reducers: 0
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 1
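The run-time choices in cases a and b of the 3-way join can be mimicked with a small Python model. It is only a sketch under assumptions: the function and variable names are invented here, the size of Output_of_Step1 is the optimizer's estimate from the explain plan (16652697 bytes) rather than a measured value, and preferring the smallest in-memory side is a guess at the selection rule, not something taken from Hive's source.

```python
def choose_option(candidates, smalltable_filesize):
    """Pick a join strategy for one step.

    candidates maps an alias to the bytes that would be loaded into memory
    if that alias were used as the small table of a map join.
    """
    fitting = {alias: size for alias, size in candidates.items()
               if size <= smalltable_filesize}
    if not fitting:
        return "common join"
    # Assumption: among the options that fit, take the smallest in-memory side.
    alias = min(fitting, key=fitting.get)
    return f"map join ({alias} in memory)"


# Step 1: a joins b. The plan only offers "a in memory" as a map join option.
step1 = {"a": 30_277_632}
# Step 2: Output_of_Step1 joins c. Either side could be the in-memory table.
step2 = {"Output_of_Step1": 16_652_697, "c": 90_832_896}

for threshold in (25_000_000, 31_000_000):
    print(threshold, "|", choose_option(step1, threshold),
          "|", choose_option(step2, threshold))
```

With 25000000 the model gives a common join for step 1 and a map join with Output_of_Step1 in memory for step 2; with 31000000 both steps become map joins, which is the same outcome the two job logs report.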
c. Only increase hive.auto.convert.join.noconditionaltask.size to a large enough value
set hive.auto.convert.join.noconditionaltask.size=95000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES: Stage-9 is a root stage Stage-7 depends on stages: Stage-9 Stage-8 depends on stages: Stage-7 Stage-3 depends on stages: Stage-8 Stage-0 depends on stages: Stage-3 STAGE PLANS: Stage: Stage-9 Map Reduce Local Work Alias -> Map Local Tables: a Fetch Operator limit: -1 Alias -> Map Local Operator Tree: a TableScan alias: a Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: col0 is not null (type: boolean) Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator keys: 0 col0 (type: string) 1 col1 (type: string) Stage: Stage-7 Map Reduce Map Operator Tree: TableScan alias: b Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: (col1 is not null and col2 is not null) (type: boolean) Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 keys: 0 col0 (type: string) 1 col1 (type: string) outputColumnNames: _col0, _col11, _col12 Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe Local Work: Map Reduce Local Work Stage: Stage-8 Map Reduce Local Work Alias -> Map Local Tables: c Fetch Operator limit: -1 Alias -> Map Local Operator Tree: c TableScan alias: c Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: col3 is not null (type: boolean) Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator keys: 0 _col12 (type: string) 1 col3 (type: string) Stage: Stage-3 Map Reduce Map Operator 
Tree: TableScan Map Join Operator condition map: Inner Join 0 to 1 keys: 0 _col12 (type: string) 1 col3 (type: string) outputColumnNames: _col0, _col11, _col12, _col23 Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean) Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE Select Operator Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE Group By Operator aggregations: count() mode: hash outputColumnNames: _col0 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) Local Work: Map Reduce Local Work Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) mode: mergepartial outputColumnNames: _col0 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator limit: -1 Processor Tree: ListSink
No matter how small hive.mapjoin.smalltable.filesize is, as long as hive.auto.convert.join.noconditionaltask.size (95MB) > sum of n-1 table sizes (30MB+60MB), there is no "Conditional Operator" in the plan.
Both join steps, Stage-7 and Stage-3, are map joins.
The explain plan output is now much easier to read and understand.
Conclusion
1. Normally a "Conditional Operator" exists, and Hive checks hive.mapjoin.smalltable.filesize to decide whether a common join or a map join should be chosen.
2. When hive.auto.convert.join.noconditionaltask.size > the sum of the sizes of n-1 of the tables/partitions in an n-way join, a map join is chosen regardless of hive.mapjoin.smalltable.filesize.
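The two rules can be sketched as a small decision function. This is only a simplified model of the behavior observed in the tests above, not Hive's planner code: the function name plan_join and the result labels are made up for illustration, 95MB is assumed to mean 95000000 bytes, and the conditional branch compares the combined size of the would-be small tables with hive.mapjoin.smalltable.filesize, which for a 2-way join is simply the smaller table.

```python
# Simplified model of the two rules above; NOT Hive's actual planner code.
# Function name and result labels are hypothetical, for illustration only.

def plan_join(table_sizes, noconditionaltask_size, smalltable_filesize):
    """Return the kind of plan the two rules predict for an n-way join.

    table_sizes: sizes in bytes of the n tables being joined.
    """
    # The n-1 smallest tables would form the hashed "small" side of a map join.
    small_side = sum(sorted(table_sizes)[:-1])

    # Rule 2: the small side fits under noconditionaltask.size -> map join only.
    if small_side < noconditionaltask_size:
        return "map join, no conditional task"

    # Rule 1: otherwise a Conditional Operator is planned, and
    # hive.mapjoin.smalltable.filesize decides whether map join is a candidate.
    # Assumption: the small side is compared as a whole.
    if small_side < smalltable_filesize:
        return "conditional task: map join or common join"
    return "conditional task: common join only"


# Data sizes reported in the explain plans for t30m, t60m and t90m.
T30M, T60M, T90M = 30277632, 60555264, 90832896

# 2-way join with the default settings.
print(plan_join([T30M, T60M], 10000000, 25000000))
# 2-way join after raising hive.mapjoin.smalltable.filesize above 30MB.
print(plan_join([T30M, T60M], 10000000, 31000000))
# 3-way join with hive.auto.convert.join.noconditionaltask.size at 95MB.
print(plan_join([T30M, T60M, T90M], 95000000, 25000000))
```

The three calls correspond to the 2-way default plan (common join only inside the conditional task), the 2-way plan with a larger hive.mapjoin.smalltable.filesize (both paths kept), and the 3-way plan with no conditional task at all.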
Be aware that if you decide to increase hive.auto.convert.join.noconditionaltask.size to force an n-way join to use map join, you need to know the potential size of each join output.
The output of a 30MB table joined with a 60MB table could be 0 bytes, or it could be gigabytes. It all depends on the cardinality and the join condition.
I think that is probably why hive.auto.convert.join.noconditionaltask.size defaults to only 10MB, so that people will not abuse this configuration.
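To see how much the join output can vary for inputs of the same size, here is a minimal sketch that counts the rows an inner equi-join produces from the join-key values alone. The helper join_output_rows and the three key distributions are hypothetical and exist only for this illustration; NULL keys are assumed to be filtered out already.

```python
from collections import Counter


def join_output_rows(left_keys, right_keys):
    """Row count of an inner equi-join, given the join key of every input row."""
    left = Counter(left_keys)
    right = Counter(right_keys)
    # Each key value contributes (matching left rows) * (matching right rows).
    return sum(count * right[key] for key, count in left.items())


# Hypothetical key distributions; both tables have 1000 rows in every case.
disjoint = join_output_rows(range(1000), range(1000, 2000))  # no key matches
unique = join_output_rows(range(1000), range(1000))          # one match per row
skewed = join_output_rows([0] * 1000, [0] * 1000)            # all rows share one key

print(disjoint, unique, skewed)
```

The same 1000-row inputs give 0, 1000 and 1000000 output rows. In a 3-way join the output of the first join feeds the second one, so a skewed key can make the intermediate result far larger than any of the source tables.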
Shouldn't all the plans created be the same for a particular query? That is, shouldn't the Hive driver create a permutation of all the possible ways in which a query could be run, like the plan for the second query, which draws out plans for both a common join and a map join? Of course, only during execution will a certain path in the query plan (map join or common join) be executed. The actual selection of the path could be based on hive.auto.convert.join.noconditionaltask.size and hive.mapjoin.smalltable.filesize.
Also, I'm a little confused about the differences between the above properties. Could you elaborate a little? I couldn't find any help online; it's the same copy-pasted text everywhere.
Thanks
Abhilash
If you think like a query planner, of course, all plans should be in consideration.
However, the explain plan output may not print all possible plans, depending on the values of parameters such as hive.auto.convert.join.noconditionaltask.size and hive.mapjoin.smalltable.filesize.
Look at Plan a in the 2-way join: the map join plan is not printed simply because the smaller table size (30MB) > hive.mapjoin.smalltable.filesize.
Thanks for your prompt reply.
So why does the query plan output in the 2-way join example (with hive.auto.convert.join.noconditionaltask.size=10000000 and hive.mapjoin.smalltable.filesize=31000000) show both stage 1 and stage 5? Shouldn't the plan show only stage 5?
Thanks,
Abhilash
Also, when we talk about the "conditional task", do we mean checking the size of the small table or checking the size of the n-1 tables/partitions? The reason I'm confused by these 2 parameters is that I do not understand the need for hive.auto.convert.join.noconditionaltask.size when hive.mapjoin.smalltable.filesize already serves the purpose in an n-way join.
~Abhilash
Check my latest reply below.
If you do not want a "conditional task", the only way is to make sure hive.auto.convert.join.noconditionaltask.size is large enough.
Check the code in ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java:
// If sizes of at least n-1 tables in a n-way join is known, and their sum is smaller than
// the threshold size, convert the join into map-join and don't create a conditional task
boolean convertJoinMapJoin = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
Then:
MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition);
newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
replaceTask(currTask, newTask, physicalContext);
After that, the code checks each individual table size against hive.mapjoin.smalltable.filesize and adds every possible map join task to the "conditional task".
The reason they did not remove the "common join" is probably that the statistics may not be accurate at planning time. Hive wants to make the final decision at runtime according to the size of each table (or intermediate result).
Great post, thanks.