Env:
Drill 1.0

Theory:
Drill uses distributed joins and broadcast joins to join tables. Two planner options control the choice:
- planner.broadcast_factor: Controls the cost of doing a broadcast when performing a join. The lower the setting, the cheaper it is to do a broadcast join compared to other types of distribution for a join, such as a hash distribution. Default: 1
- planner.broadcast_threshold: Threshold, in terms of a number of rows, that determines whether a broadcast join is chosen for a query. Default: 10000000
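You can check the current values of both options from sys.options (a minimal sketch; the exact sys.options columns vary by Drill version):

-- List the broadcast-related planner options and their current settings.
select * from sys.options where name like 'planner.broadcast%';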
Goal:
- Understand the join types.
- Know when/how to switch between broadcast join and distributed join.
- Know how to control the parallelism of each join type.
Workshop:
1. Join Types
What are the join types in Drill? This is one of the most popular DBA interview questions. People with different backgrounds will answer the same question from different angles.
- SQL Developer: Inner Join, Left/Right/Full Outer Join. [See here.]
- Traditional DBA: Hash Join, Sort-Merge Join, Nested Loops. [See here.]
- Admin of Hadoop/MPP products: Broadcast Join, Distributed Join. [See here.]
[This is the focus of this article.]
2. What are Broadcast Join and Distributed Join?
- Broadcast Join
Used for: Hash Join, Sort-Merge Join, Nested Loops.
Works for inner and left outer joins; does not work for right/full outer joins.
Use Case: Large (fact) table joins smaller (dimension) table.
- Distributed Join
Used for: Hash Join, Sort-Merge Join.
Use Case: Two large tables joining each other.
3. How to switch between Broadcast Join and Distributed Join?
By default, broadcast join is enabled through the option "planner.enable_broadcast_join". The SQL planner automatically chooses the join type based on the estimated cost.
The two options below can be used to influence that choice manually.
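For example, to rule out broadcast joins entirely for a session (a sketch using the option named above):

-- Disable broadcast join; the planner will fall back to distributed joins.
alter session set `planner.enable_broadcast_join`=false;
-- Re-enable it afterwards.
alter session set `planner.enable_broadcast_join`=true;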
a. planner.broadcast_factor
Per Drill 1.0 source code, this configuration's logic is in BroadcastExchangePrel.java:

$ grep broadcastFactor exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
    final double broadcastFactor = PrelUtil.getSettings(getCluster()).getBroadcastFactor();
    final double cpuCost = broadcastFactor * DrillCostBase.SVR_CPU_COST * inputRows;
    final double networkCost = broadcastFactor * DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * numEndPoints;

It controls the estimated CPU and network cost for a broadcast.
The higher it is, the more expensive a broadcast join looks to the planner, so Drill prefers a distributed join; the lower it is, the more Drill prefers a broadcast join.
Take the join SQL below for example:

explain plan including all attributes for
select count(*) from
(
  select a.* from dfs.root.`user/hive/warehouse/passwords_csv_middle` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
  where a.columns[1]=b.columns[1]
);

When planner.broadcast_factor is 1 (the default), this is the SQL plan including estimated costs:
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.07557531E7 rows, 1.946839431E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 22837
00-01 StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755753E7 rows, 1.94683943E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 22836
00-02 UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755752E7 rows, 1.94683931E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 22835
01-01 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755751E7 rows, 1.94683923E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 22834
01-02 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {2.5834828E7 rows, 1.35632847E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 22833
01-03 HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {2.0913905E7 rows, 1.15949155E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 22832
01-05 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22827
01-07 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22826
01-04 Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 22831
01-06 BroadcastExchange : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 22830
02-01 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22829
02-02 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 22828

First, note that each cost shown is a cumulative cost, which includes the costs of the operator's children. For the physical operator "BroadcastExchange", the CPU cost is 19991192 - 7688920 = 12302272, and the network cost is 18896289792 - 0 = 18896289792.
Now let's reduce planner.broadcast_factor from 1 to 0.5:
> alter session set `planner.broadcast_factor`=0.5;
+-------+------------------------------------+
|  ok   |              summary               |
+-------+------------------------------------+
| true  | planner.broadcast_factor updated.  |
+-------+------------------------------------+
1 row selected (0.068 seconds)

Below is the SQL plan for the same SQL:
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.07557531E7 rows, 1.885328071E8 cpu, 0.0 io, 9.448148992E9 network, 2.7064998400000002E7 memory}, id = 23815
00-01 StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755753E7 rows, 1.88532807E8 cpu, 0.0 io, 9.448148992E9 network, 2.7064998400000002E7 memory}, id = 23814
00-02 UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755752E7 rows, 1.88532795E8 cpu, 0.0 io, 9.448148992E9 network, 2.7064998400000002E7 memory}, id = 23813
01-01 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755751E7 rows, 1.88532787E8 cpu, 0.0 io, 9.448144896E9 network, 2.7064998400000002E7 memory}, id = 23812
01-02 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {2.5834828E7 rows, 1.29481711E8 cpu, 0.0 io, 9.448144896E9 network, 2.7064998400000002E7 memory}, id = 23811
01-03 HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {2.0913905E7 rows, 1.09798019E8 cpu, 0.0 io, 9.448144896E9 network, 2.7064998400000002E7 memory}, id = 23810
01-05 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23805
01-07 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23804
01-04 Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 9.448144896E9 network, 0.0 memory}, id = 23809
01-06 BroadcastExchange : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 9.448144896E9 network, 0.0 memory}, id = 23808
02-01 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23807
02-02 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 23806

We can see that the CPU cost now becomes 13840056 - 7688920 = 6151136, and the network cost becomes 9448144896 - 0 = 9448144896.
Both estimated costs are half of the previous ones, which matches the source code logic.
Now let's increase planner.broadcast_factor to 2; the SQL plan changes to a distributed join:
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.85940901E7 rows, 3.180145181E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 24814
00-01 StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.859409E7 rows, 3.18014518E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 24813
00-02 UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594089E7 rows, 3.18014506E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 24812
01-01 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594088E7 rows, 3.18014498E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 24811
01-02 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {4.3673165E7 rows, 2.58963422E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 24810
01-03 HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {3.8752242E7 rows, 2.3927973E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 24809
01-05 Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 24801
01-07 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 24800
02-01 UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.9683692E7 rows, 4.920923E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24799
04-01 Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.4762769E7 rows, 4.4288307E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24798
04-02 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24797
04-03 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24796
01-04 Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 24808
01-06 Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 24807
01-08 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 24806
03-01 UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {6151136.0 rows, 1.537784E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24805
05-01 Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24804
05-02 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24803
05-03 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 24802
b. planner.broadcast_threshold
Per Drill 1.0 source code, this configuration's logic is in JoinPruleBase.java:

protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {
    double estimatedRightRowCount = RelMetadataQuery.getRowCount(right);
    if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
        && !left.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.SINGLETON)
        && (join.getJoinType() == JoinRelType.INNER || join.getJoinType() == JoinRelType.LEFT)) {
      return true;
    }
    return false;
}

A broadcast join can be chosen only if the estimated row count of the smaller table is below planner.broadcast_threshold (10000000 rows by default).
It also only supports inner joins and left outer joins, which means right/full outer joins cannot use a broadcast join.
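To see the join-type condition in action, you could run an explain on a right outer join of the same tables; per checkBroadcastConditions above, the planner should not produce a BroadcastExchange (a sketch reusing this workshop's tables; output not shown):

-- RIGHT (and FULL) outer joins cannot be broadcast, so expect a
-- HashToRandomExchange instead of a BroadcastExchange in the plan.
explain plan for
select count(*)
from dfs.root.`user/hive/warehouse/passwords_csv_middle` a
right outer join dfs.root.`user/hive/warehouse/passwords_csv_2` b
on a.columns[1] = b.columns[1];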
In the SQL above, the estimated row count of the smaller table is 1537784.0, and a broadcast join is chosen when planner.broadcast_factor is 1 (the default).
Now let's see what happens if we decrease planner.broadcast_threshold to 1537784. Since checkBroadcastConditions uses a strict less-than, an estimated row count equal to the threshold no longer qualifies, so the plan falls back to a distributed join:
0: jdbc:drill:zk=> alter session set `planner.broadcast_threshold`=1537784;
+-------+---------------------------------------+
|  ok   |                summary                |
+-------+---------------------------------------+
| true  | planner.broadcast_threshold updated.  |
+-------+---------------------------------------+
1 row selected (0.184 seconds)
0: jdbc:drill:zk=> explain plan including all attributes for
. . . . . . . . . . . . . . . . . . . . . . .> select count(*) from
. . . . . . . . . . . . . . . . . . . . . . .> (
. . . . . . . . . . . . . . . . . . . . . . .> select a.* from dfs.root.`user/hive/warehouse/passwords_csv_middle` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
. . . . . . . . . . . . . . . . . . . . . . .> where a.columns[1]=b.columns[1]
. . . . . . . . . . . . . . . . . . . . . . .> );
+------+------+
| text | json |
+------+------+
| 00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.85940901E7 rows, 3.180145181E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 25637
00-01 StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.859409E7 rows, 3.18014518E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 25636
00-02 UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594089E7 rows, 3.18014506E8 cpu, 0.0 io, 5.290973184E10 network, 2.7064998400000002E7 memory}, id = 25635
01-01 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {4.8594088E7 rows, 3.18014498E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 25634
01-02 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {4.3673165E7 rows, 2.58963422E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 25633
01-03 HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {3.8752242E7 rows, 2.3927973E8 cpu, 0.0 io, 5.2909727744E10 network, 2.7064998400000002E7 memory}, id = 25632
01-05 Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 25624
01-07 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {2.4604615E7 rows, 1.27943998E8 cpu, 0.0 io, 4.0312201216E10 network, 0.0 memory}, id = 25623
02-01 UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.9683692E7 rows, 4.920923E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25622
04-01 Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 4920923.0, cumulative cost = {1.4762769E7 rows, 4.4288307E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25621
04-02 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25620
04-03 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25619
01-04 Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 25631
01-06 Project(ITEM=[$0]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 25630
01-08 HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {7688920.0 rows, 3.9982384E7 cpu, 0.0 io, 1.2597526528E10 network, 0.0 memory}, id = 25629
03-01 UnorderedMuxExchange : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {6151136.0 rows, 1.537784E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25628
05-01 Project(ITEM=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($0))]) : rowType = RecordType(ANY ITEM, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.3840056E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25627
05-02 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25626
05-03 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25625

If we increase planner.broadcast_threshold to 1537785, a broadcast join can be chosen:
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> alter session set `planner.broadcast_threshold`=1537785;
+-------+---------------------------------------+
|  ok   |                summary                |
+-------+---------------------------------------+
| true  | planner.broadcast_threshold updated.  |
+-------+---------------------------------------+
1 row selected (0.067 seconds)
0: jdbc:drill:zk=h2.poc.com:5181,h3.poc.com:5> explain plan including all attributes for
. . . . . . . . . . . . . . . . . . . . . . .> select count(*) from
. . . . . . . . . . . . . . . . . . . . . . .> (
. . . . . . . . . . . . . . . . . . . . . . .> select a.* from dfs.root.`user/hive/warehouse/passwords_csv_middle` a, dfs.root.`user/hive/warehouse/passwords_csv_2` b
. . . . . . . . . . . . . . . . . . . . . . .> where a.columns[1]=b.columns[1]
. . . . . . . . . . . . . . . . . . . . . . .> );
+------+------+
| text | json |
+------+------+
| 00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.07557531E7 rows, 1.946839431E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 27438
00-01 StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755753E7 rows, 1.94683943E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 27437
00-02 UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755752E7 rows, 1.94683931E8 cpu, 0.0 io, 1.8896293888E10 network, 2.7064998400000002E7 memory}, id = 27436
01-01 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0755751E7 rows, 1.94683923E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 27435
01-02 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 4920923.0, cumulative cost = {2.5834828E7 rows, 1.35632847E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 27434
01-03 HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY ITEM, ANY ITEM0): rowcount = 4920923.0, cumulative cost = {2.0913905E7 rows, 1.15949155E8 cpu, 0.0 io, 1.8896289792E10 network, 2.7064998400000002E7 memory}, id = 27433
01-05 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 4920923.0, cumulative cost = {9841846.0 rows, 2.4604615E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27428
01-07 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_middle, numFiles=1, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_middle/000000_0]]]) : rowType = RecordType(ANY columns): rowcount = 4920923.0, cumulative cost = {4920923.0 rows, 4920923.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27427
01-04 Project(ITEM0=[$0]) : rowType = RecordType(ANY ITEM0): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 27432
01-06 BroadcastExchange : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {4613352.0 rows, 1.9991192E7 cpu, 0.0 io, 1.8896289792E10 network, 0.0 memory}, id = 27431
02-01 Project(ITEM=[ITEM($0, 1)]) : rowType = RecordType(ANY ITEM): rowcount = 1537784.0, cumulative cost = {3075568.0 rows, 7688920.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27430
02-02 Scan(groupscan=[EasyGroupScan [selectionRoot=/user/hive/warehouse/passwords_csv_2, numFiles=2, columns=[`columns`[1]], files=[maprfs:/user/hive/warehouse/passwords_csv_2/000000_0, maprfs:/user/hive/warehouse/passwords_csv_2/000000_0_copy_1]]]) : rowType = RecordType(ANY columns): rowcount = 1537784.0, cumulative cost = {1537784.0 rows, 1537784.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27429
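Before moving on to the parallelism tests, you can restore both options to the defaults listed in the Theory section (a quick sketch; 1 and 10000000 are the documented defaults):

-- Restore the defaults so the following tests are not skewed by the
-- values changed above.
alter session set `planner.broadcast_factor`=1;
alter session set `planner.broadcast_threshold`=10000000;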
4. How to control the parallelism of the above two join types?
The tradeoff is always between performance and resource capacity: a query generally executes faster with more system resources. Assuming the cluster has enough resources, if you want to improve the performance of a query, simply increase its parallelism.
Before reading further, it is suggested to first understand the basics of Control Query Parallelization.
The tests below join a fact table with a dimension table and examine the factors that affect join parallelism.
To give all cluster resources to each SQL, I manually increased planner.width.max_per_node to 999 (large enough), as shown below.
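For example:

-- Allow up to 999 minor fragments per node so that this setting does not
-- cap the parallelism observed in the tests below.
alter session set `planner.width.max_per_node`=999;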
Here are the table sizes and row counts (a sketch of the test query shape follows the list):
- Fact table A: 734MB with 20 million rows. (1 CSV file * 734MB)
- Fact table B: 734MB with 20 million rows. (10 CSV files * 73.4MB)
- Dimension table C: 147MB with 4 million rows. (2 CSV files * 73.4MB)
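The exact test queries are not reproduced here; a representative shape for "A joins C", with hypothetical table paths, would be:

-- Hypothetical paths; substitute the actual locations of fact table A
-- and dimension table C.
select count(*)
from dfs.root.`path/to/fact_a` a, dfs.root.`path/to/dim_c` c
where a.columns[0] = c.columns[0];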
Note: a broadcast join broadcasts the right side of the join and keeps the left side's distribution as-is, which means the parallelism of the join depends on the left-side scan.
Test 1: A joins C using broadcast join
The parallelism of this join is 3 because the MFS default chunk size is 256MB and fact table A has only one 734MB file, so there are 3 splits.
The query takes 16 seconds to finish.
Test 2: B joins C using broadcast join
The same join type and the same table sizes; the only difference is that fact table B has 10 files.
Now the parallelism is 10 because fact table B has 10 files, each smaller than the MFS chunk size.
The query takes 14 seconds to finish.
Test 3: A joins C using distributed join.
Compared to test 1, the distributed join's parallelism is 77. This is because planner.slice_target is 100K by default, and the estimated row count of fact table A in the physical SQL plan is "rowcount = 7688921.0"; 7688921 / 100000 rounds up to 77, so it spawns 77 minor fragments.
Please note: major fragment "04-xx-xx" scans A, and its parallelism is still 3.
The query takes 8 seconds to finish.
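If you want to experiment with this knob yourself, you can lower planner.slice_target (a sketch; halving the target roughly doubles the possible number of minor fragments, subject to the planner.width.max_per_node cap):

-- planner.slice_target is the estimated number of rows a single minor
-- fragment should handle before the planner parallelizes further.
-- With 7688921 estimated rows, a 50000-row target would allow up to
-- roughly 154 minor fragments instead of 77.
alter session set `planner.slice_target`=50000;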
Test 4: B joins C using distributed join.
Compared to test 3, the distributed join also spawns 77 minor fragments because fact tables A and B have the same row count.
Please note: major fragment "04-xx-xx" scans B, and its parallelism is 10 because B has 10 files.
The query takes 7 seconds to finish.
In summary:
- The parallelism of a broadcast join is determined by the fact table's size and file count. (The same as the fact table scan's parallelism.)
- The parallelism of a distributed join is determined by "planner.slice_target" and the larger table's row count.
With the same data size and the same number of files:
- The smaller the row width, the more parallelism a distributed join gets.
- The larger the row width, the more parallelism a broadcast join gets relative to a distributed join.
Reference:
- Join Planning Guidelines
- Sort-Based and Hash-Based Memory-Constrained Operators