閱讀893 返回首頁    go 阿裏雲 go 技術社區[雲棲]


每天萬億+級 實時分析、數據規整 - 阿裏雲HybridDB for PostgreSQL最佳實踐

背景

橫看成嶺側成峰,

遠近高低各不同。

不識廬山真麵目,

隻緣身在此山中。

不同的視角我們所看到的物體是不一樣的,

https://t.m.china.com.cn/convert/c_ovWL9w.html

pic

圖為墨西哥城放射狀的街區廣場。

pic

圖為西班牙迷宮般的果樹漩渦。

地心說和日心說也是視角不同所呈現的。

pic

pic

實際上數據也有這樣,我們每天產生海量的數據,有各種屬性,以每個屬性為視角(分組、歸類、聚合),看到的是對應屬性為中心的數據。

對應的業務場景也非常多,例如:

1、物聯網,

每個傳感器有很多屬性:ID,地理位置,傳感器歸屬,各類指標等。

以傳感器的角度進行觀察,觀察某個傳感器ID在流金歲月中的值。

以歸屬角度(例如歸屬於公安、城管、某家公司、。。。)進行觀察,

以地理位置為視角進行觀察,。。。。

2、車聯網、站長網。。。。

按車輛、按客戶、按訪問者。。。多重視角進行觀察

觀察在數據庫中可以觸發兩種行為,一種是實時計算的行為,另一種是數據規整的行為。

數據規整指將數據按視角重新歸類存放。(例如在雲端匯聚了各個網站的被訪問記錄,站長隻關注他自己的網站的被訪問記錄,當需要向站長提供數據時,可以按網站進行數據規整)。

那麼如何在雲端實現實時分析、準實時數據歸類的需求呢?

1 架構

HybridDB for PostgreSQL是阿裏雲的一款分析型MPP數據庫產品(基於Greenplum開源版本而來,新增了插件功能、雲端特性以及內核代碼優化),提供了水平擴展的能力以及強大的分析SQL兼容性,同時與雲端海量存儲OSS進行了深度整合,可以並行讀寫OSS,將OSS作為數據存儲來使用。

實時計算架構

pic

海量數據源,寫入OSS,通過HybridDB for PostgreSQL的oss_ext插件,實時分析寫入的數據。

OSS帶寬指標:目前每個計算節點每個會話約30MB/s的讀寫速率。

對於列式存儲格式,數值類型。1億記錄約381MB,壓縮比5:1的話,約76.3MB。

按行換算的性能指標:2048個計算節點,讀寫吞吐約 805億行/s。每天處理6900萬億行(當然,按多列進出打個折扣,萬億級別也是沒有問題的)。

準實時數據規整架構

pic

實時數據規整的目的是按視角將數據規整,數據進入OSS時,是打亂的。由HybridDB for PostgreSQL對接重分布分組規整後,再寫出到OSS,形成規整的數據。

為什麼需要重分布?

前麵談到了視角問題,我們可能有多重視角來觀察數據,而在數據庫中隻能選擇一種固定的分布鍵,當視角與之不同時,就需要重分布。

準實時導出的優化:

對於一個視角,可能有少量或多種屬性,例如用戶實際,假設有100萬個用戶,如果每個計算節點分別導出100萬用戶,每個用戶對應到OSS的一個規整文件,那麼由於文件數過多,導出會較慢。

那麼可以對用戶重分布,例如1000個節點,每個節點分配到1000個用戶的數據,這樣的話,並行寫出到OSS時,一下子就降低到了每個節點寫1000個文件的規模。

如何強製重分布呢?後麵講到。

2 實時計算

HybridDB for PostgreSQL與OSS對接的詳細文檔請參考:

https://help.aliyun.com/document_detail/35457.html

簡略步驟如下:

1、創建OSS用戶

2、創建OSS BUCKET,例如每個小時一個BUCKET

3、寫入數據(最好寫入小文件,數量為HybridDB for PostgreSQL的倍數)

4、在HybridDB for PostgreSQL中創建OSS外部表,例如每個小時一個

5、直接讀取OSS外表進行分析

3 準實時數據規整

1、需求

按不同的視覺維度,進行分組,每個視覺屬性規整到一個OSS文件。也就是說一個OSS文件不能存在多個對象。

2、查詢

比如需要按視角分組,按時間排序輸出。

select 聚合函數(t order by 時間) from tbl t group by 視角字段;  

以上SQL,數據庫可能不會按視角字段重分布,而是使用兩階段提交的方式。例如

postgres=# create table tbl(uid int, info text, c1 int);  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'uid' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
CREATE TABLE  
表按UID分布  
  
但是查詢不按它,看看會不會重分布  
  
postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from tbl t group by c1;  
                                               QUERY PLAN                                                 
--------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.06 rows=1 width=36)  
   ->  GroupAggregate  (cost=0.03..0.06 rows=1 width=36)  
         Group By: c1  
         ->  Sort  (cost=0.03..0.04 rows=1 width=36)  
               Sort Key: c1  
	       -- 按c1重分布  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=36)  
                     Hash Key: c1  
                     ->  Seq Scan on tbl t  (cost=0.00..0.00 rows=1 width=36)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(10 rows)  
  
這條是不被期望的,因為發生了兩次聚合。  
我們在將數據寫入OSS時,不希望兩次聚合。  
  
postgres=# explain select max(c1) from tbl group by info;  
                                            QUERY PLAN                                              
--------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.04..0.05 rows=1 width=36)  
   -- 第二次聚合  
   ->  HashAggregate  (cost=0.04..0.05 rows=1 width=36)  
         Group By: tbl.info  
         -- 重分布  
	 ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.01..0.03 rows=1 width=36)  
               Hash Key: tbl.info  
               -- 第一次聚合  
	       ->  HashAggregate  (cost=0.01..0.01 rows=1 width=36)  
                     Group By: tbl.info  
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=36)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(10 rows)  

強製按指定鍵重分布

postgres=# explain select row_number() over (partition by c1 order by info), * from tbl;  
                                               QUERY PLAN                                                 
--------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.04 rows=1 width=40)  
   ->  Window  (cost=0.03..0.04 rows=1 width=40)  
         Partition By: c1  
         Order By: info  
         ->  Sort  (cost=0.03..0.04 rows=1 width=40)  
               Sort Key: c1, info  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)  
                     Hash Key: c1  
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(11 rows)  

按強製重分布, 改寫SQL

使用窗口查詢,將數據強製重分布,然後再進行計算節點的原地聚合。

postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from (select row_number() over (partition by c1 order by info), * from tbl) t group by c1;  
                                                     QUERY PLAN                                                       
--------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.07 rows=1 width=36)  
   -- 計算節點原地聚合  
   ->  GroupAggregate  (cost=0.03..0.07 rows=1 width=36)  
         Group By: t.c1  
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=36)  
               ->  Window  (cost=0.03..0.04 rows=1 width=40)  
                     Partition By: tbl.c1  
                     Order By: tbl.info  
                     -- 按指定要求的順序排序,例如按時間  
		     ->  Sort  (cost=0.03..0.04 rows=1 width=40)  
                           Sort Key: tbl.c1, tbl.info  
                           -- 按C1重分布  
			   ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)  
                                 Hash Key: tbl.c1  
                                 ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(14 rows)  

3、自定義聚合函數

Greenplum的自定義聚合與單節點聚合不同,一種為單階段模式,另一種為兩階段聚合模式。

單階段模式,將數據收到MASTER後進行聚合。流水:

初始值INITCOND,MASTER過程函數SFUNC,MASTER FINAL函數FINALFUNC。

兩階段模式先在數據節點並行執行,然後在MASTER執行第二階段。流水:

初始值INITCOND,數據節點過程函數SFUNC(數據節點並行執行),MASTER聚合函數PREFUNC,MASTER FINAL函數FINALFUNC。

SFUNC操作流水如下

1、每個節點調用sfunc聚合,輸入參數為(input_type數據 , 臨時結果stype),輸出為stype。處理第一條記錄時,臨時結果stype為 NULL 或 初始值INITCOND。

postgres=# \h create aggre  
Command:     CREATE AGGREGATE  
Description: define a new aggregate function  
Syntax:  
CREATE AGGREGATE name ( input_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , PREFUNC = prefunc ]  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  
  
or the old syntax  
  
CREATE AGGREGATE name (  
    BASETYPE = base_type,  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  

兩階段聚合優化方法如下

在節點調用sfunc聚合,輸入參數為(input_type數據 , 臨時結果stype),輸出為stype

sfunc( internal-state, next-data-values ) ---> next-internal-state    

segment第一階段收集結果傳輸到master調用prefunc,輸入(stype , stype),得到的結果為stype

prefunc( internal-state, internal-state ) ---> next-internal-state    

最後再將stype轉換為聚合的輸出類型即可(可選使用finalfunc)。

hll_union_agg 優化例子

CREATE AGGREGATE gp_hll_union_agg (hll) (   
  SFUNC = hll_union,   
  prefunc = hll_union, -- 第二階段函數  
  STYPE = hll   
);   

hll_add_agg 優化例子

# select hll_empty();  
  hll_empty     
--------------  
 \021\213\177  
(1 row)  
  
CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (  
  SFUNC = hll_add,   
  STYPE = hll,   
  prefunc = hll_union, -- 第二階段函數  
  initcond='\021\213\177'  -- 初始值  
);   

但是請注意,由於在segment節點sfunc執行完沒有斷點接口,所以我們無法在SEGMENT節點直接將一階段聚合的數據寫入到OSS。(除非改GPDB代碼,加入一個斷點接口。)

怎麼辦呢?

通過UDF函數來實現,並要求它在每個數據節點單獨執行。

create or replace function f(gid int, v anyarray) returns void as $$  
declare  
  oss_ext_tbl name;  
begin  
  oss_ext_tbl := 'ext_tbl_'||gid;  
  execute format ('insert into %I select unnest(%L)', oss_ext_tbl, v);  
end;  
$$ language plpgsql strict;  

雖然這是一種方法,但是這種方式依舊不是最高效的,因為還有一次聚合的過程。

更高效率的方法是首先對數據重分布和排序,同時在導出到文件時自動根據上下文的VALUE變化,切換文件,根據新的VALUE命名並寫入新文件。

這部分工作需要修改數據庫的導出代碼來實現。

4、並行寫出到OSS

實現了在導出到文件時自動根據上下文的VALUE變化,切換文件,根據新的VALUE命名並寫入新文件這部分工作後,規整數據變得異常簡單。

1、非規整外部表(來源表)

例子

create external table origin (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)  
.........  -- 外部表OSS位置  
;  

同樣需要使用這種方法進行強製重分布

按UID規整,按crt_time排序

postgres=# explain select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;  
                                                  QUERY PLAN                                                    
--------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.05 rows=1 width=32)  
   ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)  
         ->  Window  (cost=0.03..0.04 rows=1 width=44)  
               Partition By: tbl.uid  
               Order By: tbl.crt_time  
               ->  Sort  (cost=0.03..0.04 rows=1 width=44)  
                     Sort Key: tbl.uid, tbl.crt_time  
                     ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)  
                           Hash Key: tbl.uid  
                           ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(12 rows)  

2、創建規整後OSS外部表

參考 阿裏雲HybridDB for PostgreSQL OSS存儲用法

create external table dest (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)  
.........  -- 外部表OSS位置  
;  

3、將數據寫入規整後OSS外部表

postgres=# explain insert into dest select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;  
                                                     QUERY PLAN                                                       
--------------------------------------------------------------------------------------------------------------------  
 Insert (slice0; segments: 48)  (rows=1 width=32)  
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)  
               ->  Window  (cost=0.03..0.04 rows=1 width=44)  
                     Partition By: tbl.uid  
                     Order By: tbl.crt_time  
                     ->  Sort  (cost=0.03..0.04 rows=1 width=44)  
                           Sort Key: tbl.uid, tbl.crt_time  
                           ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)  
                                 Hash Key: tbl.uid  
                                 ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(14 rows)  

小結

使用HybridDB for PostgreSQL,同時實現了實時分析,準實時數據規整兩個需求。

OSS作為海量數據入口,HDB作為OSS的計算引擎,實現海量數據實時分析。

同時HDB作為數據規整引擎,被規整的數據不需要在數據庫本地落地,直接從OSS到OSS,隻是用到了HDB的規整能力。

性能可以通過擴展HDB的計算節點線性擴展:

海量數據源,寫入OSS,通過HybridDB for PostgreSQL的oss_ext插件,實時分析寫入的數據。

OSS帶寬指標:目前每個計算節點每個會話約30MB/s的讀寫速率。

對於列式存儲格式,數值類型。1億記錄約381MB,壓縮比5:1的話,約76.3MB。

按行換算的性能指標:2048個計算節點,讀寫吞吐約 805億行/s。每天處理6900萬億行(當然,按多列進出打個折扣,萬億級別也是沒有問題的)。

參考

阿裏雲HybridDB for PostgreSQL

阿裏雲HybridDB for PostgreSQL OSS存儲用法

《Greenplum 性能評估公式 - 阿裏雲HybridDB for PostgreSQL最佳實踐》

《Greenplum 最佳實踐 - 估值插件hll的使用(以及hll分式聚合函數優化)》

《Postgres-XC customized aggregate introduction》

《PostgreSQL aggregate function customize》

最後更新:2017-08-13 22:52:17

  上一篇:go  Greenplum在企業生產中的最佳實踐(上)
  下一篇:go  MPP分布式數據庫性能評估方法 - 阿裏雲HybridDB for PostgreSQL最佳實踐