泛金融賬務流水 存儲與快照回溯查詢 案例分享
標簽
PostgreSQL , 金融 , 審計數據 , feed , 軌跡數據 , 明細數據 , 快照 , 滑動窗口 , binlog , oss
背景
在金融行業中,或者一些含有支付業務,往來對賬業務,虛擬貨幣業務等業務的場景中,賬戶係統的變更流水是一份很大的數據。
為什麼需要這份流水呢?直接保留賬戶的最終狀態不就好了嗎?
實際上流水數據就是日誌數據,它記錄了用戶的每一筆賬戶變動,流水數據可以作為審計數據,也可以用於數據分析,還可用於數據的追溯(例如交警說你開車闖紅燈了,你會問交警要錄像和照片一樣)。
有多種產生流水數據的方式,一種是業務上產生,即寫表。
另一種是數據庫自己產生的日誌(WAL),這部分日誌也能複原出用戶賬戶變化的流水(old value, new value, 事務號)。
下麵是例子,如何管理流水數據呢?
例子1
例子1為業務寫流水。
表結構設計
1、賬號表
create table tbl
(
xid_num int8, -- 產生、變更這筆記錄的作業號,如果一個作業中包含多個賬號的變更(例如轉賬),可以通過作業號關聯起來。
uid int primary key, -- 用戶賬號ID
balance float8 check (balance >=0 ), -- 餘額
crt_time timestamp default now(), -- 記錄創建時間
mod_time timestamp -- 記錄修改時間
);
2、流水表
create table tbl_history (
xid_num int8, -- 作業號
uid int,
crt_time timestamp default now(),
audit_old hstore, -- 變更前的記錄,刪除前的記錄
audit_new hstore, -- 變更後的記錄,新增的記錄
tag text -- insert,update,delete標記
);
create index idx_tbl_history_xid on tbl_history (xid_num);
create index idx_tbl_history_uid on tbl_history (uid);
流水表具備時序屬性,使用BRIN索引是最好的。
create index idx_tbl_history_time on tbl_history using brin(crt_time);
3、(可選)流水表可以使用分區表,因為流水表具備時序屬性。
create table tbl_history (
xid_num int8, -- 作業號
uid int,
crt_time timestamp default now(),
audit_old hstore,
audit_new hstore,
tag text
)
partition by range(crt_time);
do language plpgsql $$
declare
s1 date;
s2 date;
suffix text;
begin
for i in 1..60 loop
s1 := date '2017-06-01'+(i||' month ')::interval;
s2 := date '2017-06-01'+((i+1)||' month ')::interval;
suffix := to_char(s1,'yyyymm');
execute 'create table tbl_history_ptr_'||suffix||' partition of tbl_history for values from ('''||s1||''') to ('''||s2||''')';
end loop;
end;
$$;
4、業務上控製流水的寫入,或者使用數據庫自帶的rule或觸發器實現流水記錄的自動生成。
使用規則自動生成流水的例子(創建規則後,不支持insert on conflict語法)
create rule r1 as on insert to tbl do also insert into tbl_history (xid_num,uid,audit_new,tag) values (txid_current(),NEW.uid,hstore(NEW),'insert');
create rule r2 as on delete to tbl do also insert into tbl_history (xid_num,uid,audit_old,tag) values (txid_current(),OLD.uid,hstore(OLD),'delete');
create rule r3 as on update to tbl do also insert into tbl_history (xid_num,uid,audit_old,audit_new,tag) values (txid_current(),OLD.uid,hstore(OLD),hstore(NEW),'update');
使用觸發器自動生成流水的例子。
create or replace function ftg1() returns trigger as $$
declare
begin
insert into tbl_history (xid_num,uid,audit_new,tag) values (txid_current(),NEW.uid,hstore(NEW),'insert');
return null;
end;
$$ language plpgsql strict;
create or replace function ftg2() returns trigger as $$
declare
begin
insert into tbl_history (xid_num,uid,audit_old,tag) values (txid_current(),OLD.uid,hstore(OLD),'delete');
return null;
end;
$$ language plpgsql strict;
create or replace function ftg3() returns trigger as $$
declare
begin
insert into tbl_history (xid_num,uid,audit_old,audit_new,tag) values (txid_current(),OLD.uid,hstore(OLD),hstore(NEW),'update');
return null;
end;
$$ language plpgsql strict;
create trigger tg1 after insert on tbl for each row execute procedure ftg1();
create trigger tg2 after delete on tbl for each row execute procedure ftg2();
create trigger tg3 after update on tbl for each row execute procedure ftg3();
5、壓測,生成賬號,扣減資金
\set uid random(1,10000000)
insert into tbl (uid,balance,crt_time) values (:uid, 100000, now()) on conflict (uid) do update set balance=tbl.balance+(random()*100)::int-50,mod_time=now();
壓測
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 120
6、查詢某個賬號過去某個時間點的狀態
postgres=# select * from tbl_history limit 10;
xid_num | uid | crt_time | audit_old | audit_new | tag
------------+---------+----------------------------+-----------+--------------------------------------------------------------------------------------------------------------------+--------
2833936976 | 6301000 | 2017-07-05 18:58:33.014571 | | "uid"=>"6301000", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.014571", "mod_time"=>NULL | insert
2833936980 | 6082888 | 2017-07-05 18:58:33.015117 | | "uid"=>"6082888", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.015117", "mod_time"=>NULL | insert
2833936981 | 941218 | 2017-07-05 18:58:33.015222 | | "uid"=>"941218", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.015222", "mod_time"=>NULL | insert
2833936977 | 1400395 | 2017-07-05 18:58:33.014793 | | "uid"=>"1400395", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.014793", "mod_time"=>NULL | insert
2833936979 | 1298648 | 2017-07-05 18:58:33.014791 | | "uid"=>"1298648", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.014791", "mod_time"=>NULL | insert
2833936985 | 5278098 | 2017-07-05 18:58:33.017009 | | "uid"=>"5278098", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.017009", "mod_time"=>NULL | insert
2833936978 | 9522366 | 2017-07-05 18:58:33.014795 | | "uid"=>"9522366", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.014795", "mod_time"=>NULL | insert
2833936986 | 9902071 | 2017-07-05 18:58:33.017085 | | "uid"=>"9902071", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.017085", "mod_time"=>NULL | insert
2833936982 | 5473115 | 2017-07-05 18:58:33.015527 | | "uid"=>"5473115", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.015527", "mod_time"=>NULL | insert
2833936988 | 8698002 | 2017-07-05 18:58:33.017249 | | "uid"=>"8698002", "balance"=>"100000", "xid_num"=>NULL, "crt_time"=>"2017-07-05 18:58:33.017249", "mod_time"=>NULL | insert
(10 rows)
select * from tbl_history where xid_num in (
select xid_num from tbl_history where uid=? and crt_time between ? and ?
);
結合OSS的設計
由於流水數據是曆史數據,隨著時間越來越久,數據會越來越冷,查詢幾率會越來越低。
如果所有的數據都放在數據庫中,成本是比較高的,除非你不在乎這個成本。
阿裏雲RDS PostgreSQL和雲OSS可以深度整合,使用RDS PG的OSS_FDW外部表,用戶的流水數據可以存入OSS,而通過RDS PG可以無縫的查詢。
例如,我們將一年前的數據定義為冷數據,將一年前的數據通過oss_fdw外部表接口寫入OSS,然後將RDS PG本地對應的數據刪掉,釋放空間。
當用戶需要查詢一年前的冷數據時,通過OSS_FDW定義的外部表即可查詢。(用法和SQL查詢普通表一樣)。
OSS_FDW的用法參考
https://help.aliyun.com/document_detail/44461.html
一個簡單的DEMO
# 創建插件
create extension oss_fdw;
# 創建 server
CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS
(host 'oss-cn-hangzhou.aliyuncs.com' , id 'xxx', key 'xxx',bucket 'mybucket');
# 創建 oss 外部表的定義
CREATE FOREIGN TABLE ossexample
(date text, time text, open float,
high float, low float, volume int)
SERVER ossserver
OPTIONS ( filepath 'osstest/example.csv', delimiter ',' ,
format 'csv', encoding 'utf8', PARSE_ERRORS '100');
# 查詢外部表
select * from ossexample where .....;
例子2
例子2,使用數據庫自帶的流水,例如MySQL數據庫的binlog,或者PostgreSQL數據庫的WAL日誌,都存儲了數據變更前後,插入時,刪除時的記錄。
MYSQL用戶場景
MySQL用戶,在數據庫僅僅存儲賬戶的最終狀態,通過binlog將用戶insert\update\delete等產生的日誌數據解出來,作為流水日誌數據。
流水日誌數據寫入OSS,通過RDS PG對接OSS,即可實現流水數據從MySQL到RDS PG的對接。
RDS PG實例作為SQL查詢接口,用戶就可以愉快的查詢任何時間點的數據了。
使用RDS PG的好處是可以兼容SQL語法,同時PG在數據分析方麵的能力非常強,例如:
1、有地表最強SQL標準支持,地表最強ORACLE兼容性。
2、支持多維分析語法(grouping sets, cube, rollup),遞歸查詢語法,科學計算函數庫,多核並行,向量計算,JIT,哈希JOIN,MERGE JOIN等。
3、支持並行的讀寫OSS。
4、支持數組、JSON、KV、地理位置、全文檢索等擴展數據類型。
5、支持9種索引,加速幾乎任何一種數據類型的查詢。
RDS PG可以幫助業務實現更多的場景需求。
小結
對接OSS,使得用戶可以廉價的存儲數據庫的binlog流水。
OSS和RDS PG對接,使得用戶可以使用通用的SQL語法,分析流水數據。
同時用戶還可以享受RDS PG帶來的額外特性,包括OLAP分析能力,更強大的語法支持,更強大的計算能力等。
參考
《PostgreSQL 海量時序數據(任意滑動窗口實時統計分析) - 傳感器、人群、物體等對象跟蹤》
最後更新:2017-07-07 00:02:23