雲端海量任務調度係統數據庫設計 - 阿裏雲RDS PostgreSQL案例
標簽
PostgreSQL , 任務調度係統 , 數據庫設計 , schemaless
背景
任務調度係統中的任務狀態管理,通常會用到數據庫來存儲任務調度的過程狀態,控製任務的鎖等。
如果是小量任務,是挺好實現的,但是每小時處理幾十億或者幾億的任務,如何設計這樣的任務狀態管理數據庫呢?
挑戰
對於一個麵向多個用戶的任務調度平台(例如雲端的任務調度平台,將麵向所有租戶使用)。
較大的挑戰是任務數據的寫入(海量),另一個是任務狀態的更新(海量,每個任務至少被更新一次)。
雲端海量任務調度數據庫設計
雲端任務調度存在一些特性:
1、用戶和用戶之間的任務是沒有關係的,單個用戶的任務在調度時可能有依賴關係。
2、數據量龐大。
3、任務通常都有最終穩定狀態,穩定後,對應的任務記錄就不會變化了。
針對以上幾個特點,采樣PostgreSQL設計:
1、任務數據生成後寫入任務處理表
2、任務處理表使用rotate設計(例如每小時一個rotate表),處理完的數據直接清除,不需要VACUUM。
3、分區方麵,任務處理表采樣用戶級分區,在獲取需要處理的任務時更加的精煉(減少冗餘掃描)。
4、當任務達到最終狀態時,從任務運行表刪除,寫入曆史表。
5、早期的曆史表,從RDS PG中刪除,寫入阿裏雲OSS,使用RDS PG OSS外部表接口可以訪問到這些曆史數據。
DEMO設計
1、初始任務表,用於存儲用戶生成的任務。
create table task_init ( -- 任務初始表
uid int, -- 用戶id
ptid serial8, -- 父任務id
tid serial, -- 子任務ID
state int default 1, -- 任務狀態,1表示初始狀態,-1表示正在處理, 0表示處理結束
retry int default -1, -- 重試次數
info text, -- 其他信息
ts timestamp -- 時間
);
2、任務曆史表,用於存儲任務的最終狀態。
create table task_hist ( -- 任務曆史表
uid int, -- 用戶id
ptid int8, -- 父任務id
tid int, -- 子任務ID
state int default 1, -- 任務狀態,1表示初始狀態,-1表示正在處理, 0表示處理結束
retry int default -1, -- 重試次數
info text, -- 其他信息
ts timestamp -- 時間
);
3、為了簡化測試,按用戶ID進行分區。(前麵提到的rotate設計,多級分區設計,請參考本文末尾的文章)
do language plpgsql $$
declare
begin
for i in 1..1000 loop
execute 'create table task_init_'||i||' ( like task_init including all)';
execute 'create table task_hist_'||i||' ( like task_hist including all)';
end loop;
end;
$$;
4、為了測試方便,使用schemaless的設計,將用戶任務的初始數據生成寫入放在PLPGSQL邏輯中。
create or replace function ins_task_init(
uid int,
info text,
ts timestamp
) returns void as $$
declare
target name;
begin
target := format('%I', 'task_init_'||uid);
execute format('insert into %I (uid,info,ts) values (%L,%L,%L)', target, uid,info,ts);
end;
$$ language plpgsql strict;
5、運行任務,分為幾個步驟。
5.1、從任務表讀取任務。
5.2、用戶執行任務。
5.3、反饋執行的結果,不成功的任務更新task_init表,對於執行成功(並結束)的任務,數據從task_init遷移到task_hist。
為了測試數據庫的性能,我講這三步的邏輯寫到plpgsql裏麵。同時使用delete limit的特性,一次批量取出若幹條任務。
這裏使用CTID行號定位,達到最佳的性能。不僅免去了索引的使用,而且性能更佳。
這裏使用了advisory lock,使得單個用戶不會出現並行任務。(實際業務中,可以並行。)
這裏沒有測試更新狀態,task_init還有少量更新(相比insert和delete,比例很少,可以忽略),比如任務失敗的情況。
關閉task_init表的autovacuum,采用rotate的形式進行處理。
create or replace function run_task(
uid int,
batch int
) returns void as $$
declare
target1 name;
target2 name;
begin
target1 := format('%I', 'task_init_'||uid);
target2 := format('%I', 'task_hist_'||uid);
execute format('with t1 as (select ctid from %I where pg_try_advisory_xact_lock(%L) limit %s) , t2 as (delete from %I where ctid = any (array(select ctid from t1)) returning *) insert into %I select * from t2;', target1, uid, batch, target1, target2);
end;
$$ language plpgsql strict;
6、測試分解動作。
寫入初始任務
postgres=# select ins_task_init(1,'test',now()::timestamp);
ins_task_init
---------------
(1 row)
postgres=# select ins_task_init(1,'test',now()::timestamp);
ins_task_init
---------------
(1 row)
運行任務
postgres=# select run_task(1,100);
run_task
----------
(1 row)
查看任務是否結束並遷移到曆史表
postgres=# select * from task_init_1;
uid | ptid | tid | state | retry | info | ts
-----+------+-----+-------+-------+------+----
(0 rows)
postgres=# select * from task_hist_1;
uid | ptid | tid | state | retry | info | ts
-----+------+-----+-------+-------+------+----------------------------
1 | 1 | 1 | 1 | -1 | test | 2017-07-20 15:26:32.739766
1 | 2 | 2 | 1 | -1 | test | 2017-07-20 15:26:33.233469
(2 rows)
性能壓測
1、生成任務的性能
vi ins.sql
\set uid random(1,1000)
select ins_task_init(:uid,'test',now()::timestamp);
pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 32 -j 32 -T 120
query mode: prepared
number of clients: 64
number of threads: 64
duration: 360 s
number of transactions actually processed: 86074880
latency average = 0.268 ms
latency stddev = 0.295 ms
tps = 239079.558174 (including connections establishing)
tps = 239088.708200 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set uid random(1,1000)
0.267 select ins_task_init(:uid,'test',now()::timestamp);
postgres=# select count(*) from task_init_1;
count
-------
88861
(1 row)
postgres=# select count(*) from task_init_2;
count
-------
88196
(1 row)
....
postgres=# select count(*) from task_init_1000;
count
-------
88468
(1 row)
2、運行任務的性能(一次批量取10000條任務)
vi run.sql
\set uid random(1,1000)
select run_task(:uid,10000);
pgbench -M prepared -n -r -P 1 -f ./run.sql -c 32 -j 32 -T 120
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 3294
latency average = 1171.228 ms
latency stddev = 361.056 ms
tps = 27.245606 (including connections establishing)
tps = 27.247560 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.003 \set uid random(1,1000)
1171.225 select run_task(:uid,10000);
postgres=# select count(*) from task_init_1000;
count
-------
18468
(1 row)
postgres=# select count(*) from task_hist_1000;
count
--------
224207
(1 row)
單獨的測試數據
1、生成任務,23.9萬條/s
2、消耗任務,27.2萬條/s
生成與消耗任務同時運行的測試數據
1、生成任務,16.8萬條/s
2、消耗任務,大於16.8萬條/s
沒有任何任務堆積。
小結
PostgreSQL在雲端海量任務調度係統中,發揮了重要的作用。
單個PostgreSQL實例,已經可以處理每個小時 的任務生成,以及 的任務消耗。
任務調度係統比MQ更加複雜,類似MQ的超集,所以用戶如果有MQ的需求,實際上使用RDS PostgreSQL也是可以的。性能指標比上麵的測試更好。
參考
《PostgreSQL schemaless 的實現(類mongodb collection)》
《行為、審計日誌 (實時索引/實時搜索)建模 - 最佳實踐 2》
《在PostgreSQL中實現update | delete limit》
《塊級(ctid)掃描在IoT(物聯網)極限寫和消費讀並存場景的應用》
《PostgreSQL 10.0 preview 功能增強 - 內置分區表》
《PostgreSQL 9.5+ 高效分區表實現 - pg_pathman》
《PostgreSQL 數據rotate用法介紹 - 按時間覆蓋曆史數據》
最後更新:2017-07-21 11:03:18