Goal:
Certain simple Hive queries can use a fetch task, which avoids the overhead of starting a MapReduce job. This article explains how to enable this feature.
Solution:
1. hive.fetch.task.conversion
This parameter controls which kinds of simple queries can be converted to a single fetch task. It was added in Hive 0.10 per HIVE-2925.
- Value "none" is added in Hive 0.14 to disable this feature, per HIVE-8389.
- Value "minimal" means SELECT *, FILTER on partition columns (WHERE and HAVING clauses), LIMIT only.
Look at the explain plans below:
hive> set hive.fetch.task.conversion;
hive.fetch.task.conversion=minimal
hive> explain select * from passwords limit 1;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: 1
      Processor Tree:
        TableScan
          alias: passwords
          Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: col0 (type: string), col1 (type: string), col2 (type: string), col3 (type: string), col4 (type: string), col5 (type: string), col6 (type: string)
            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Limit
              Number of rows: 1
              Statistics: Num rows: 1 Data size: 47 Basic stats: COMPLETE Column stats: NONE
              ListSink

hive> explain select col0 from passwords limit 1;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: passwords
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: col0 (type: string)
              outputColumnNames: _col0
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
              Limit
                Number of rows: 1
                Statistics: Num rows: 1 Data size: 47 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 47 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: 1
- Value "more" means SELECT, FILTER, LIMIT only (including TABLESAMPLE, virtual columns)."more" can take any kind of expressions in the SELECT clause, including UDFs.(UDTFs and lateral views are not yet supported – see HIVE-5718.)
See the explain plan below:
hive> set hive.fetch.task.conversion=more;
hive> explain select col0 from passwords;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: passwords
          Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: col0 (type: string)
            outputColumnNames: _col0
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: NONE
            ListSink

It also allows "select *" with a filter to use a fetch task, for example:
select * from passwords where col0='root';

So use "more" carefully and only after thorough testing, because a fetch task cannot take advantage of the parallelism of the MapReduce framework.
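If a query that was converted under "more" turns out to be too slow, the conversion can be dialed back for the current session instead of changing the cluster-wide configuration. Below is a minimal sketch reusing the "passwords" table from the examples above; the values "none" and "minimal" are the ones described earlier in this section.

-- Hive 0.14+ only: disable fetch task conversion completely for this session
set hive.fetch.task.conversion=none;
-- or fall back to the conservative setting
set hive.fetch.task.conversion=minimal;
-- with either value, a filter on a non-partition column is planned as a MapReduce job again
explain select * from passwords where col0='root';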
2. hive.fetch.task.aggr
Aggregation queries with no group-by clause (for example, select count(*) from src) execute their final aggregation in a single reduce task. If this parameter is set to true, Hive delegates the final aggregation stage to a fetch task, possibly decreasing the query time. It was added in Hive 0.12 per HIVE-4002.
It can help when the output of the mappers is very small, by reducing the time spent on the shuffle and on launching the reduce task.
For example, compare the differences between the explain plans below:
hive> set hive.fetch.task.aggr;
hive.fetch.task.aggr=false
hive> explain select count(*) from passwords;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: passwords
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
            Select Operator
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                  value expressions: _col0 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
          Select Operator
            expressions: _col0 (type: bigint)
            outputColumnNames: _col0
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
            File Output Operator
              compressed: false
              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1

Time taken: 1.471 seconds, Fetched: 44 row(s)

hive> set hive.fetch.task.aggr=true;
hive> explain select count(*) from passwords;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: passwords
            Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
            Select Operator
              Statistics: Num rows: 9963904 Data size: 477218560 Basic stats: COMPLETE Column stats: COMPLETE
              Group By Operator
                aggregations: count()
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                File Output Operator
                  compressed: false
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        Group By Operator
          aggregations: count(_col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
          Select Operator
            expressions: _col0 (type: bigint)
            outputColumnNames: _col0
            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
            ListSink

Time taken: 0.148 seconds, Fetched: 40 row(s)

Here are the performance differences:
hive> set hive.fetch.task.aggr=false;
hive> select count(*) from passwords;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_201501081639_0044, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0044
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job -kill job_201501081639_0044
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-01-15 12:10:15,706 Stage-1 map = 0%, reduce = 0%
2015-01-15 12:10:26,764 Stage-1 map = 47%, reduce = 0%, Cumulative CPU 6.65 sec
2015-01-15 12:10:27,776 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 8.31 sec
2015-01-15 12:10:32,808 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 9.27 sec
MapReduce Total cumulative CPU time: 9 seconds 270 msec
Ended Job = job_201501081639_0044
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 9.27 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 9 seconds 270 msec
OK
9963904
Time taken: 19.505 seconds, Fetched: 1 row(s)

hive> set hive.fetch.task.aggr=true;
hive> select count(*) from passwords;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501081639_0045, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0045
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job -kill job_201501081639_0045
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-01-15 12:10:47,231 Stage-1 map = 0%, reduce = 0%
2015-01-15 12:10:58,301 Stage-1 map = 47%, reduce = 0%
2015-01-15 12:10:59,308 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 7.26 sec
MapReduce Total cumulative CPU time: 7 seconds 260 msec
Ended Job = job_201501081639_0045
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 7.26 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 260 msec
OK
9963904
Time taken: 13.817 seconds, Fetched: 1 row(s)

Note that if the query has a group-by clause, it cannot use this feature. For example:
select count(*) from passwords group by col0;
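To verify this, you can inspect the plan of the group-by query. A quick sketch against the same example table; even with hive.fetch.task.aggr=true, the plan should still contain a reduce stage because the group-by requires a shuffle:

set hive.fetch.task.aggr=true;
-- the plan below still shows a Reduce Operator Tree, so the final aggregation is not moved to a fetch task
explain select count(*) from passwords group by col0;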
3. hive.fetch.task.conversion.threshold
This parameter controls the input threshold (in bytes) for applying hive.fetch.task.conversion. It was added in Hive 0.13 per HIVE-3990, with a default of "-1", which disables the threshold check.
The default value was changed in Hive 0.14 to 1 GB (1073741824 bytes) per HIVE-7397.
Taking the example above, the size of the "passwords" table is 465 MB:
[root@n3a passwords]# pwd
/mapr/my.cluster.com/user/hive/warehouse/passwords
[root@n3a passwords]# du -sh .
465M    .

If hive.fetch.task.conversion.threshold is smaller than the table size, Hive will launch a MapReduce job:
hive> set hive.fetch.task.conversion.threshold=100000000;
hive> select * from passwords limit 1;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201501081639_0046, Tracking URL = http://n1a.mycluster2.com:50030/jobdetails.jsp?jobid=job_201501081639_0046
Kill Command = /opt/mapr/hadoop/hadoop-0.20.2/bin/../bin/hadoop job -kill job_201501081639_0046
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2015-01-15 12:19:06,474 Stage-1 map = 0%, reduce = 0%
2015-01-15 12:19:11,496 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 0.85 sec
MapReduce Total cumulative CPU time: 850 msec
Ended Job = job_201501081639_0046
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 0.85 sec   MAPRFS Read: 0 MAPRFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 850 msec
OK
root x 0 0 root /root /bin/bash
Time taken: 6.698 seconds, Fetched: 1 row(s)

Otherwise, it will use only a fetch task:
hive> set hive.fetch.task.conversion.threshold=600000000;
hive> select * from passwords limit 1;
OK
root x 0 0 root /root /bin/bash
Time taken: 0.325 seconds, Fetched: 1 row(s)
Note that this parameter is evaluated against the size of the input table, not the size of the result set.
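As a convenience, you can also compare the threshold against the size Hive has recorded for the table without going to the filesystem. The sketch below assumes table statistics are up to date for "passwords"; the totalSize entry under Table Parameters roughly corresponds to the on-disk size that the threshold is compared against:

-- look for totalSize in the Table Parameters section of the output
describe formatted passwords;
-- then choose a threshold above or below that size, e.g. the Hive 0.14 default of 1 GB
set hive.fetch.task.conversion.threshold=1073741824;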