389
技術社區[雲棲]
SQL優化器原理 - Auto Hash Join
這是MaxCompute有關SQL優化器原理的係列文章之一。我們會陸續推出SQL優化器有關優化規則和框架的其他文章。添加釘釘群“關係代數優化技術”(群號11719083)可以獲取最新文章發布動態(二維碼在文章末尾)。
本文主要描述MaxCompute優化器實現的Auto Hash Join的功能。
簡介
在MaxCompute中,Join操作符的實現算法之一名為"Hash Join",其實現原理是,把小表的數據全部讀入內存中,並拷貝多份分發到大表數據所在機器,在 map 階段直接掃描大表數據與內存中的小表數據進行匹配。Hash join執行方式效率很高,但是要求小表數據足夠小以便放到內存中,假如小表數據太大,則任務在執行過程中會報OutOfMemory錯誤。
在MapCompute中,可以使用MapJoin關鍵字來實現Hash join,如下所示:
select /* + mapjoin(b) */ a.* from table1 a join table2 b on a.col1 = b.col2;
// b表為小表
但是這種通過使用hint的方式還是不夠智能。另外對於query複雜的情況,用戶很可能因為無法確定join的某一路數據量大小而放棄使用mapjoin。在最新的MaxCompute SQL 2.0中,基於代價的優化器(Cost Based Optimizer,CBO)包含了一個自動優化join為hash join的優化規則。
實現原理
在CBO中會對所有的operator的cost進行估計,這個cost包含rowcount、cpu、內存等等。有了各個operator的cost,就能估計其對應輸出數據量的大小,公式可以簡單的認為是:data_size = rowcount * averageRowSize
。有了dataSize之後,就可以很容易知道這個任務是否適合使用HashJoin,其判定方法就是計算各個parent operator的data size之和是否小於某個閾值。假如估算出的data size在閾值範圍之內,則會產生一個包含HashJoin的計劃。同時對於Join,CBO也會產生一個普通的包含MergeJoin的計劃,最後在這兩個計劃中選擇cost最小的作為最優計劃。
簡單說來,在CBO中是否選擇HashJoin作為最優計劃的步驟有兩個:
- Step1:估算join的輸入數據量大小,判定是否產生一個包含HashJoin的計劃
- Step2:對比HashJoin、MergeJoin相關計劃的cost,選擇cost最小的計劃作為最優計劃
舉例,對如下sql進行優化:
select t1.name from
(select dt_bad_linenum as name from bad_tpch_customer) t1
join
(select c_name from tpch_customer) t2
on t1.name = t2.c_name;
上述sql在CBO中會翻譯生成如下operator tree:
OdpsLogicalProject(name=[$0]): rowcount = 9000000.0, cumulative cost = {48000008.0 rows, 39000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 5
LogicalJoin(condition=[EQ($0, $1)], joinType=[inner]): rowcount = 9000000.0, cumulative cost = {39000008.0 rows, 30000010.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 4
OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
OdpsLogicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_file,dt_bad_linenum,dt_bad_msg,dt_bad_code,dt_bad_data(5) {0, 1, 2, 3, 4}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 5.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 0
OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
OdpsLogicalTableScan(table=[[tpch_100gb.tpch_customer, c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment(8) {0, 1, 2, 3, 4, 5, 6, 7}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 15000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 2
從上可以看到,join的parent operator有兩個:
OdpsLogicalProject(name=[$1]): rowcount = 4.0, cumulative cost = {8.0 rows, 9.0 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 1
OdpsLogicalProject(c_name=[$1]): rowcount = 1.5E7, cumulative cost = {3.0E+7 rows, 30000001 cpu, 0.0 io, 0.0 memory, 0.0 network}, id = 3
其中id為1的project其輸出記錄數是4行,且其輸出列隻有1列(bad_tpch_customer表中有5列),估算其輸出數據量,認為其適合使用HashJoin,因此其產生的計劃中包含兩種:
- 計劃1:HashJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {28500024.88 rows, 28500013.222723687326862 cpu, 270001607.0 io, 496.0 memory, 378.0 network}, id = 109
OdpsPhysicalHashJoin(type=[INNER], equi=[[($0,$1)]], mainstream=[1]): rowcount = 3.24, cumulative cost = {28500021.64 rows, 28500013.222723687326862 cpu, 270001548.0 io, 496.0 memory, 378.0 network}, id = 108
OdpsPhysicalStreamlineRead(order=[[]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 0.0 network}, id = 106
OdpsPhysicalStreamlineWrite(shuffle=[broadcast], order=[[]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 105
OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 104
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 103
OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 102
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 107
OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 99
- 計劃2:MergeJoin
OdpsPhysicalProject(name=[$0]): rowcount = 3.24, cumulative cost = {55500024.88 rows, 471791423.394757487326862 cpu, 756001229.0 io, 336.0 memory, 270459000360.0 network}, id = 104
OdpsPhysicalMergeJoin(type=[INNER], equi=[[($0,$1)]]): rowcount = 3.24, cumulative cost = {55500021.64 rows, 471791423.394757487326862 cpu, 756001170.0 io, 336.0 memory, 270459000360.0 network}, id = 103
OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 3.6, cumulative cost = {18.4 rows, 13.222723687326862 cpu, 1170.0 io, 100.0 memory, 360.0 network}, id = 99
OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 3.6, cumulative cost = {14.8 rows, 8.611361843663431 cpu, 810.0 io, 100.0 memory, 0.0 network}, id = 98
OdpsPhysicalProject(name=[$0]): rowcount = 3.6, cumulative cost = {11.2 rows, 4.0 cpu, 450.0 io, 100.0 memory, 0.0 network}, id = 97
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 3.6, cumulative cost = {7.6 rows, 4.0 cpu, 400.0 io, 100.0 memory, 0.0 network}, id = 96
OdpsPhysicalTableScan(table=[[tpch_100gb.bad_tpch_customer, dt_bad_linenum(1) {1}]]): rowcount = 4.0, cumulative cost = {4.0 rows, 0.0 cpu, 400.0 io, 0.0 memory, 0.0 network}, id = 95
OdpsPhysicalStreamlineRead(order=[[0]]): rowcount = 1.35E7, cumulative cost = {5.55E+7 rows, 458291406.5720338 cpu, 756000000.0 io, 18.0 memory, 270459000000.0 network}, id = 102
OdpsPhysicalStreamlineWrite(shuffle=[hash[0],JoinHasher], order=[[0]]): rowcount = 1.35E7, cumulative cost = {4.20E+7 rows, 236645703.2860169 cpu, 513000000.0 io, 18.0 memory, 0.0 network}, id = 101
OdpsPhysicalFilter(condition=[ISNOTNULL($0)]): rowcount = 1.35E7, cumulative cost = {2.85E+7 rows, 15000000.0 cpu, 270000000.0 io, 18.0 memory, 0.0 network}, id = 100
OdpsPhysicalTableScan(table=[[tpch_100gb.tpch_customer, c_name(1) {1}]]): rowcount = 1.5E7, cumulative cost = {1.5E+7 rows, 0.0 cpu, 2.7E+8 io, 0.0 memory, 0.0 network}, id = 92
比較上述兩個計劃的cost,明顯計劃1的cost更小,因此選擇包含HashJoin的計劃1作為最優計劃。
總結
AutoHashJoin的一個很大的好處是能讓用戶免參與的進行這個優化,同時對於一些複雜的query也更有可能使用HashJoin。但是,因為CBO無法完美估計數據量,會出現誤判從而導致任務OOM的情況。針對這種情況,MaxCompute也進行了相應的調整,對於CBO誤判導致HashJoin OOM的任務會關閉HashJoin rule來重試。
目前CBO中使用HashJoin的閾值比較保守,默認是25MB。主要原因是CBO對於數據量的估計有偏差,無法完美估計數據量,而估計不準的原因有兩個:
- 數據是壓縮存儲的,CBO拿到的statistics不準
- CBO的估計算法有偏差
這兩個問題也是CBO致力解決的問題。
最後更新:2017-08-28 09:03:30