

(Streaming vs. Lambda vs. Real-Time) Data Processing Approaches Compared - IoT Best Practices

Tags

PostgreSQL , IoT , sensors , lambda , scheduling , real-time , streaming updates , UPSERT , insert on conflict do update


Background

More and more data needs to be analyzed, aggregated, and searched in real time, with the latest values and outliers displayed as they arrive.

Examples include financial data, IoT sensor data, and online data from multiplayer games.


For real-time search, see this best-practice article:

《Behavior and Audit Logs: Real-Time Indexing / Real-Time Search - Best Practices》

For "ingestion, sharing, storage, and computation" of massive data, as well as offline analysis, see this best-practice article:

《Massive Data "Ingestion, Sharing, Storage, Computation" - Best Practices》

Real-time analysis, real-time updates, real-time aggregation, and real-time display of latest values and outliers are the subject of this article.

Any mention of real-time analysis leads to stream computing; see:

《Stream Computing Rises Again - PostgreSQL and PipelineDB Power IoT》

PipelineDB is a stream-computing database with a SQL interface. It is being refactored into an extension, so in the future it can be used as a PostgreSQL plugin.

Taking real-time ingestion of sensor data, real-time updates of the latest values, and real-time statistics as the example workload, this article analyzes the strengths and weaknesses of three approaches: streaming, lambda, and synchronous real-time.

Scenario Design

There are one million sensors, each reporting data periodically. The user requirements:

1. View each sensor's latest value in real time.

2. View statistics over historical sensor data by time range, in real time.

3. View historical sensor detail data in real time.

4. View statistics over historical sensor data by other dimensions, in real time.

Because the data volume can be enormous (on the order of 100 TB), meeting these four requirements means the statistics must be computed in real time or near-real time.

Table Structure Design

Detail data

create table sensor_data(  
  pk serial8 primary key, -- primary key  
  ts timestamp,  -- timestamp  
  sid int,  -- sensor ID  
  val numeric(10,2)  -- reading  
);  

Real-Time Aggregation Design

1. The latest value of each sensor

create table sensor_lastdata(  
  sid int primary key,  -- sensor ID, primary key  
  last_ts timestamp,  -- timestamp  
  last_val numeric(10,2)  -- latest value  
);  

2. For each sensor and each time bucket (e.g., hour): all values, plus sum, count, max, min, average, and variance.

create table sensor_aggdata(  
  sid int,  -- sensor ID  
  ts_group varchar(10),  -- time bucket, e.g., hour (yyyymmddhh24)  
  sum_val numeric,  -- sum  
  min_val numeric(10,2),  -- minimum  
  max_val numeric(10,2),  -- maximum  
  avg_val numeric(10,2),  -- average  
  count_val int,  -- count  
  all_vals numeric(10,2)[],  -- detail values  
  unique (sid,ts_group)  -- unique constraint  
);  

3. Real-time statistics on the reported sensor data by region or other dimensions (no table is designed for this case in this article; an illustrative sketch follows).
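As an illustration only, a per-region rollup might pair the detail table with a hypothetical dimension table sensor_info(sid, region) and a region-level aggregate table; both names are assumptions, not part of this article's design:

create table sensor_info(  
  sid int primary key,  -- sensor ID  
  region text  -- region the sensor belongs to (hypothetical dimension)  
);  
  
-- Hypothetical per-region, per-hour aggregate table, maintained with the  
-- same insert on conflict merge pattern used for sensor_aggdata below.  
create table sensor_region_aggdata(  
  region text,  
  ts_group varchar(10),  -- hour bucket (yyyymmddhh24)  
  sum_val numeric,  
  count_val int,  
  unique (region,ts_group)  
);  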

How to Get the Latest Sensor Values from the Detail Data

We want the latest value for each sensor ID. In SQL there are two ways to get it: aggregation, or a window function.

Insert a batch of test data:

postgres=#  insert into sensor_data(ts,sid,val) select clock_timestamp(), random()*100, random()*10000 from generate_series(1,100000);  

Method 1: aggregation.

Group by sid, aggregate val into an array ordered by pk descending, and take the array's first element.

Usage reference: https://www.postgresql.org/docs/9.6/static/functions-aggregate.html

postgres=#  select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from sensor_data group by sid;  
 sid |          last_ts           | last_val   
-----+----------------------------+----------  
   0 | 2017-05-18 14:09:10.625812 |  6480.54  
   1 | 2017-05-18 14:09:10.627607 |  9644.29  
   2 | 2017-05-18 14:09:10.627951 |  3995.04  
   3 | 2017-05-18 14:09:10.627466 |   840.80  
   4 | 2017-05-18 14:09:10.627703 |  1500.59  
   5 | 2017-05-18 14:09:10.627813 |  3109.42  
   6 | 2017-05-18 14:09:10.62754  |  4131.31  
   7 | 2017-05-18 14:09:10.627851 |  9333.88  
......  

Method 2: window function.

postgres=# select sid,ts,val from (select sid,ts,val,row_number() over(partition by sid order by pk desc) as rn from sensor_data) t where rn=1;  
 sid |             ts             |   val     
-----+----------------------------+---------  
   0 | 2017-05-18 14:09:10.625812 | 6480.54  
   1 | 2017-05-18 14:09:10.627607 | 9644.29  
   2 | 2017-05-18 14:09:10.627951 | 3995.04  
   3 | 2017-05-18 14:09:10.627466 |  840.80  
   4 | 2017-05-18 14:09:10.627703 | 1500.59  
   5 | 2017-05-18 14:09:10.627813 | 3109.42  
   6 | 2017-05-18 14:09:10.62754  | 4131.31  
   7 | 2017-05-18 14:09:10.627851 | 9333.88  
......  

Which of the two methods is better? Compare the execution plans:

postgres=# set work_mem ='16MB';  
SET  
postgres=# explain (analyze,verbose,timing,costs,buffers) select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from sensor_data group by sid;  
                                                             QUERY PLAN                                                               
------------------------------------------------------------------------------------------------------------------------------------  
 GroupAggregate  (cost=7117.15..7823.57 rows=101 width=44) (actual time=29.628..88.095 rows=101 loops=1)  
   Output: sid, (array_agg(ts ORDER BY pk DESC))[1], (array_agg(val ORDER BY pk DESC))[1]  
   Group Key: sensor_data.sid  
   Buffers: shared hit=736  
   ->  Sort  (cost=7117.15..7293.38 rows=70490 width=26) (actual time=29.273..36.249 rows=70490 loops=1)  
         Output: sid, ts, pk, val  
         Sort Key: sensor_data.sid  
         Sort Method: quicksort  Memory: 8580kB  
         Buffers: shared hit=736  
         ->  Seq Scan on public.sensor_data  (cost=0.00..1440.90 rows=70490 width=26) (actual time=0.243..9.768 rows=70490 loops=1)  
               Output: sid, ts, pk, val  
               Buffers: shared hit=736  
 Planning time: 0.077 ms  
 Execution time: 88.489 ms  
(14 rows)  
  
postgres=# explain (analyze,verbose,timing,costs,buffers) select sid,ts,val from (select sid,ts,val,row_number() over(partition by sid order by pk desc) as rn from sensor_data) t where rn=1;  
                                                                QUERY PLAN                                                                  
------------------------------------------------------------------------------------------------------------------------------------------  
 Subquery Scan on t  (cost=7117.15..9408.08 rows=352 width=18) (actual time=46.074..81.377 rows=101 loops=1)  
   Output: t.sid, t.ts, t.val  
   Filter: (t.rn = 1)  
   Rows Removed by Filter: 70389  
   Buffers: shared hit=736  
   ->  WindowAgg  (cost=7117.15..8526.95 rows=70490 width=34) (actual time=46.072..76.115 rows=70490 loops=1)  
         Output: sensor_data.sid, sensor_data.ts, sensor_data.val, row_number() OVER (?), sensor_data.pk  
         Buffers: shared hit=736  
         ->  Sort  (cost=7117.15..7293.38 rows=70490 width=26) (actual time=46.065..51.742 rows=70490 loops=1)  
               Output: sensor_data.sid, sensor_data.pk, sensor_data.ts, sensor_data.val  
               Sort Key: sensor_data.sid, sensor_data.pk DESC  
               Sort Method: quicksort  Memory: 8580kB  
               Buffers: shared hit=736  
               ->  Seq Scan on public.sensor_data  (cost=0.00..1440.90 rows=70490 width=26) (actual time=0.245..9.863 rows=70490 loops=1)  
                     Output: sensor_data.sid, sensor_data.pk, sensor_data.ts, sensor_data.val  
                     Buffers: shared hit=736  
 Planning time: 0.100 ms  
 Execution time: 82.480 ms  
(18 rows)  
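The two plans are roughly comparable here; both sort the whole table. As an aside (not part of the original two-way comparison), PostgreSQL's DISTINCT ON offers a third, more concise way to fetch the newest row per sensor, and with an index on (sid, pk desc) it can avoid the full sort:

-- A sketch using DISTINCT ON: keep only the first row per sid  
-- in (sid, pk desc) order, i.e., the newest row per sensor.  
select distinct on (sid) sid, ts, val  
from sensor_data  
order by sid, pk desc;  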

Real-Time Update and Statistics Design

1. lambda

In the lambda approach, sensor data is written to the detail table; a scheduled task then repeatedly fetches rows from the detail table (deleting them as it goes), computes incremental statistics over each fetched batch, and merges the results into the statistics tables.


There may be many statistical dimensions, so the fetch-and-delete step is factored out into its own function, allowing the consumers to run in parallel.

The function fetches and deletes detail rows in batches: ordered by pk, a bounded number of rows at a time.

The function:

create or replace function get_sensor_data(i_limit int) returns sensor_data[] as $$  
declare  
  arr_pk int8[];                  -- PKs of the rows fetched in this batch  
  arr_sensor_data sensor_data[];  -- the fetched detail rows  
begin  
  -- Lock and collect up to i_limit of the oldest rows; skip locked  
  -- ensures concurrent consumers do not block each other.  
  select array_agg(t.sensor_data), array_agg((t.sensor_data).pk)  
    into arr_sensor_data, arr_pk  
    from (select sensor_data from sensor_data order by pk limit i_limit for update skip locked) t ;  
  -- Consume the batch: remove the fetched rows from the detail table.  
  delete from sensor_data WHERE pk = any (arr_pk);  
  return arr_sensor_data;  
end;  
$$ language plpgsql strict;  

Once a batch of detail rows has been fetched, move on to the next step.

Update if the key exists, insert if it does not, using PostgreSQL's insert on conflict syntax (UPSERT).

1. Update the latest sensor values in real time

insert into sensor_lastdata  
  select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from   
    unnest(get_sensor_data(1000))   
  group by sid  
on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;  

2. Batched incremental statistics of the sensor values

Pay attention to how the SQL merges the statistics; the detail values are aggregated into an array per sid, stored in pk order.

insert into sensor_aggdata (sid,ts_group,sum_val,min_val,max_val,avg_val,count_val,all_vals)  
select sid,to_char(ts,'yyyymmddhh24'),sum(val),min(val),max(val),avg(val),count(val),array_agg(val order by pk) from unnest(get_sensor_data(1000))   
  group by sid,to_char(ts,'yyyymmddhh24')  
  on conflict (sid,ts_group) do update set   
    sum_val=sensor_aggdata.sum_val+excluded.sum_val,  
    min_val=least(sensor_aggdata.min_val, excluded.min_val),  
    max_val=greatest(sensor_aggdata.max_val, excluded.max_val),  
    avg_val=(sensor_aggdata.sum_val+excluded.sum_val)/(sensor_aggdata.count_val+excluded.count_val),  
    count_val=sensor_aggdata.count_val+excluded.count_val,  
    all_vals=array_cat(sensor_aggdata.all_vals, excluded.all_vals);  
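The scenario also asks for variance, which the design above does not store explicitly. Two hedged options: since all_vals keeps every detail value, variance can simply be computed on read; alternatively, a sum-of-squares column (hypothetical, named sum_sq here) can be merged incrementally like sum_val:

-- Option 1: compute variance on read from the stored detail values.  
select sid, ts_group,  
       (select var_samp(v) from unnest(all_vals) v) as var_val  
from sensor_aggdata;  
  
-- Option 2 (sketch): maintain a hypothetical sum_sq column, merged as  
--   sum_sq = sensor_aggdata.sum_sq + excluded.sum_sq  
-- in the upsert; the population variance of a group is then  
--   sum_sq/count_val - (sum_val/count_val)^2  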

Stress Testing

The stress tests use the same three tables defined in the design section above (sensor_data, sensor_lastdata, sensor_aggdata).

Stress test 1: real-time ingestion plus real-time updates of each sensor's latest value

vi ins.sql  
\set sid random(1,1000000)  
insert into sensor_data(ts,sid,val) values (clock_timestamp(), :sid, random()*1000);  

Merge 50,000 rows per batch:

vi lambda1.sql  
insert into sensor_lastdata select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val from unnest(get_sensor_data(50000)) group by sid on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;  

Ingestion: about 100,000 rows/s.

pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 64 -j 64 -T 120  
  
transaction type: ./ins.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 120 s  
number of transactions actually processed: 12742596  
latency average = 0.603 ms  
latency stddev = 2.163 ms  
tps = 106184.095420 (including connections establishing)  
tps = 106188.650794 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set sid random(1,1000000)  
         0.602  insert into sensor_data(ts,sid,val) values (clock_timestamp(), :sid, random()*1000);  

Incremental consumption, updating the latest values at about 50,000 rows/s.

pgbench -M prepared -n -r -P 1 -f ./lambda1.sql -c 1 -j 1 -T 1200  
  
progress: 236.0 s, 1.0 tps, lat 649.196 ms stddev 0.000  
progress: 237.0 s, 2.0 tps, lat 868.952 ms stddev 6.024  
progress: 238.0 s, 1.0 tps, lat 728.553 ms stddev 0.000  
progress: 239.0 s, 258.1 tps, lat 5.335 ms stddev 44.167  
progress: 240.0 s, 850.9 tps, lat 0.983 ms stddev 14.506  
progress: 241.0 s, 7962.2 tps, lat 0.146 ms stddev 3.672  
progress: 242.0 s, 13488.1 tps, lat 0.074 ms stddev 0.006  
  
postgres=# select count(*) from sensor_data;  
 count   
-------  
     0  
(1 row)  
  
postgres=# select * from sensor_lastdata  limit 10;  
 sid  |          last_ts           | last_val   
------+----------------------------+----------  
  672 | 2017-05-18 16:33:43.569255 |   196.01  
  178 | 2017-05-18 16:33:31.23651  |   593.16  
  686 | 2017-05-18 16:33:38.792138 |   762.95  
 4906 | 2017-05-18 16:33:43.498217 |   150.13  
  544 | 2017-05-18 16:33:45.338635 |   410.31  
  165 | 2017-05-18 16:33:28.393902 |   678.75  
  625 | 2017-05-18 16:33:37.077898 |   229.06  
 1316 | 2017-05-18 16:33:45.218268 |    27.55  
 3091 | 2017-05-18 16:33:33.320828 |   697.75  
  340 | 2017-05-18 16:33:31.567852 |    24.18  
(10 rows)  

With 100,000-row batches, performance improves slightly:

progress: 211.0 s, 1.0 tps, lat 1428.401 ms stddev 0.000  
progress: 212.0 s, 0.0 tps, lat -nan ms stddev -nan  
progress: 213.0 s, 1.0 tps, lat 1375.766 ms stddev 0.000  
progress: 214.0 s, 2665.9 tps, lat 0.699 ms stddev 23.234  
progress: 215.0 s, 8963.1 tps, lat 0.083 ms stddev 0.008  
progress: 216.0 s, 1699.4 tps, lat 0.741 ms stddev 12.434  
progress: 217.0 s, 13247.9 tps, lat 0.075 ms stddev 0.006  

Stress test 2: real-time ingestion plus batched incremental statistics of the sensor values

Merge 100,000 rows per batch:

vi lambda2.sql  
insert into sensor_aggdata (sid,ts_group,sum_val,min_val,max_val,avg_val,count_val,all_vals) select sid,to_char(ts,'yyyymmddhh24'),sum(val),min(val),max(val),avg(val),count(val),array_agg(val order by pk) from unnest(get_sensor_data(100000))   group by sid,to_char(ts,'yyyymmddhh24')  on conflict (sid,ts_group) do update set     sum_val=sensor_aggdata.sum_val+excluded.sum_val,    min_val=least(sensor_aggdata.min_val, excluded.min_val),    max_val=greatest(sensor_aggdata.max_val, excluded.max_val),    avg_val=(sensor_aggdata.sum_val+excluded.sum_val)/(sensor_aggdata.count_val+excluded.count_val),    count_val=sensor_aggdata.count_val+excluded.count_val,    all_vals=array_cat(sensor_aggdata.all_vals, excluded.all_vals);  

Ingestion: about 100,000 rows/s.

pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 64 -j 64 -T 120  
  
transaction type: ./ins.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 120 s  
number of transactions actually processed: 12753950  
latency average = 0.602 ms  
latency stddev = 2.733 ms  
tps = 106272.985233 (including connections establishing)  
tps = 106277.604416 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set sid random(1,1000000)  
         0.601  insert into sensor_data(ts,sid,val) values (clock_timestamp(), :sid, random()*1000);  

Incremental consumption with statistics: about 44,000 rows/s.

pgbench -M prepared -n -r -P 1 -f ./lambda2.sql -c 1 -j 1 -T 1200  
  
progress: 287.0 s, 1.0 tps, lat 2107.584 ms stddev 0.000  
progress: 288.0 s, 0.0 tps, lat -nan ms stddev -nan  
progress: 289.0 s, 100.1 tps, lat 29.854 ms stddev 213.634  
progress: 290.0 s, 1855.0 tps, lat 0.540 ms stddev 5.677  
progress: 291.0 s, 8447.0 tps, lat 0.118 ms stddev 0.005  
  
postgres=# select * from sensor_aggdata limit 10;  
  sid   |  ts_group  | sum_val  | min_val | max_val | avg_val | count_val |                                                                      all_vals                                                                        
--------+------------+----------+---------+---------+---------+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------  
      6 | 2017051816 |  1842.71 |   42.47 |  577.09 |  307.12 |         6 | {42.47,559.47,577.09,193.62,75.74,394.32}  
      2 | 2017051816 |  5254.01 |   69.98 |  861.77 |  437.83 |        12 | {628.03,77.15,662.74,69.98,337.83,563.70,750.44,423.81,158.27,861.77,649.27,71.02}  
    226 | 2017051816 |  2756.42 |  144.00 |  680.45 |  344.55 |         8 | {350.57,144.00,194.23,352.52,680.45,302.66,420.01,311.98}  
    509 | 2017051816 |  6235.10 |   44.98 |  939.43 |  566.83 |        11 | {939.43,598.33,741.12,535.66,44.98,732.00,694.66,440.00,327.80,312.98,868.14}  
     20 | 2017051816 |  4684.00 |    7.01 |  878.64 |  425.82 |        11 | {209.70,288.67,76.35,544.31,289.33,7.01,841.21,878.64,418.05,651.01,479.72}  
 934042 | 2017051816 | 10210.41 |   46.44 |  945.59 |  486.21 |        21 | {235.86,656.24,450.73,945.59,932.06,256.10,46.44,903.74,694.43,713.79,523.25,325.82,333.67,603.01,743.63,137.48,238.60,321.65,466.50,70.49,611.33}  
    960 | 2017051816 |  3621.60 |   20.59 |  895.01 |  603.60 |         6 | {347.70,876.07,895.01,20.59,871.64,610.59}  
     81 | 2017051816 |  4209.38 |  459.06 |  949.42 |  701.56 |         6 | {716.38,949.42,706.20,459.06,613.36,764.96}  
 723065 | 2017051816 |  7176.00 |   12.37 |  983.84 |  512.57 |        14 | {869.29,715.48,323.42,595.29,983.84,700.06,716.37,741.55,137.88,12.37,334.74,951.94,46.85,46.92}  
     77 | 2017051816 |  5394.54 |   87.43 |  872.90 |  490.41 |        11 | {301.87,777.52,872.90,219.96,87.43,525.80,308.87,509.80,383.90,608.52,797.97}  
(10 rows)  

2. Stream computing

For stream computing we use PipelineDB: create a stream (the detail feed), then the continuous views for the real-time latest values and for the statistics.


1. Create the sensor detail data stream.

create sequence seq;  -- PK sequence  
  
pipeline=# create stream sensor_data(  
pk int8, -- used for ordering; the latest value is taken by PK  
ts timestamp, -- timestamp  
sid int, -- sensor ID  
val numeric(10,2)  -- value  
);  
  
CREATE STREAM  

2. Create a CONTINUOUS VIEW that updates the latest sensor values in real time

-- pipelinedb currently has weak support for array aggregation; writes errored during testing  
CREATE CONTINUOUS VIEW sensor_lastdata1 AS   
  select sid, (array_agg(ts order by pk desc))[1] as last_ts, (array_agg(val order by pk desc))[1] as last_val   
    from  sensor_data  
  group by sid;  
  
  
-- pipelinedb does not currently support window functions  
CREATE CONTINUOUS VIEW sensor_lastdata2 AS   
  select sid,ts as last_ts,val as last_val from sensor_data  
  where row_number() over(partition by sid order by pk desc)=1;  
  
ERROR:  subqueries in continuous views cannot contain window functions  

3. Create CONTINUOUS VIEWs for real-time sensor statistics

-- pipelinedb currently has weak support for array aggregation; writes errored during testing  
CREATE CONTINUOUS VIEW sensor_aggdata1 AS   
  select   
  sid,  
  to_char(ts,'yyyymmddhh24') as ts_group,  
  sum(val) as sum_val,  
  min(val) as min_val,  
  max(val) as max_val,  
  avg(val) as avg_val,  
  count(val) as count_val,  
  array_agg(val order by pk) as all_vals  
    from sensor_data  
  group by sid,to_char(ts,'yyyymmddhh24');  
  
-- no detail aggregation; statistics only  
CREATE CONTINUOUS VIEW sensor_aggdata2 AS   
  select   
  sid,  
  to_char(ts,'yyyymmddhh24') as ts_group,  
  sum(val) as sum_val,  
  min(val) as min_val,  
  max(val) as max_val,  
  avg(val) as avg_val,  
  count(val) as count_val  
    from sensor_data  
  group by sid,to_char(ts,'yyyymmddhh24');  

4. Activate the CONTINUOUS VIEWs

pipeline=# activate sensor_lastdata1;  
ACTIVATE  
pipeline=# activate sensor_aggdata1;  
ACTIVATE  
pipeline=# activate sensor_aggdata2;  
ACTIVATE  

Stress test 1

vi ins.sql  
  
\set sid random(1,1000000)  
insert into sensor_data(pk,ts,sid,val) values (nextval('seq'), clock_timestamp(), :sid, random()*1000);  

PipelineDB currently handles aggregates such as array_agg and string_agg poorly:

/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 1 -j 1 -T 100  
  
progress: 1.0 s, 12.0 tps, lat 1.302 ms stddev 0.455  
WARNING:  a background worker crashed while processing this batch  
HINT:  Some of the tuples inserted in this batch might have been lost.  
progress: 2.0 s, 16.0 tps, lat 70.528 ms stddev 253.719  
WARNING:  a background worker crashed while processing this batch  
HINT:  Some of the tuples inserted in this batch might have been lost.  
WARNING:  a background worker crashed while processing this batch  
HINT:  Some of the tuples inserted in this batch might have been lost.  
WARNING:  a background worker crashed while processing this batch  
HINT:  Some of the tuples inserted in this batch might have been lost.  

Stress test 2

Keep only sensor_aggdata2; deactivate the other two CONTINUOUS VIEWs, which aggregate arrays.

pipeline=# deactivate sensor_lastdata1;  
DEACTIVATE  
pipeline=# deactivate sensor_aggdata1;  
DEACTIVATE  

Result: ingestion at about 200,000 rows/s.

/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 256 -j 256 -T 100  
  
transaction type: ./ins.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 256  
number of threads: 256  
duration: 100 s  
number of transactions actually processed: 20940292  
latency average = 1.222 ms  
latency stddev = 0.423 ms  
tps = 208834.531839 (including connections establishing)  
tps = 208854.792937 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set sid random(1,1000000)  
         1.222  insert into sensor_data(pk,ts,sid,val) values (nextval('seq'), clock_timestamp(), :sid, random()*1000);  
  
  
pipeline=# select * from sensor_aggdata2;  
 sid  |  ts_group  |   sum_val   | min_val | max_val |       avg_val        | count_val   
------+------------+-------------+---------+---------+----------------------+-----------  
  196 | 2017051815 | 11462397.00 |    0.00 |  999.99 | 503.1780948200175593 |     22780  
  833 | 2017051815 | 11479990.49 |    0.07 |  999.99 | 498.4365443730461966 |     23032  
  700 | 2017051815 | 11205820.52 |    0.04 |  999.97 | 497.1967574762623125 |     22538  
   83 | 2017051815 | 11466423.01 |    0.01 |  999.93 | 501.3959075604530150 |     22869  
  526 | 2017051815 | 11389541.40 |    0.01 |  999.99 | 503.4496485877204615 |     22623  
  996 | 2017051815 | 11416373.92 |    0.03 |  999.99 | 502.1938996172964413 |     22733  
  262 | 2017051815 | 11458700.05 |    0.03 |  999.98 | 499.5509656465254163 |     22938  
  542 | 2017051815 | 11365373.33 |    0.00 |  999.95 | 499.6427366246098387 |     22747  
......  

3. Real-time (synchronous)

Write the details in real time, synchronously updating the final state.

(Synchronous statistics is not recommended; a contrast sketch of what it would look like follows.)
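For contrast only, synchronous statistics would amount to something like the hypothetical trigger below (sync_agg and tg_sync_agg are assumed names, not part of this article's design). Every insert also upserts one aggregate row, so all writers of the same (sid, ts_group) serialize on that row, which is why this path is discouraged for statistics:

-- Hypothetical trigger-based synchronous statistics (NOT recommended):  
-- each detail insert immediately merges into sensor_aggdata.  
create or replace function sync_agg() returns trigger as $$  
begin  
  insert into sensor_aggdata (sid,ts_group,sum_val,min_val,max_val,avg_val,count_val)  
  values (NEW.sid, to_char(NEW.ts,'yyyymmddhh24'), NEW.val, NEW.val, NEW.val, NEW.val, 1)  
  on conflict (sid,ts_group) do update set  
    sum_val=sensor_aggdata.sum_val+excluded.sum_val,  
    min_val=least(sensor_aggdata.min_val, excluded.min_val),  
    max_val=greatest(sensor_aggdata.max_val, excluded.max_val),  
    avg_val=(sensor_aggdata.sum_val+excluded.sum_val)/(sensor_aggdata.count_val+1),  
    count_val=sensor_aggdata.count_val+1;  
  return NEW;  
end;  
$$ language plpgsql;  
  
create trigger tg_sync_agg after insert on sensor_data  
  for each row execute procedure sync_agg();  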

Real-time update of the sensor final-state table:

create table sensor_lastdata(  
  sid int primary key,  
  last_ts timestamp,  
  last_val numeric(10,2)  
);  

Stress test 1: update the sensors' real-time state

vi ins.sql  
  
\set sid random(1,1000000)  
insert into sensor_lastdata values (:sid, now(), random()*1000) on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;  

Performance: about 180,000 rows/s.

/home/digoal/pgsql10/bin/pgbench -M prepared -n -r -P 1 -f ./ins.sql -c 128 -j 128 -T 100  
  
transaction type: ./ins.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 128  
number of threads: 128  
duration: 100 s  
number of transactions actually processed: 18659587  
latency average = 0.686 ms  
latency stddev = 2.566 ms  
tps = 186557.140033 (including connections establishing)  
tps = 186565.458460 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set sid random(1,1000000)  
         0.684  insert into sensor_lastdata values (:sid, now(), random()*1000) on conflict (sid) do update set last_ts=excluded.last_ts,last_val=excluded.last_val;  

Comparing the Three Approaches

Performance comparison

1. Detail ingestion speed

Lambda: 106,000 rows/s

2. Final-state update speed

Lambda: 59,800 rows/s

Real-time: 186,000 rows/s

3. Statistics speed

Lambda: 44,000 rows/s

Stream computing: 208,000 rows/s

Strengths, weaknesses, and applicable scenarios

1. The lambda approach

Performance is middle-of-the-road; via a UDF plus incremental scheduling it supports every statistics pattern.

This approach has mature production users today, supporting near-real-time statistics over several TB of data per day.

It would also be good to see the PostgreSQL community implement a feature like:

delete from table order by pk limit xxx skip locked returning array_agg(ts),array_agg(val) group by sid;

Such a query would delete and return a batch of rows with minimal overhead; compared with this article's approach, it might double the performance.
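In the meantime, a data-modifying CTE can approximate this in a single statement today (a sketch; the batch size of 10000 is just an example parameter):

-- Delete one batch of the oldest unlocked rows and aggregate them  
-- per sid in the same statement.  
with d as (  
  delete from sensor_data  
  where pk in (select pk from sensor_data order by pk limit 10000 for update skip locked)  
  returning sid, ts, val  
)  
select sid, array_agg(ts) as ts_arr, array_agg(val) as val_arr  
from d  
group by sid;  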

2. The stream computing approach

Highest performance, but support for array_agg, string_agg, and similar aggregates is currently weak. If you have no detail-aggregation requirement and only need plain statistics, it is worth considering.

3. The real-time approach

Recommended when all you need is to maintain the final state.

References

《PostgreSQL UPSERT (insert on conflict do) Usage》

《How PostgreSQL Implements UPSERT with Automatic Separation of Old and New Data》

《[Repost] Implementing UPSERT before PostgreSQL 9.5》

《Stream Computing Rises Again - PostgreSQL and PipelineDB Power IoT》

《Behavior and Audit Logs: Real-Time Indexing / Real-Time Search - Best Practices》

《Massive Data "Ingestion, Sharing, Storage, Computation" - Best Practices》

https://docs.pipelinedb.com/streams.html
