Thursday, January 28, 2016

Difference between hive.mapjoin.smalltable.filesize and hive.auto.convert.join.noconditionaltask.size

Goal

This article explains the difference between below 2 Hive configurations which control if common join can be converted to map join:
  • hive.mapjoin.smalltable.filesize
  • hive.auto.convert.join.noconditionaltask.size

Env

All tests are done in Hive 1.2.0.
Below 3 tables are created in advance:

Table Name Table Size
t30m 30MB
t60m 60MB
t90m 90MB

Concept

  • hive.auto.convert.join
Starting from Hive 0.11.0, it is true by default which means Hive can convert common join to map join based on the input file size.
  • hive.auto.convert.join.noconditionaltask
  • hive.auto.convert.join.noconditionaltask.size
Added in Hive 0.11.0, and it is true by default which means, if the sum of size for n-1 of the tables/partitions for an n-way join is smaller than the size specified by hive.auto.convert.join.noconditionaltask.size(10MB by default), the join is directly converted to a mapjoin (there is no conditional task).
  •  hive.mapjoin.smalltable.filesize
Added in Hive 0.8.0, and it is 25MB by default which means, if the input file size is smaller than this threshold, Hive will try to convert the common join into map join.

Hands-on Experience

2-way join

Let's explain this behavior using the simplest 2-way join firstly.
select count(*) from
t30m a,t60m b
where a.col0=b.col1 ;

a. Default configuration

set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-6 is a root stage , consists of Stage-1
  Stage-1
  Stage-2 depends on stages: Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-6
    Conditional Operator

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11
          Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (_col0 = _col11) (type: boolean)
            Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Stage-6(Conditional Operator) shows up because the sum of (n -1) file input size(30MB) > hive.auto.convert.join.noconditionaltask.size(10MB).
There is only common join Stage-1 because the smaller table size(30MB) > hive.mapjoin.smalltable.filesize.
As a result, if we execute this query, common join will be used for sure.

b. Only increase hive.mapjoin.smalltable.filesize to large enough

set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=31000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-6 is a root stage , consists of Stage-7, Stage-1
  Stage-7 has a backup stage: Stage-1
  Stage-5 depends on stages: Stage-7
  Stage-2 depends on stages: Stage-1, Stage-5
  Stage-1
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-6
    Conditional Operator

  Stage: Stage-7
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)

  Stage: Stage-5
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11
                Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: (_col0 = _col11) (type: boolean)
                  Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11
          Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (_col0 = _col11) (type: boolean)
            Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Since we only increased hive.mapjoin.smalltable.filesize, the Stage-6(Conditional Operator) is still there.
And map join Stage-5 shows up together with common join Stage-1.
If we execute this query, Hive will choose from common join and map join:
Stage-7 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
2016-01-28 18:43:11 Starting to launch local task to process map join; maximum memory = 477102080

c. Only increase hive.auto.convert.join.noconditionaltask.size to large enough

set hive.auto.convert.join.noconditionaltask.size=31000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-5 is a root stage
  Stage-2 depends on stages: Stage-5
  Stage-0 depends on stages: Stage-2

STAGE PLANS:
  Stage: Stage-5
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col1 is not null (type: boolean)
              Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11
                Statistics: Num rows: 702873 Data size: 33305395 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: (_col0 = _col11) (type: boolean)
                  Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 351436 Data size: 16652673 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      Reduce Output Operator
                        sort order:
                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                        value expressions: _col0 (type: bigint)
      Local Work:
        Map Reduce Local Work
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Although hive.mapjoin.smalltable.filesize is default and pretty small, hive.auto.convert.join.noconditionaltask.size is large enough to avoid "Conditional Operator" stage and make sure "Map Join" stage-5 is the only choice.
Total jobs = 1
2016-01-28 18:45:41 Starting to launch local task to process map join; maximum memory = 477102080

3-way join

Take below join for example:
select count(*) from
t30m a,t60m b,t90m c 
where a.col0=b.col1 and b.col2=c.col3;

a. Default configuration

set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-11 is a root stage , consists of Stage-1
  Stage-1
  Stage-8 depends on stages: Stage-1 , consists of Stage-12, Stage-13, Stage-2
  Stage-12 has a backup stage: Stage-2
  Stage-6 depends on stages: Stage-12
  Stage-3 depends on stages: Stage-2, Stage-6, Stage-7
  Stage-13 has a backup stage: Stage-2
  Stage-7 depends on stages: Stage-13
  Stage-2
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-11
    Conditional Operator

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
                value expressions: col2 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11, _col12
          Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-8
    Conditional Operator

  Stage: Stage-12
    Map Reduce Local Work
      Alias -> Map Local Tables:
        c
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        c
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)

  Stage: Stage-6
    Map Reduce
      Map Operator Tree:
          TableScan
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)
              outputColumnNames: _col0, _col11, _col12, _col23
              Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Group By Operator
                    aggregations: count()
                    mode: hash
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-13
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $INTNAME
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $INTNAME
          TableScan
            HashTable Sink Operator
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)

  Stage: Stage-7
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)
                outputColumnNames: _col0, _col11, _col12, _col23
                Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col12 (type: string)
              sort order: +
              Map-reduce partition columns: _col12 (type: string)
              Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: string), _col11 (type: string)
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col3 (type: string)
                sort order: +
                Map-reduce partition columns: col3 (type: string)
                Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 _col12 (type: string)
            1 col3 (type: string)
          outputColumnNames: _col0, _col11, _col12, _col23
          Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
            Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Stage-11 and Stage-8 (Conditional Operator) show up because the sum of (n -1) file input size(30MB+60MB) > hive.auto.convert.join.noconditionaltask.size(10MB).
Here are 2 steps here:
Step 1:  (a joins b);
Step 2: (Output_of_Step1 joins c)

For step 1, it is same as 2-way join: There is only common join Stage-1 because the smaller table size(30MB) > hive.mapjoin.smalltable.filesize.

For step 2, there are 3 options in explain plan:
   Option 1: Common Join      -- Stage-2
   Option 2: Map Join(Put Output_of_Step1 in memory)  -- Stage-13 and Stage-7
   Option 3: Map Join(Put c in memory) -- Stage-12 and Stage-6
Since the Output_of_Step1 < hive.mapjoin.smalltable.filesize, Option 2 is chosen when we actually execute this query:
Stage-1 is selected by condition resolver.
Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
2016-01-28 18:53:01 Starting to launch local task to process map join; maximum memory = 477102080
Hadoop job information for Stage-7: number of mappers: 1; number of reducers: 0
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 1

MapReduce Jobs Launched:
Stage-Stage-1: Map: 3  Reduce: 1   Cumulative CPU: 25.66 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Stage-Stage-7: Map: 1   Cumulative CPU: 5.07 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Stage-Stage-3: Map: 1  Reduce: 1   Cumulative CPU: 2.27 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS

b. Only increase hive.mapjoin.smalltable.filesize to large enough

set hive.auto.convert.join.noconditionaltask.size=10000000;
set hive.mapjoin.smalltable.filesize=31000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-11 is a root stage , consists of Stage-14, Stage-1
  Stage-14 has a backup stage: Stage-1
  Stage-10 depends on stages: Stage-14
  Stage-8 depends on stages: Stage-1, Stage-10 , consists of Stage-12, Stage-13, Stage-2
  Stage-12 has a backup stage: Stage-2
  Stage-6 depends on stages: Stage-12
  Stage-3 depends on stages: Stage-2, Stage-6, Stage-7
  Stage-13 has a backup stage: Stage-2
  Stage-7 depends on stages: Stage-13
  Stage-2
  Stage-1
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-11
    Conditional Operator

  Stage: Stage-14
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)

  Stage: Stage-10
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11, _col12
                Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-8
    Conditional Operator

  Stage: Stage-12
    Map Reduce Local Work
      Alias -> Map Local Tables:
        c
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        c
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)

  Stage: Stage-6
    Map Reduce
      Map Operator Tree:
          TableScan
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)
              outputColumnNames: _col0, _col11, _col12, _col23
              Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Group By Operator
                    aggregations: count()
                    mode: hash
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      table:
                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              sort order:
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-13
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $INTNAME
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $INTNAME
          TableScan
            HashTable Sink Operator
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)

  Stage: Stage-7
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)
                outputColumnNames: _col0, _col11, _col12, _col23
                Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
                Filter Operator
                  predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: count()
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col12 (type: string)
              sort order: +
              Map-reduce partition columns: _col12 (type: string)
              Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: string), _col11 (type: string)
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col3 (type: string)
                sort order: +
                Map-reduce partition columns: col3 (type: string)
                Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 _col12 (type: string)
            1 col3 (type: string)
          outputColumnNames: _col0, _col11, _col12, _col23
          Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
            Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col0 (type: string)
                sort order: +
                Map-reduce partition columns: col0 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: col1 (type: string)
                sort order: +
                Map-reduce partition columns: col1 (type: string)
                Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
                value expressions: col2 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 col0 (type: string)
            1 col1 (type: string)
          outputColumnNames: _col0, _col11, _col12
          Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
Here are 2 steps here:
Step 1:  (a joins b);
Step 2: (Output_of_Step1 joins c)

For step 1, there are 2 options in explain plan:
   Option 1: Common Join      -- Stage-1
   Option 2: Map Join(Put a in memory)  -- Stage-14 and Stage-10

For step 2, there are 3 options in explain plan:

   Option 1: Common Join      -- Stage-2
   Option 2: Map Join(Put Output_of_Step1 in memory)  -- Stage-13 and Stage-7
   Option 3: Map Join(Put c in memory) -- Stage-12 and Stage-6
If we actually execute this query, 2 Map Join will be chosen just because hive.mapjoin.smalltable.filesize is increased to large enough:
Stage-14 is selected by condition resolver.
Stage-1 is filtered out by condition resolver.
2016-01-28 19:29:37 Starting to launch local task to process map join;  maximum memory = 477102080
Hadoop job information for Stage-10: number of mappers: 1; number of reducers: 0

Stage-12 is filtered out by condition resolver.
Stage-13 is selected by condition resolver.
Stage-2 is filtered out by condition resolver.
2016-01-28 19:30:04 Starting to launch local task to process map join;  maximum memory = 477102080
Hadoop job information for Stage-7: number of mappers: 1; number of reducers: 0
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 1

c. Only increase hive.auto.convert.join.noconditionaltask.size to large enough

set hive.auto.convert.join.noconditionaltask.size=95000000;
set hive.mapjoin.smalltable.filesize=25000000;
Explain plan:
STAGE DEPENDENCIES:
  Stage-9 is a root stage
  Stage-7 depends on stages: Stage-9
  Stage-8 depends on stages: Stage-7
  Stage-3 depends on stages: Stage-8
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-9
    Map Reduce Local Work
      Alias -> Map Local Tables:
        a
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        a
          TableScan
            alias: a
            Statistics: Num rows: 638976 Data size: 30277632 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col0 is not null (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)

  Stage: Stage-7
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: b
            Statistics: Num rows: 1277952 Data size: 60555264 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: (col1 is not null and col2 is not null) (type: boolean)
              Statistics: Num rows: 319488 Data size: 15138816 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 col0 (type: string)
                  1 col1 (type: string)
                outputColumnNames: _col0, _col11, _col12
                Statistics: Num rows: 351436 Data size: 16652697 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-8
    Map Reduce Local Work
      Alias -> Map Local Tables:
        c
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        c
          TableScan
            alias: c
            Statistics: Num rows: 1916928 Data size: 90832896 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: col3 is not null (type: boolean)
              Statistics: Num rows: 958464 Data size: 45416448 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 _col12 (type: string)
                  1 col3 (type: string)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Map Join Operator
              condition map:
                   Inner Join 0 to 1
              keys:
                0 _col12 (type: string)
                1 col3 (type: string)
              outputColumnNames: _col0, _col11, _col12, _col23
              Statistics: Num rows: 1054310 Data size: 49958093 Basic stats: COMPLETE Column stats: NONE
              Filter Operator
                predicate: ((_col0 = _col11) and (_col12 = _col23)) (type: boolean)
                Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  Statistics: Num rows: 263577 Data size: 12489499 Basic stats: COMPLETE Column stats: NONE
                  Group By Operator
                    aggregations: count()
                    mode: hash
                    outputColumnNames: _col0
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      sort order:
                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                      value expressions: _col0 (type: bigint)
      Local Work:
        Map Reduce Local Work
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
No matter how small hive.mapjoin.smalltable.filesize is, as long as hive.auto.convert.join.noconditionaltask.size(95MB)  > sum of n-1 table sizes(30MB+60MB), there is no any "Conditional Operator".
Both steps are map join -- Stage-3 and Stage-7.
Explain plan output is much easier to read and understood now.

Conclusion

1. Normally "Conditional Operator" exists and Hive checks hive.mapjoin.smalltable.filesize to decide if a common join or a map join should be chosen.
2. When hive.auto.convert.join.noconditionaltask.size > sum of size for n-1 of the tables/partitions for an n-way join, then map join will be chosen regardless of hive.mapjoin.smalltable.filesize.

Be aware that, if you decide to increase hive.auto.convert.join.noconditionaltask.size to force a n-way join to use map join, make sure you know the potential size of each join output.
Because the size of output (30MB joins 60MB) could be 0 bytes, and also could be GBs. It all depends on the cardinality and the join condition.
I think that is probably why by default hive.auto.convert.join.noconditionaltask.size is only 10MB so that people will not abuse this configuration.

7 comments:

  1. Shouldn't all the plans created be the same for a particular query? Which means the Hive driver create permutation of all possible ways in which queries could be run? like the plan for the second query, which draws out plans for both common join and a map join. Of course, its only during execution, will a certain path in the query plan(map join or common join) be executed. the actual selection of the path could be based off of hive.auto.convert.join.noconditionaltask.size and
    hive.mapjoin.smalltable.filesize

    Also, i'm a little confused with the differences in the above properties. Could you elaborate a little. I couldn't get any help online. Its the same copy pasted thing everywhere.

    Thanks
    Abhilash

    ReplyDelete
    Replies
    1. If you think like a query planner, of course, all plans should be in consideration.
      However the explain plan output may not print all possible plans due to the value of some parameters such as hive.auto.convert.join.noconditionaltask.size and hive.mapjoin.smalltable.filesize.

      Look at Plan a in 2-way join, the map join plan is not printed simply because: the smaller table size(30MB) > hive.mapjoin.smalltable.filesize.

      Delete
    2. Thanks for your prompt reply.

      So why is it that in the 2 way join example (with hive.auto.convert.join.noconditionaltask.size=10000000 and hive.mapjoin.smalltable.filesize=31000000) does the query plan output show both stage 1 and stage 5? Shouldn't the plan show only the stage 5?

      Thanks,
      Abhilash

      Delete
    3. Also, when we talk of the "conditional task", do we mean checking the size of the small table or checking the size of the n-1 table/partitions? The reason why i'm confused with these 2 parameters is that i do not understand the need for hive.auto.convert.join.noconditionaltask.size, when hive.mapjoin.smalltable.filesize is already serving the purpose in an n way join.

      ~Abhilash

      Delete
    4. check my latest reply below.

      Delete
  2. If you do not want "conditional task", the only way is to make sure hive.auto.convert.join.noconditionaltask.size is large enough.
    Check the code ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java:

    // If sizes of at least n-1 tables in a n-way join is known, and their sum is smaller than
    // the threshold size, convert the join into map-join and don't create a conditional task
    boolean convertJoinMapJoin = HiveConf.getBoolVar(conf,
    HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);

    Then:
    MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition);

    newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
    replaceTask(currTask, newTask, physicalContext);


    After that , the code logic is to check individual table size VS hive.mapjoin.smalltable.filesize. And add possible mapjoin task into "conditional task".
    The reason why they did not remove "common join" is probably because the statistics may not be accurate during planning time. Hive want to make the final decision at runtime according to the size of each table(or results).


    ReplyDelete

Popular Posts