Env:
Drill 1.0
Theory:
- A query is broken into multiple phases called Major Fragments
- Major Fragments can be materialized into one or more Minor Fragments
- Minor Fragments contain one or more relational Operators
- The Drill execution system works on a coordinate system of these IDs:
QueryID:MajorFragmentId:MinorFragmentId:OperatorId.
- Coordinates of trees are numbered from the root (with the root always being zero)
- Logical Plan describes what we want to do, but not how
- Physical Plan is composed of Major Fragments but Minor Fragments are not defined yet.
- Major Fragments are separated by Exchanges
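The coordinate system described above can be sketched in a few lines of Python. This is only an illustration of the QueryID:MajorFragmentId:MinorFragmentId:OperatorId layout, not Drill's internal representation; the names Coordinate and parse_coordinate are hypothetical.

```python
from typing import NamedTuple


class Coordinate(NamedTuple):
    """Hypothetical model of QueryID:MajorFragmentId:MinorFragmentId:OperatorId."""
    query_id: str
    major: int
    minor: int
    operator: int

    def __str__(self) -> str:
        return f"{self.query_id}:{self.major}:{self.minor}:{self.operator}"


def parse_coordinate(text: str) -> Coordinate:
    """Split a colon-separated coordinate string into its four parts."""
    query_id, major, minor, operator = text.split(":")
    return Coordinate(query_id, int(major), int(minor), int(operator))


# The root of every tree is numbered zero, so the root operator of the
# root fragment has the coordinate <QueryID>:0:0:0.
root = parse_coordinate("2aa81e0e-dfe2-1e2b-06c0-972ac0c8ccd0:0:0:0")
print(root.major, root.minor, root.operator)
```

Each level narrows the scope: a query contains Major Fragments, each Major Fragment is run as one or more Minor Fragments, and each Minor Fragment runs one or more Operators.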
Goal:
Understand these concepts: Major/Minor Fragments, Operators, Logical/Physical Plan, Exchanges.
Workshop:
This workshop uses the SQL below as its example:
> select a.* from dfs.root.`user/hive/warehouse/passwords_csv` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
  where a.columns[1]=b.columns[1] order by a.columns[3] desc limit 5;
1. Logical Plan
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> !set maxwidth 100000000
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> explain plan without implementation for
. . . . . . . . . . . . . . . . . . . . . . .> select a.* from dfs.root.`user/hive/warehouse/passwords_csv` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
. . . . . . . . . . . . . . . . . . . . . . .> where a.columns[1]=b.columns[1] order by a.columns[3] desc limit 5;
+------+------+
| text | json |
+------+------+
| DrillScreenRel
  DrillProjectRel(*=[$0])
    DrillLimitRel(fetch=[5])
      DrillSortRel(sort0=[$1], dir0=[DESC])
        DrillProjectRel(*=[$0], EXPR$1=[$1])
          DrillProjectRel(*=[$1], ITEM=[$2], ITEM2=[$3], ITEM0=[$0])
            DrillJoinRel(condition=[=($3, $0)], joinType=[inner])
              DrillProjectRel(ITEM=[ITEM($0, 1)])
                DrillScanRel(table=[[dfs, root, user/hive/warehouse/passwords_csv_2]], groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]])
              DrillProjectRel(*=[$0], ITEM=[ITEM($1, 3)], ITEM2=[ITEM($1, 1)])
                DrillScanRel(table=[[dfs, root, user/hive/warehouse/passwords_csv]], groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv, numFiles=1, columns=[`*`], files=[maprfs:/user/hive/warehouse/passwords_csv/000000_0]]])
2. Physical Plan
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> !set maxwidth 100000000
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> explain plan for
. . . . . . . . . . . . . . . . . . . . . . .> select a.* from dfs.root.`user/hive/warehouse/passwords_csv` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
. . . . . . . . . . . . . . . . . . . . . . .> where a.columns[1]=b.columns[1] order by a.columns[3] desc limit 5;
+------+------+
| text | json |
+------+------+
| 00-00 Screen
00-01   Project(*=[$0])
00-02     Project(T2¦¦*=[$0])
00-03       SelectionVectorRemover
00-04         Limit(fetch=[5])
00-05           SingleMergeExchange(sort0=[1 DESC])
01-01             SelectionVectorRemover
01-02               TopN(limit=[5])
01-03                 Project(T2¦¦*=[$0], ITEM0=[$1])
01-04                   HashToRandomExchange(dist0=[[$1]])
02-01                     UnorderedMuxExchange
03-01                       Project(T2¦¦*=[$0], ITEM0=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($1))])
03-02                         Project(T2¦¦*=[$0], ITEM0=[$1])
03-03                           Project(T2¦¦*=[$1], ITEM0=[$2], ITEM2=[$3], ITEM=[$0])
03-04                             HashJoin(condition=[=($3, $0)], joinType=[inner])
03-06                               Project(ITEM=[$0])
03-08                                 HashToRandomExchange(dist0=[[$0]])
04-01                                   UnorderedMuxExchange
06-01                                     Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))])
06-02                                       Project(ITEM=[ITEM($0, 1)])
06-03                                         Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]])
03-05                               Project(T2¦¦*=[$0], ITEM0=[$1], ITEM2=[$2])
03-07                                 Project(T2¦¦*=[$0], ITEM=[$1], ITEM2=[$2])
03-09                                   HashToRandomExchange(dist0=[[$2]])
05-01                                     UnorderedMuxExchange
07-01                                       Project(T2¦¦*=[$0], ITEM=[$1], ITEM2=[$2], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($2))])
07-02                                         Project(T2¦¦*=[$0], ITEM=[ITEM($1, 3)], ITEM2=[ITEM($1, 1)])
07-03                                           Project(T2¦¦*=[$0], columns=[$1])
07-04                                             Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv, numFiles=1, columns=[`*`], files=[maprfs:/user/hive/warehouse/passwords_csv/000000_0]]])
Note: The first column of the physical plan is in this format: MajorFragmentId-OperatorId.
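To make the MajorFragmentId-OperatorId prefix concrete, here is a minimal Python sketch that splits plan lines into (major fragment, operator ID, operator name) and groups them by Major Fragment. The sample lines are copied from the physical plan in this post; the helper names parse_plan and group_by_major, and the regular expression, are assumptions made for illustration and are not part of Drill.

```python
import re
from collections import defaultdict

# A few lines copied from the physical plan text in this post.
PLAN_TEXT = """\
00-04 Limit(fetch=[5])
00-05 SingleMergeExchange(sort0=[1 DESC])
01-01 SelectionVectorRemover
01-02 TopN(limit=[5])
01-04 HashToRandomExchange(dist0=[[$1]])
02-01 UnorderedMuxExchange
03-04 HashJoin(condition=[=($3, $0)], joinType=[inner])
"""

# "<MajorFragmentId>-<OperatorId> <OperatorName>..." (assumed line shape).
LINE_RE = re.compile(r"^\s*(\d+)-(\d+)\s+(\w+)")


def parse_plan(text):
    """Return a list of (major_fragment_id, operator_id, operator_name)."""
    rows = []
    for line in text.splitlines():
        match = LINE_RE.match(line)
        if match:
            major, operator, name = match.groups()
            rows.append((int(major), int(operator), name))
    return rows


def group_by_major(rows):
    """Map each Major Fragment ID to its (operator_id, operator_name) pairs."""
    by_major = defaultdict(list)
    for major, operator, name in rows:
        by_major[major].append((operator, name))
    return dict(by_major)


rows = parse_plan(PLAN_TEXT)
fragments = group_by_major(rows)
# Operators whose name ends in "Exchange" mark Major Fragment boundaries.
exchanges = [name for _, _, name in rows if name.endswith("Exchange")]
print(fragments)
print(exchanges)
```

In the parsed rows, the Major Fragment ID changes on the line right after each exchange operator, which matches the statement that Major Fragments are separated by Exchanges.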
3. QueryId
The QueryId for this query is 2aa81e0e-dfe2-1e2b-06c0-972ac0c8ccd0.
Check the visualized physical plan on the Drill web UI:
http://<Drillbit_IP>:8047/profiles/2aa81e0e-dfe2-1e2b-06c0-972ac0c8ccd0
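If you look up profiles often, the URL pattern above can be built with a small helper. This is a sketch only: profile_url is a hypothetical name, drillbit.example.com is a placeholder host, and port 8047 and the /profiles/ path are taken from the URL shown above.

```python
from urllib.parse import quote


def profile_url(drillbit_host: str, query_id: str, port: int = 8047) -> str:
    """Build the web UI profile URL for a query (hypothetical helper)."""
    # quote() guards against unexpected characters in the ID; a normal
    # QueryId made of hex digits and hyphens passes through unchanged.
    return f"http://{drillbit_host}:{port}/profiles/{quote(query_id, safe='')}"


# Placeholder host; replace with the address of one of your Drillbits.
url = profile_url("drillbit.example.com", "2aa81e0e-dfe2-1e2b-06c0-972ac0c8ccd0")
print(url)
```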
Below is a snippet of the plan:
4. Exchanges
The major fragments shown in white above are exchanges.
Each exchange is composed of a Sender and a Receiver.
For example, Major Fragment 04 contains:
5. Major/Minor Fragments
Minor Fragments show the parallelization of a Major Fragment.
For example, in the Profile Overview, Major Fragment 03 contains 16 Minor Fragments:
From Major Fragment 03 Profile, we can dig into the statistics for each Minor Fragment:
Note: The "xx" above acts as a wildcard.
For example:
03-15-xx means the statistics of MajorFragment03-MinorFragment15-<ALL Operators>.
04-xx-01 means the statistics of MajorFragment04-<ALL MinorFragments>-Operator01.
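The wildcard reading of "xx" can be expressed as a small matcher. The sketch below is illustrative Python, not how the Drill web UI is implemented; the function name matches and the sample coordinates are made up for the example.

```python
def matches(pattern: str, major: int, minor: int, operator: int) -> bool:
    """Check a MajorFragment-MinorFragment-Operator triple against a pattern.

    Each part of the pattern is either a number or "xx", which matches
    any value in that position.
    """
    parts = pattern.split("-")
    if len(parts) != 3:
        raise ValueError(f"expected three parts in {pattern!r}")
    values = (major, minor, operator)
    return all(part == "xx" or int(part) == value
               for part, value in zip(parts, values))


# Made-up (major, minor, operator) triples for illustration.
coordinates = [(3, 15, 1), (3, 15, 4), (3, 14, 4), (4, 0, 1), (4, 7, 1)]

# 03-15-xx: every operator of MajorFragment03-MinorFragment15.
print([c for c in coordinates if matches("03-15-xx", *c)])
# 04-xx-01: Operator01 across every Minor Fragment of MajorFragment04.
print([c for c in coordinates if matches("04-xx-01", *c)])
```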
Reference:
Youtube: Drill Configuration Options (23:00)
EXPLAIN Commands