

Greenplum cross-database JOIN requirements - how to use dblink, its drawbacks, and recommended alternatives

Tags

PostgreSQL , Greenplum , dblink


Background

Greenplum serves as the data warehouse in many enterprises. A company typically has a unified user-management system and a billing system, plus many separate lines of business.

The data therefore falls into two categories: business-specific data and company-wide shared data.

If these two kinds of data are stored in different databases (a single instance can host multiple databases) and the shared data has to be JOINed with business data, dblink probably comes to mind: reach into the other database through dblink and JOIN the result with local tables.
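
Concretely, the idea is to call dblink inside a query and JOIN its result set with a local table. The pattern looks roughly like this (the connection string and table names are only illustrative; the full test appears later in this article):

select count(*)
from tbl t1
join (
  select *
  from dblink('dbname=postgres', 'select c1, id from tbl1') as t(c1 int, id int)
) t2 on t1.id = t2.id;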


If the concepts of instance and database are not clear to you, this document may help:

《PostgreSQL 邏輯結構 和 權限體係 介紹》

So, is dblink actually a good fit for this scenario?

Deploying dblink on Greenplum

Greenplum does not ship dblink by default, so it has to be built and installed manually.

Download the PostgreSQL source code matching the PostgreSQL version that Greenplum is based on

For example, the Greenplum used here is based on PostgreSQL 8.3.
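
If you are unsure which PostgreSQL version your Greenplum build is based on, the server version string shows it, for example:

psql -c "select version();"
-- e.g. PostgreSQL 8.3.23 (Greenplum Database ...) on x86_64 ...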

cd postgresql-8.3/contrib/dblink/  
  
vi Makefile  
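# add or adjust the following line inside the Makefile so the build finds the libpq headers (-w just silences warnings):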
  
PG_CPPFLAGS = -I$(libpq_srcdir) -w  
  
export PATH=/home/gpdb/bin:$PATH  
  
USE_PGXS=1 make   
USE_PGXS=1 make install  

Copy dblink.so to the Greenplum software directory on every node. The make install output below shows where the files land on the build host:

/bin/mkdir -p '/home/digoal/gp/lib/postgresql'  
/bin/sh /home/digoal/gp/lib/postgresql/pgxs/src/makefiles/../../config/install-sh -c -m 755  dblink.so '/home/digoal/gp/lib/postgresql/dblink.so'  
/bin/sh /home/digoal/gp/lib/postgresql/pgxs/src/makefiles/../../config/install-sh -c -m 644 ./uninstall_dblink.sql '/home/digoal/gp/share/postgresql/contrib'  
/bin/sh /home/digoal/gp/lib/postgresql/pgxs/src/makefiles/../../config/install-sh -c -m 644 dblink.sql '/home/digoal/gp/share/postgresql/contrib'  
/bin/sh /home/digoal/gp/lib/postgresql/pgxs/src/makefiles/../../config/install-sh -c -m 644 ./README.dblink '/home/digoal/gp/doc/postgresql/contrib'  
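
make install only installs the library on the build host. To push dblink.so to the same path on the master, standby, and every segment host, the gpscp utility that ships with Greenplum can be used; the hostfile path below is illustrative:

gpscp -f /home/gpdb/all_hosts \
  /home/digoal/gp/lib/postgresql/dblink.so \
  =:/home/digoal/gp/lib/postgresql/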

Tests

In the database that needs dblink, run dblink.sql:

psql db1 -f ./dblink.sql  
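
To confirm that the dblink functions are now registered in db1, they can be listed from psql:

psql db1 -c '\df dblink*'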

Create two test tables. Pay attention to their distribution keys; we will use them to interpret the plans below.

create table tbl(id int);  
  
create table tbl1(c1 int,id int);  
  
postgres=# \d tbl  
      Table "public.tbl"  
 Column |  Type   | Modifiers   
--------+---------+-----------  
 id     | integer |   
Distributed by: (id)  
  
postgres=# \d tbl1  
     Table "public.tbl1"  
 Column |  Type   | Modifiers   
--------+---------+-----------  
 c1     | integer |   
 id     | integer |   
Distributed by: (c1)  

Insert 1 million test rows into each table

insert into tbl select generate_series(1,1000000);  
insert into tbl1 select 1,generate_series(1,1000000);  

Test 1: in-place JOIN of two local tables

Redistribute Motion 3:3 means data is redistributed from 3 segments to 3 segments, i.e. the source data comes from all 3 segments.

Gather Motion 3:1 means data is gathered from the 3 segments to 1 node (the master).

  
postgres=# explain analyze select count(*) from tbl join tbl1 on tbl.id=tbl1.id;  
                                                                                  QUERY PLAN                                                                                     
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=72258.70..72258.71 rows=1 width=8)  
   Rows out:  1 rows with 648 ms to end, start offset by 17 ms.  
   ->  Gather Motion 3:1  (slice2; segments: 3)  (cost=72258.63..72258.68 rows=1 width=8)  
         Rows out:  3 rows at destination with 647 ms to first row, 648 ms to end, start offset by 17 ms.  
         ->  Aggregate  (cost=72258.63..72258.64 rows=1 width=8)  
               Rows out:  Avg 1.0 rows x 3 workers.  Max 1 rows (seg0) with 645 ms to end, start offset by 19 ms.  
               ->  Hash Join  (cost=23619.20..69756.61 rows=333603 width=0)  
                     Hash Cond: tbl1.id = tbl.id  
                     Rows out:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 120 ms to first row, 560 ms to end, start offset by 19 ms.  
                     Executor memory:  7813K bytes avg, 7814K bytes max (seg2).  
                     Work_mem used:  7813K bytes avg, 7814K bytes max (seg2). Workfile: (0 spilling, 0 reused)  
                     (seg2)   Hash chain length 1.2 avg, 2 max, using 281103 of 524341 buckets.  
                       
		     Because the JOIN column is not the distribution key of both tables, one of the tables is redistributed on the JOIN column, or broadcast in full, whichever the planner estimates to be cheaper:
		       
		     ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..31125.27 rows=333603 width=4)  
                           Hash Key: tbl1.id  
                           Rows out:  Avg 333333.3 rows x 3 workers at destination.  Max 333385 rows (seg2) with 0.102 ms to first row, 286 ms to end, start offset by 139 ms.  
                           ->  Seq Scan on tbl1  (cost=0.00..11109.09 rows=333603 width=4)  
                                 Rows out:  1000000 rows (seg0) with 0.118 ms to first row, 191 ms to end, start offset by 21 ms.  
                     ->  Hash  (cost=11109.09..11109.09 rows=333603 width=4)  
                           Rows in:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 118 ms to end, start offset by 22 ms.  
                           ->  Seq Scan on tbl  (cost=0.00..11109.09 rows=333603 width=4)  
                                 Rows out:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 0.027 ms to first row, 33 ms to end, start offset by 22 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 163K bytes.  
   (slice1)    Executor memory: 257K bytes avg x 3 workers, 283K bytes max (seg0).  
   (slice2)    Executor memory: 24788K bytes avg x 3 workers, 24788K bytes max (seg0).  Work_mem: 7814K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 668.319 ms  
(28 rows)  

Test 2: JOIN a local table with the result of a dblink call

The redistribution information for the dblink result (1:3) supports the following conclusions:

1. dblink is clearly not executed on every segment, but the plan does not reveal which node it actually runs on.

2. Because dblink does not run on all segments, the single node that executes it comes under heavy pressure whenever the result set returned by dblink is large.

postgres=# explain analyze select count(*) from tbl join (select * from dblink('dbname=postgres','select * from tbl1') AS t(c1 int,id int)) t on tbl.id=t.id;  
                                                                                     QUERY PLAN                                                                                       
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=13691.18..13691.19 rows=1 width=8)  
   Rows out:  1 rows with 1673 ms to end, start offset by 7.751 ms.  
   ->  Gather Motion 3:1  (slice2; segments: 3)  (cost=13691.11..13691.17 rows=1 width=8)  
         Rows out:  3 rows at destination with 1669 ms to first row, 1673 ms to end, start offset by 7.752 ms.  
         ->  Aggregate  (cost=13691.11..13691.12 rows=1 width=8)  
               Rows out:  Avg 1.0 rows x 3 workers.  Max 1 rows (seg0) with 1670 ms to end, start offset by 11 ms.  
               ->  Hash Join  (cost=65.00..13688.61 rows=334 width=0)  
                     Hash Cond: tbl.id = t.id  
                     Rows out:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 1469 ms to first row, 1629 ms to end, start offset by 11 ms.  
                     Executor memory:  7813K bytes avg, 7814K bytes max (seg2).  
                     Work_mem used:  7813K bytes avg, 7814K bytes max (seg2). Workfile: (0 spilling, 0 reused)  
                     (seg2)   Hash chain length 1.6 avg, 4 max, using 205910 of 262151 buckets.  
                     ->  Seq Scan on tbl  (cost=0.00..11109.09 rows=333603 width=4)  
                           Rows out:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 0.039 ms to first row, 37 ms to end, start offset by 1479 ms.  
                     ->  Hash  (cost=52.50..52.50 rows=334 width=4)  
                           Rows in:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 1468 ms to end, start offset by 12 ms.  
                             
			   Redistribution info: the data is redistributed from 1 node to 3 segments.
			   There is no Gather Motion here (the step that would pull data up to the master), so can we already conclude that dblink ran on one of the segments? Not yet.
			   ->  Redistribute Motion 1:3  (slice1)  (cost=0.00..52.50 rows=1000 width=4)  
                                 Hash Key: t.id  
                                 Rows out:  Avg 333333.3 rows x 3 workers at destination.  Max 333385 rows (seg2) with 1068 ms to first row, 1400 ms to end, start offset by 12 ms.  
                                   
				 The dblink call itself: the plan does not show which node it runs on, nor whether it runs on all nodes.
				 ->  Function Scan on dblink t  (cost=0.00..12.50 rows=3000 width=4)  
                                       Rows out:  1000000 rows with 1066 ms to first row, 1217 ms to end, start offset by 12 ms.  
                                       Work_mem used:  8193K bytes.  
 Slice statistics:  
   (slice0)    Executor memory: 163K bytes.  
   (slice1)    Executor memory: 41138K bytes (entry db).  Work_mem: 8193K bytes max.  
   (slice2)    Executor memory: 20767K bytes avg x 3 workers, 20767K bytes max (seg0).  Work_mem: 7814K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 1681.166 ms  
(29 rows)  

Test 3: invocation and redistribution of a user-defined function, case 1

The redistribution step in the plan shows that a user-defined function is likewise called on only one node.

create or replace function f() returns setof int as $$  
  select generate_series(1,100000);  
$$ language sql strict;  
postgres=# explain analyze select count(*) from tbl join (select * from f() as t(id)) t on tbl.id=t.id;  
                                                                                  QUERY PLAN                                                                                     
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=13691.18..13691.19 rows=1 width=8)  
   Rows out:  1 rows with 276 ms to end, start offset by 8.441 ms.  
   ->  Gather Motion 3:1  (slice2; segments: 3)  (cost=13691.11..13691.17 rows=1 width=8)  
         Rows out:  3 rows at destination with 269 ms to first row, 276 ms to end, start offset by 8.442 ms.  
         ->  Aggregate  (cost=13691.11..13691.12 rows=1 width=8)  
               Rows out:  Avg 1.0 rows x 3 workers.  Max 1 rows (seg0) with 273 ms to end, start offset by 11 ms.  
               ->  Hash Join  (cost=65.00..13688.61 rows=334 width=0)  
                     Hash Cond: tbl.id = t.id  
                     Rows out:  Avg 33333.3 rows x 3 workers.  Max 33348 rows (seg0) with 177 ms to first row, 269 ms to end, start offset by 11 ms.  
                     Executor memory:  782K bytes avg, 782K bytes max (seg0).  
                     Work_mem used:  782K bytes avg, 782K bytes max (seg0). Workfile: (0 spilling, 0 reused)  
                     (seg0)   Hash chain length 1.0 avg, 1 max, using 33348 of 262151 buckets.  
                     ->  Seq Scan on tbl  (cost=0.00..11109.09 rows=333603 width=4)  
                           Rows out:  Avg 333333.3 rows x 3 workers.  Max 333385 rows (seg2) with 0.027 ms to first row, 31 ms to end, start offset by 188 ms.  
                     ->  Hash  (cost=52.50..52.50 rows=334 width=4)  
                           Rows in:  Avg 33333.3 rows x 3 workers.  Max 33348 rows (seg0) with 175 ms to end, start offset by 13 ms.  
                             
			   Redistributed from one node to 3 segments
			   ->  Redistribute Motion 1:3  (slice1)  (cost=0.00..52.50 rows=1000 width=4)  
                                 Hash Key: t.id  
                                 Rows out:  Avg 33333.3 rows x 3 workers at destination.  Max 33348 rows (seg0) with 92 ms to first row, 167 ms to end, start offset by 13 ms.  
                                   
				 The function is called on a single node
				 ->  Function Scan on f t  (cost=0.00..12.50 rows=3000 width=4)  
                                       Rows out:  100000 rows with 93 ms to first row, 101 ms to end, start offset by 12 ms.  
                                       Work_mem used:  1025K bytes.  
 Slice statistics:  
   (slice0)    Executor memory: 163K bytes.  
   (slice1)    Executor memory: 5313K bytes (entry db).  Work_mem: 1025K bytes max.  
   (slice2)    Executor memory: 6431K bytes avg x 3 workers, 6431K bytes max (seg0).  Work_mem: 782K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 284.298 ms  
(29 rows)  

Test 4: invocation and redistribution of a user-defined function, case 2

Some cases raise an error, for example when the function accesses a table and its result then has to be JOINed with another table.

postgres=# create or replace function f() returns setof int as $$  
  select id from tbl1;  
$$ language sql strict;  
CREATE FUNCTION  
postgres=# \set VERBOSITY verbose  
  
postgres=# explain analyze select count(*) from tbl join (select * from f() as t(id)) t on tbl.id=t.id;  
NOTICE:  XX000: function cannot execute on segment because it accesses relation "public.tbl1" (functions.c:155)  (entry db r10k04474.sqa.zmf:29999 pid=53723) (cdbdisp.c:1326)  
DETAIL:  SQL function "f" during startup  
LOCATION:  cdbdisp_finishCommand, cdbdisp.c:1326  
  
  
postgres=# explain analyze select count(*) from f();  
                                              QUERY PLAN                                                
------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=20.00..20.01 rows=1 width=8)  
   Rows out:  1 rows with 1383 ms to end, start offset by 0.071 ms.  
   ->  Function Scan on f  (cost=0.00..12.50 rows=3000 width=0)  
         Rows out:  1000000 rows with 1186 ms to first row, 1275 ms to end, start offset by 0.072 ms.  
         Work_mem used:  8193K bytes.  
 Slice statistics:  
   (slice0)    Executor memory: 33064K bytes.  Work_mem: 8193K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 1383.044 ms  
(10 rows)  

Test 5: calling dblink and the user-defined function standalone

The plans contain no Gather Motion node, which shows that both the dblink call and the user-defined function are executed on the master node.

postgres=# explain analyze select count(*) from dblink('dbname=postgres','select * from tbl1') as t(c1 int,id int);  
                                              QUERY PLAN                                                
------------------------------------------------------------------------------------------------------  
 -- note: there is no Gather Motion node here, so the dblink function is executed on the master
 Aggregate  (cost=20.00..20.01 rows=1 width=8)  
   Rows out:  1 rows with 1306 ms to end, start offset by 0.074 ms.  
   ->  Function Scan on dblink t  (cost=0.00..12.50 rows=3000 width=0)  
         Rows out:  1000000 rows with 1099 ms to first row, 1195 ms to end, start offset by 0.075 ms.  
         Work_mem used:  8193K bytes.  
 Slice statistics:  
   (slice0)    Executor memory: 41029K bytes.  Work_mem: 8193K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 1306.167 ms  
(10 rows)  
  
postgres=# explain analyze select count(*) from f() as t(id);  
                                             QUERY PLAN                                               
----------------------------------------------------------------------------------------------------  
 -- note: there is no Gather Motion node here, so f() is executed on the master
 Aggregate  (cost=20.00..20.01 rows=1 width=8)  
   Rows out:  1 rows with 826 ms to end, start offset by 0.072 ms.  
   ->  Function Scan on f t  (cost=0.00..12.50 rows=3000 width=0)  
         Rows out:  1000000 rows with 627 ms to first row, 718 ms to end, start offset by 0.072 ms.  
         Work_mem used:  8193K bytes.  
 Slice statistics:  
   (slice0)    Executor memory: 33064K bytes.  Work_mem: 8193K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 825.970 ms  
(10 rows)  

If execution happened on the segments, the plan would contain a Gather Motion node to bring the result back to the master, for example:

postgres=# explain analyze select * from tbl1;  
                                                  QUERY PLAN                                                     
---------------------------------------------------------------------------------------------------------------  
 -- data is gathered from the 3 segments to the master
 Gather Motion 3:1  (slice1; segments: 3)  (cost=0.00..11109.09 rows=1000809 width=8)  
   Rows out:  1000000 rows at destination with 3.191 ms to first row, 335 ms to end, start offset by 0.284 ms.  
   ->  Seq Scan on tbl1  (cost=0.00..11109.09 rows=333603 width=8)  
         Rows out:  1000000 rows (seg0) with 0.032 ms to first row, 96 ms to end, start offset by 3.223 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 235K bytes.  
   (slice1)    Executor memory: 139K bytes avg x 3 workers, 155K bytes max (seg0).  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 415.013 ms  
(10 rows)  
  
  
An example of two-phase aggregation:
  
postgres=# explain analyze select count(*) from tbl1;  
                                                       QUERY PLAN                                                          
-------------------------------------------------------------------------------------------------------------------------  
 -- aggregation on the master node
 Aggregate  (cost=13611.18..13611.19 rows=1 width=8)  
   Rows out:  1 rows with 360 ms to end, start offset by 0.349 ms.  
     
   -- data is gathered from the 3 segments to the master
   ->  Gather Motion 3:1  (slice1; segments: 3)  (cost=13611.11..13611.17 rows=1 width=8)  
         Rows out:  3 rows at destination with 3.013 ms to first row, 360 ms to end, start offset by 0.350 ms.  
           
	 -- aggregation on the segments
	 ->  Aggregate  (cost=13611.11..13611.12 rows=1 width=8)  
               Rows out:  Avg 1.0 rows x 3 workers.  Max 1 rows (seg0) with 356 ms to end, start offset by 4.229 ms.  
               ->  Seq Scan on tbl1  (cost=0.00..11109.09 rows=333603 width=0)  
                     Rows out:  1000000 rows (seg0) with 0.028 ms to first row, 244 ms to end, start offset by 4.230 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 159K bytes.  
   (slice1)    Executor memory: 163K bytes avg x 3 workers, 163K bytes max (seg0).  
 Statement statistics:  
   Memory used: 128000K bytes  
 Total runtime: 360.824 ms  
(14 rows)  

For the principles of two-phase aggregation in distributed databases, see

《Postgres-XC customized aggregate introduction》

《Greenplum 最佳實踐 - 估值插件hll的使用(以及hll分式聚合函數優化)》

Drawbacks of dblink on Greenplum

At present dblink behaves like an ordinary user-defined function: it has not been adapted to Greenplum's MPP architecture. It is invoked on the master node, so when the result set returned by dblink is large, the master easily becomes the bottleneck.

When the dblink result has to be JOINed with other tables, the flow is as follows:

1. dblink is first called on the master.

2. The result set produced by dblink is collected on the master node.

3. The master redistributes the result set to the segments.

4. The JOIN with the other tables is then executed. (Fortunately the JOIN itself does not run on the master.)

Of course, the gpdb community may adapt dblink to the MPP architecture in the future, but for now the drawbacks above remain. Unless the result set returned by dblink is small, use it with caution.

Recommended approaches

1. Put all the data into a single database and use different schemas to separate business data from shared data. Within one database you can JOIN freely, with no extra load on the master.

2. If different businesses really must live in separate databases, expose the shared data through external tables. This also puts no load on the master, and every segment reads the external table in parallel.

Examples are gpfdist external tables, or OSS external tables on Alibaba Cloud HybridDB.

Once written, an external table cannot be modified in place. If the shared data changes frequently or has to be refreshed periodically (for example, a billing system that pushes user information into Greenplum every day), add a column that marks the latest version of each row and merge the external table incrementally at a low frequency.

For example (a SQL sketch of this pattern follows the list):

2.1. Append only to tbl_foreign_table_news(id int, xxx, xxx last_update_time).

2.2. At a low frequency, truncate tbl_foreign_table_origin and then merge tbl_foreign_table_news into tbl_foreign_table_origin.

2.3. Users query tbl_foreign_table_origin to get the shared data.

3. If the result set fetched through dblink is small, using dblink as a temporary solution for cross-database JOINs within one instance is not a big problem.
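
A minimal sketch of the pattern in 2.1-2.3, assuming a gpfdist external table named tbl_foreign_table_news and a regular local table tbl_foreign_table_origin (the column names, gpfdist host, and file pattern are illustrative):

-- shared data is appended to files served by gpfdist and exposed as a readable external table
create external table tbl_foreign_table_news (
  id int,
  info text,
  last_update_time timestamp
)
location ('gpfdist://etl_host:8081/shared/*.csv')
format 'csv';

-- the local table that users actually query
create table tbl_foreign_table_origin (like tbl_foreign_table_news)
  distributed by (id);

-- low-frequency merge: rebuild the local copy, keeping only the latest row per id
begin;
truncate tbl_foreign_table_origin;
insert into tbl_foreign_table_origin
select id, info, last_update_time
from (
  select *, row_number() over (partition by id order by last_update_time desc) as rn
  from tbl_foreign_table_news
) t
where rn = 1;
commit;

Each segment scans the gpfdist files in parallel, so neither the external-table reads nor the merge go through the master.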

Classic usage patterns of Alibaba Cloud HybridDB for PostgreSQL


References

《PostgreSQL 邏輯結構 和 權限體係 介紹》

《Postgres-XC customized aggregate introduction》

《Greenplum 最佳實踐 - 估值插件hll的使用(以及hll分式聚合函數優化)》
