閱讀973 返回首頁    go 阿裏雲 go 技術社區[雲棲]


雲端海量任務調度係統數據庫設計 - 阿裏雲RDS PostgreSQL案例

標簽

PostgreSQL , 任務調度係統 , 數據庫設計 , schemaless


背景

任務調度係統中的任務狀態管理,通常會用到數據庫來存儲任務調度的過程狀態,控製任務的鎖等。

《advisory lock 實現高並發非堵塞式 業務鎖》

如果是小量任務,是挺好實現的,但是每小時處理幾十億或者幾億的任務,如何設計這樣的任務狀態管理數據庫呢?

挑戰

對於一個麵向多個用戶的任務調度平台(例如雲端的任務調度平台,將麵向所有租戶使用)。

較大的挑戰是任務數據的寫入(海量),另一個是任務狀態的更新(海量,每個任務至少被更新一次)。

雲端海量任務調度數據庫設計

pic

雲端任務調度存在一些特性:

1、用戶和用戶之間的任務是沒有關係的,單個用戶的任務在調度時可能有依賴關係。

2、數據量龐大。

3、任務通常都有最終穩定狀態,穩定後,對應的任務記錄就不會變化了。

針對以上幾個特點,采樣PostgreSQL設計:

1、任務數據生成後寫入任務處理表

2、任務處理表使用rotate設計(例如每小時一個rotate表),處理完的數據直接清除,不需要VACUUM。

3、分區方麵,任務處理表采樣用戶級分區,在獲取需要處理的任務時更加的精煉(減少冗餘掃描)。

4、當任務達到最終狀態時,從任務運行表刪除,寫入曆史表。

5、早期的曆史表,從RDS PG中刪除,寫入阿裏雲OSS,使用RDS PG OSS外部表接口可以訪問到這些曆史數據。

DEMO設計

1、初始任務表,用於存儲用戶生成的任務。

create table task_init (    -- 任務初始表  
  uid  int,        -- 用戶id  
  ptid serial8,    -- 父任務id  
  tid serial,      -- 子任務ID    
  state int default 1,    -- 任務狀態,1表示初始狀態,-1表示正在處理, 0表示處理結束    
  retry int default -1,   -- 重試次數    
  info text,              -- 其他信息    
  ts timestamp            -- 時間     
);     

2、任務曆史表,用於存儲任務的最終狀態。

create table task_hist (    -- 任務曆史表  
  uid  int,   -- 用戶id  
  ptid int8,  -- 父任務id  
  tid int,    -- 子任務ID    
  state int default 1,    -- 任務狀態,1表示初始狀態,-1表示正在處理, 0表示處理結束    
  retry int default -1,   -- 重試次數    
  info text,              -- 其他信息    
  ts timestamp      -- 時間      
);     

3、為了簡化測試,按用戶ID進行分區。(前麵提到的rotate設計,多級分區設計,請參考本文末尾的文章)

do language plpgsql $$  
declare  
begin  
  for i in 1..1000 loop  
    execute 'create table task_init_'||i||' ( like task_init including all)';  
    execute 'create table task_hist_'||i||' ( like task_hist including all)';  
  end loop;  
end;  
$$;  

4、為了測試方便,使用schemaless的設計,將用戶任務的初始數據生成寫入放在PLPGSQL邏輯中。

create or replace function ins_task_init(  
  uid int,  
  info text,  
  ts timestamp  
)  returns void as $$  
declare  
  target name;  
begin  
  target := format('%I', 'task_init_'||uid);  
  execute format('insert into %I (uid,info,ts) values (%L,%L,%L)', target, uid,info,ts);  
end;  
$$ language plpgsql strict;  

5、運行任務,分為幾個步驟。

5.1、從任務表讀取任務。

5.2、用戶執行任務。

5.3、反饋執行的結果,不成功的任務更新task_init表,對於執行成功(並結束)的任務,數據從task_init遷移到task_hist。

為了測試數據庫的性能,我講這三步的邏輯寫到plpgsql裏麵。同時使用delete limit的特性,一次批量取出若幹條任務。

這裏使用CTID行號定位,達到最佳的性能。不僅免去了索引的使用,而且性能更佳。

這裏使用了advisory lock,使得單個用戶不會出現並行任務。(實際業務中,可以並行。)

這裏沒有測試更新狀態,task_init還有少量更新(相比insert和delete,比例很少,可以忽略),比如任務失敗的情況。

關閉task_init表的autovacuum,采用rotate的形式進行處理。

create or replace function run_task(  
  uid int,  
  batch int  
) returns void as $$  
declare  
  target1 name;  
  target2 name;  
begin  
  target1 := format('%I', 'task_init_'||uid);  
  target2 := format('%I', 'task_hist_'||uid);  
  execute format('with t1 as (select ctid from %I where pg_try_advisory_xact_lock(%L) limit %s) , t2 as (delete from %I where ctid = any (array(select ctid from t1)) returning *)  insert into %I select * from t2;', target1, uid, batch, target1, target2);  
end;  
$$ language plpgsql strict;  

6、測試分解動作。

寫入初始任務  
  
postgres=# select ins_task_init(1,'test',now()::timestamp);  
 ins_task_init   
---------------  
   
(1 row)  
  
postgres=# select ins_task_init(1,'test',now()::timestamp);  
 ins_task_init   
---------------  
   
(1 row)  
  
運行任務  
  
postgres=# select run_task(1,100);  
 run_task   
----------  
   
(1 row)  
  
查看任務是否結束並遷移到曆史表  
  
postgres=# select * from task_init_1;  
 uid | ptid | tid | state | retry | info | ts   
-----+------+-----+-------+-------+------+----  
(0 rows)  
  
postgres=# select * from task_hist_1;  
 uid | ptid | tid | state | retry | info |             ts               
-----+------+-----+-------+-------+------+----------------------------  
   1 |    1 |   1 |     1 |    -1 | test | 2017-07-20 15:26:32.739766  
   1 |    2 |   2 |     1 |    -1 | test | 2017-07-20 15:26:33.233469  
(2 rows)  

性能壓測

1、生成任務的性能

vi ins.sql  
\set uid random(1,1000)  
select ins_task_init(:uid,'test',now()::timestamp);   
  
pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 32 -j 32 -T 120  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 360 s  
number of transactions actually processed: 86074880  
latency average = 0.268 ms  
latency stddev = 0.295 ms  
tps = 239079.558174 (including connections establishing)  
tps = 239088.708200 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set uid random(1,1000)  
         0.267  select ins_task_init(:uid,'test',now()::timestamp);  
  
postgres=# select count(*) from task_init_1;  
 count   
-------  
 88861  
(1 row)  
  
postgres=# select count(*) from task_init_2;  
 count   
-------  
 88196  
(1 row)  
  
....  
  
postgres=# select count(*) from task_init_1000;  
 count   
-------  
 88468  
(1 row)  

2、運行任務的性能(一次批量取10000條任務)

vi run.sql  
\set uid random(1,1000)  
select run_task(:uid,10000);  
  
pgbench -M prepared -n -r -P 1 -f ./run.sql -c 32 -j 32 -T 120  
  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 3294  
latency average = 1171.228 ms  
latency stddev = 361.056 ms  
tps = 27.245606 (including connections establishing)  
tps = 27.247560 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.003  \set uid random(1,1000)  
      1171.225  select run_task(:uid,10000);  
  
postgres=# select count(*) from task_init_1000;  
 count   
-------  
 18468  
(1 row)  
  
postgres=# select count(*) from task_hist_1000;  
 count    
--------  
 224207  
(1 row)  

單獨的測試數據

1、生成任務,23.9萬條/s

2、消耗任務,27.2萬條/s

生成與消耗任務同時運行的測試數據

1、生成任務,16.8萬條/s

2、消耗任務,大於16.8萬條/s

沒有任何任務堆積。

小結

PostgreSQL在雲端海量任務調度係統中,發揮了重要的作用。

單個PostgreSQL實例,已經可以處理每個小時 的任務生成,以及 的任務消耗。

任務調度係統比MQ更加複雜,類似MQ的超集,所以用戶如果有MQ的需求,實際上使用RDS PostgreSQL也是可以的。性能指標比上麵的測試更好。

參考

《advisory lock 實現高並發非堵塞式 業務鎖》

《PostgreSQL schemaless 的實現(類mongodb collection)》

《行為、審計日誌 (實時索引/實時搜索)建模 - 最佳實踐 2》

《在PostgreSQL中實現update | delete limit》

《塊級(ctid)掃描在IoT(物聯網)極限寫和消費讀並存場景的應用》

《PostgreSQL 10.0 preview 功能增強 - 內置分區表》

《PostgreSQL 9.5+ 高效分區表實現 - pg_pathman》

《PostgreSQL 數據rotate用法介紹 - 按時間覆蓋曆史數據》

最後更新:2017-07-21 11:03:18

  上一篇:go  JEESZ分布式架構集成阿裏雲OSS存儲
  下一篇:go  開發者論壇一周精粹(第三期):淘寶商家專場官方應用--讓一個人頂N個人