每天萬億+級 實時分析、數據規整 - 阿裏雲HybridDB for PostgreSQL最佳實踐
背景
橫看成嶺側成峰,
遠近高低各不同。
不識廬山真麵目,
隻緣身在此山中。
不同的視角我們所看到的物體是不一樣的,
https://t.m.china.com.cn/convert/c_ovWL9w.html
圖為墨西哥城放射狀的街區廣場。
圖為西班牙迷宮般的果樹漩渦。
地心說和日心說也是視角不同所呈現的。
實際上數據也有這樣,我們每天產生海量的數據,有各種屬性,以每個屬性為視角(分組、歸類、聚合),看到的是對應屬性為中心的數據。
對應的業務場景也非常多,例如:
1、物聯網,
每個傳感器有很多屬性:ID,地理位置,傳感器歸屬,各類指標等。
以傳感器的角度進行觀察,觀察某個傳感器ID在流金歲月中的值。
以歸屬角度(例如歸屬於公安、城管、某家公司、。。。)進行觀察,
以地理位置為視角進行觀察,。。。。
2、車聯網、站長網。。。。
按車輛、按客戶、按訪問者。。。多重視角進行觀察
觀察在數據庫中可以觸發兩種行為,一種是實時計算的行為,另一種是數據規整的行為。
數據規整指將數據按視角重新歸類存放。(例如在雲端匯聚了各個網站的被訪問記錄,站長隻關注他自己的網站的被訪問記錄,當需要向站長提供數據時,可以按網站進行數據規整)。
那麼如何在雲端實現實時分析、準實時數據歸類的需求呢?
1 架構
HybridDB for PostgreSQL是阿裏雲的一款分析型MPP數據庫產品(基於Greenplum開源版本而來,新增了插件功能、雲端特性以及內核代碼優化),提供了水平擴展的能力以及強大的分析SQL兼容性,同時與雲端海量存儲OSS進行了深度整合,可以並行讀寫OSS,將OSS作為數據存儲來使用。
實時計算架構
海量數據源,寫入OSS,通過HybridDB for PostgreSQL的oss_ext插件,實時分析寫入的數據。
OSS帶寬指標:目前每個計算節點每個會話約30MB/s的讀寫速率。
對於列式存儲格式,數值類型。1億記錄約381MB,壓縮比5:1的話,約76.3MB。
按行換算的性能指標:2048個計算節點,讀寫吞吐約 805億行/s。每天處理6900萬億行(當然,按多列進出打個折扣,萬億級別也是沒有問題的)。
準實時數據規整架構
實時數據規整的目的是按視角將數據規整,數據進入OSS時,是打亂的。由HybridDB for PostgreSQL對接重分布分組規整後,再寫出到OSS,形成規整的數據。
為什麼需要重分布?
前麵談到了視角問題,我們可能有多重視角來觀察數據,而在數據庫中隻能選擇一種固定的分布鍵,當視角與之不同時,就需要重分布。
準實時導出的優化:
對於一個視角,可能有少量或多種屬性,例如用戶實際,假設有100萬個用戶,如果每個計算節點分別導出100萬用戶,每個用戶對應到OSS的一個規整文件,那麼由於文件數過多,導出會較慢。
那麼可以對用戶重分布,例如1000個節點,每個節點分配到1000個用戶的數據,這樣的話,並行寫出到OSS時,一下子就降低到了每個節點寫1000個文件的規模。
如何強製重分布呢?後麵講到。
2 實時計算
HybridDB for PostgreSQL與OSS對接的詳細文檔請參考:
https://help.aliyun.com/document_detail/35457.html
簡略步驟如下:
1、創建OSS用戶
2、創建OSS BUCKET,例如每個小時一個BUCKET
3、寫入數據(最好寫入小文件,數量為HybridDB for PostgreSQL的倍數)
4、在HybridDB for PostgreSQL中創建OSS外部表,例如每個小時一個
5、直接讀取OSS外表進行分析
3 準實時數據規整
1、需求
按不同的視覺維度,進行分組,每個視覺屬性規整到一個OSS文件。也就是說一個OSS文件不能存在多個對象。
2、查詢
比如需要按視角分組,按時間排序輸出。
select 聚合函數(t order by 時間) from tbl t group by 視角字段;
以上SQL,數據庫可能不會按視角字段重分布,而是使用兩階段提交的方式。例如
postgres=# create table tbl(uid int, info text, c1 int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'uid' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
表按UID分布
但是查詢不按它,看看會不會重分布
postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from tbl t group by c1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Gather Motion 48:1 (slice2; segments: 48) (cost=0.03..0.06 rows=1 width=36)
-> GroupAggregate (cost=0.03..0.06 rows=1 width=36)
Group By: c1
-> Sort (cost=0.03..0.04 rows=1 width=36)
Sort Key: c1
-- 按c1重分布
-> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..0.02 rows=1 width=36)
Hash Key: c1
-> Seq Scan on tbl t (cost=0.00..0.00 rows=1 width=36)
Settings: optimizer=off
Optimizer status: legacy query optimizer
(10 rows)
這條是不被期望的,因為發生了兩次聚合。
我們在將數據寫入OSS時,不希望兩次聚合。
postgres=# explain select max(c1) from tbl group by info;
QUERY PLAN
--------------------------------------------------------------------------------------------------
Gather Motion 48:1 (slice2; segments: 48) (cost=0.04..0.05 rows=1 width=36)
-- 第二次聚合
-> HashAggregate (cost=0.04..0.05 rows=1 width=36)
Group By: tbl.info
-- 重分布
-> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.01..0.03 rows=1 width=36)
Hash Key: tbl.info
-- 第一次聚合
-> HashAggregate (cost=0.01..0.01 rows=1 width=36)
Group By: tbl.info
-> Seq Scan on tbl (cost=0.00..0.00 rows=1 width=36)
Settings: optimizer=off
Optimizer status: legacy query optimizer
(10 rows)
強製按指定鍵重分布
postgres=# explain select row_number() over (partition by c1 order by info), * from tbl;
QUERY PLAN
--------------------------------------------------------------------------------------------------------
Gather Motion 48:1 (slice2; segments: 48) (cost=0.03..0.04 rows=1 width=40)
-> Window (cost=0.03..0.04 rows=1 width=40)
Partition By: c1
Order By: info
-> Sort (cost=0.03..0.04 rows=1 width=40)
Sort Key: c1, info
-> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..0.02 rows=1 width=40)
Hash Key: c1
-> Seq Scan on tbl (cost=0.00..0.00 rows=1 width=40)
Settings: optimizer=off
Optimizer status: legacy query optimizer
(11 rows)
按強製重分布, 改寫SQL
使用窗口查詢,將數據強製重分布,然後再進行計算節點的原地聚合。
postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from (select row_number() over (partition by c1 order by info), * from tbl) t group by c1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Gather Motion 48:1 (slice2; segments: 48) (cost=0.03..0.07 rows=1 width=36)
-- 計算節點原地聚合
-> GroupAggregate (cost=0.03..0.07 rows=1 width=36)
Group By: t.c1
-> Subquery Scan t (cost=0.03..0.05 rows=1 width=36)
-> Window (cost=0.03..0.04 rows=1 width=40)
Partition By: tbl.c1
Order By: tbl.info
-- 按指定要求的順序排序,例如按時間
-> Sort (cost=0.03..0.04 rows=1 width=40)
Sort Key: tbl.c1, tbl.info
-- 按C1重分布
-> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..0.02 rows=1 width=40)
Hash Key: tbl.c1
-> Seq Scan on tbl (cost=0.00..0.00 rows=1 width=40)
Settings: optimizer=off
Optimizer status: legacy query optimizer
(14 rows)
3、自定義聚合函數
Greenplum的自定義聚合與單節點聚合不同,一種為單階段模式,另一種為兩階段聚合模式。
單階段模式,將數據收到MASTER後進行聚合。流水:
初始值INITCOND,MASTER過程函數SFUNC,MASTER FINAL函數FINALFUNC。
兩階段模式先在數據節點並行執行,然後在MASTER執行第二階段。流水:
初始值INITCOND,數據節點過程函數SFUNC(數據節點並行執行),MASTER聚合函數PREFUNC,MASTER FINAL函數FINALFUNC。
SFUNC操作流水如下
1、每個節點調用sfunc聚合,輸入參數為(input_type數據 , 臨時結果stype),輸出為stype。處理第一條記錄時,臨時結果stype為 NULL 或 初始值INITCOND。
postgres=# \h create aggre
Command: CREATE AGGREGATE
Description: define a new aggregate function
Syntax:
CREATE AGGREGATE name ( input_data_type [ , ... ] ) (
SFUNC = sfunc,
STYPE = state_data_type
[ , PREFUNC = prefunc ]
[ , FINALFUNC = ffunc ]
[ , INITCOND = initial_condition ]
[ , SORTOP = sort_operator ]
)
or the old syntax
CREATE AGGREGATE name (
BASETYPE = base_type,
SFUNC = sfunc,
STYPE = state_data_type
[ , FINALFUNC = ffunc ]
[ , INITCOND = initial_condition ]
[ , SORTOP = sort_operator ]
)
兩階段聚合優化方法如下
在節點調用sfunc聚合,輸入參數為(input_type數據 , 臨時結果stype),輸出為stype
sfunc( internal-state, next-data-values ) ---> next-internal-state
segment第一階段收集結果傳輸到master調用prefunc,輸入(stype , stype),得到的結果為stype
prefunc( internal-state, internal-state ) ---> next-internal-state
最後再將stype轉換為聚合的輸出類型即可(可選使用finalfunc)。
hll_union_agg 優化例子
CREATE AGGREGATE gp_hll_union_agg (hll) (
SFUNC = hll_union,
prefunc = hll_union, -- 第二階段函數
STYPE = hll
);
hll_add_agg 優化例子
# select hll_empty();
hll_empty
--------------
\021\213\177
(1 row)
CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (
SFUNC = hll_add,
STYPE = hll,
prefunc = hll_union, -- 第二階段函數
initcond='\021\213\177' -- 初始值
);
但是請注意,由於在segment節點sfunc執行完沒有斷點接口,所以我們無法在SEGMENT節點直接將一階段聚合的數據寫入到OSS。(除非改GPDB代碼,加入一個斷點接口。)
怎麼辦呢?
通過UDF函數來實現,並要求它在每個數據節點單獨執行。
create or replace function f(gid int, v anyarray) returns void as $$
declare
oss_ext_tbl name;
begin
oss_ext_tbl := 'ext_tbl_'||gid;
execute format ('insert into %I select unnest(%L)', oss_ext_tbl, v);
end;
$$ language plpgsql strict;
雖然這是一種方法,但是這種方式依舊不是最高效的,因為還有一次聚合的過程。
更高效率的方法是首先對數據重分布和排序,同時在導出到文件時自動根據上下文的VALUE變化,切換文件,根據新的VALUE命名並寫入新文件。
這部分工作需要修改數據庫的導出代碼來實現。
4、並行寫出到OSS
實現了在導出到文件時自動根據上下文的VALUE變化,切換文件,根據新的VALUE命名並寫入新文件這部分工作後,規整數據變得異常簡單。
1、非規整外部表(來源表)
例子
create external table origin (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)
......... -- 外部表OSS位置
;
同樣需要使用這種方法進行強製重分布
按UID規整,按crt_time排序
postgres=# explain select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------
Gather Motion 48:1 (slice2; segments: 48) (cost=0.03..0.05 rows=1 width=32)
-> Subquery Scan t (cost=0.03..0.05 rows=1 width=32)
-> Window (cost=0.03..0.04 rows=1 width=44)
Partition By: tbl.uid
Order By: tbl.crt_time
-> Sort (cost=0.03..0.04 rows=1 width=44)
Sort Key: tbl.uid, tbl.crt_time
-> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..0.02 rows=1 width=44)
Hash Key: tbl.uid
-> Seq Scan on origin tbl (cost=0.00..0.00 rows=1 width=44)
Settings: optimizer=off
Optimizer status: legacy query optimizer
(12 rows)
2、創建規整後OSS外部表
參考 阿裏雲HybridDB for PostgreSQL OSS存儲用法
create external table dest (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)
......... -- 外部表OSS位置
;
3、將數據寫入規整後OSS外部表
postgres=# explain insert into dest select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Insert (slice0; segments: 48) (rows=1 width=32)
-> Subquery Scan t (cost=0.03..0.05 rows=1 width=32)
-> Window (cost=0.03..0.04 rows=1 width=44)
Partition By: tbl.uid
Order By: tbl.crt_time
-> Sort (cost=0.03..0.04 rows=1 width=44)
Sort Key: tbl.uid, tbl.crt_time
-> Redistribute Motion 48:48 (slice1; segments: 48) (cost=0.00..0.02 rows=1 width=44)
Hash Key: tbl.uid
-> Seq Scan on origin tbl (cost=0.00..0.00 rows=1 width=44)
Settings: optimizer=off
Optimizer status: legacy query optimizer
(14 rows)
小結
使用HybridDB for PostgreSQL,同時實現了實時分析,準實時數據規整兩個需求。
OSS作為海量數據入口,HDB作為OSS的計算引擎,實現海量數據實時分析。
同時HDB作為數據規整引擎,被規整的數據不需要在數據庫本地落地,直接從OSS到OSS,隻是用到了HDB的規整能力。
性能可以通過擴展HDB的計算節點線性擴展:
海量數據源,寫入OSS,通過HybridDB for PostgreSQL的oss_ext插件,實時分析寫入的數據。
OSS帶寬指標:目前每個計算節點每個會話約30MB/s的讀寫速率。
對於列式存儲格式,數值類型。1億記錄約381MB,壓縮比5:1的話,約76.3MB。
按行換算的性能指標:2048個計算節點,讀寫吞吐約 805億行/s。每天處理6900萬億行(當然,按多列進出打個折扣,萬億級別也是沒有問題的)。
參考
阿裏雲HybridDB for PostgreSQL OSS存儲用法
《Greenplum 性能評估公式 - 阿裏雲HybridDB for PostgreSQL最佳實踐》
《Greenplum 最佳實踐 - 估值插件hll的使用(以及hll分式聚合函數優化)》
《Postgres-XC customized aggregate introduction》
《PostgreSQL aggregate function customize》
最後更新:2017-08-13 22:52:17
上一篇:
Greenplum在企業生產中的最佳實踐(上)
下一篇:
MPP分布式數據庫性能評估方法 - 阿裏雲HybridDB for PostgreSQL最佳實踐
android 手機屏蔽廣告 hosts
警示:一個專為AIX上11.2.0.4版本定製的Bug正在高發
Android開發13——內容提供者ContentProvider的基本使用
Page_Load上麵的代碼使用——ViewState,Session,Static,Linq
Java小白看過來,Java目前的就業前景怎麼樣
PostgreSQL 10.0 preview 功能增強 - 老板特性, LONG SQL過程可視 pg_stat_progress_vacuum
阿裏雲合作夥伴媒介匣獲千萬級Pre-A輪融資 競逐企業營銷服務市場
精準醫療再進一步,機器人成醫生得力助手
MongoDB時間問題
《軟件工藝師:專業、務實、自豪》一2.3.1 轉變開發方式