閱讀565 返回首頁    go 阿裏雲 go 技術社區[雲棲]


PostgreSQL 海量時序數據(任意滑動窗口實時統計分析) - 傳感器、人群、物體等對象跟蹤

標簽

PostgreSQL , 物聯網 , feed , 網遊 , 熱力 , 商場駐留 , 人群分析 , 實時熱力圖 , 實時在線圖 , 實時分段最大最小區間圖 , 任意滑動窗口實時最高、最低在線數


背景

在現實生活中,經常有聚集分析的需求。

例如:

某個商場,每個時間點,商場的每個商鋪位置的人群駐留數量。(有技術手段可以感知人的駐留位置,當走進某個區域時,將寫入一條記錄,表示你進入了這個區域,離開時記錄一條離開的記錄,如果長時間不動,則定時寫心跳記錄)。

某個網遊,每個時間點,在線人數。(上線時寫一條上線記錄,下線時寫一條下線記錄。)

某個共享單車公司,每個時間點,在線和不在線的車輛數量。(借車時寫一條上線記錄,還車時寫一條下線記錄。同時每隔一段時間詢問車輛狀態。)

某個物聯網企業,每個分鍾單位內,最小、最大在線傳感器的數量。(傳感器上線時寫一條上線記錄,下線時寫一條下線記錄,同時每隔一段時間詢問傳感器狀態。)

這種屬於非常典型的FEED應用,要求輸出每個時間點這個世界(係統)的在線數。(如果按時間段輸出,則輸出每個時間段內的最大,最小在線數,實際上就是取range的邊界)。

設計

場景:

某個物聯網企業,有一些傳感器,傳感器上線時寫一條上線記錄,下線時寫一條下線記錄,同時每隔小時詢問傳感器狀態,也就是說1小時內沒有記錄的傳感器視為不在線。

企業需要統計每個分鍾單位內,最小、最大在線傳感器的數量。

1、表結構

create table sensor_stat(  
  sid int,             -- 傳感器ID  
  state boolean,       -- 傳感器狀態,true在線,false離線  
  crt_time timestamp   -- 狀態上傳時間  
);  

2、索引

create index idx_sensor_stat_1 on sensor_stat(sid, crt_time desc);  

寫入1.101億測試數據(我們假設這是1小時的數據寫入量,全天寫入26.424億記錄),1001個傳感器ID。

insert into sensor_stat select random()*1000, (random()*1)::int::boolean, clock_timestamp() from generate_series(1,110100000);  

3、數據TTL,確保表比較瘦,隻包含心跳時間範圍內的數據。

由於每小時接收心跳,所以1小時內,必有數據,沒有數據的傳感器不計狀態。因此我們保留1小時內的狀態即可。

一種保留方法是pipelinedb,用法如下。

《數據保留時間窗口的使用》

另一種保留方法,使用兩張表,輪詢使用即可。

create table sensor_stat1 (  
  sid int,             -- 傳感器ID  
  state boolean,       -- 傳感器狀態,true在線,false離線  
  crt_time timestamp   -- 狀態上傳時間  
);  
  
create table sensor_stat2 (  
  sid int,             -- 傳感器ID  
  state boolean,       -- 傳感器狀態,true在線,false離線  
  crt_time timestamp   -- 狀態上傳時間  
);  

類似的用法如下

《PostgreSQL 數據rotate用法介紹 - 按時間覆蓋曆史數據》

4、使用遞歸查詢,高效查詢傳感器的最終狀態

with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select (t.sensor_stat).* from t where t.* is not null;  

執行計劃如下

explain (analyze,verbose,timing,costs,buffers) with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat where state is true order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid and t1.state is true order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select (t.sensor_stat).* from t where t.* is not null;  
                                                                                      QUERY PLAN                                                                                        
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 CTE Scan on t  (cost=70.86..72.88 rows=100 width=13) (actual time=0.037..10.975 rows=1001 loops=1)  
   Output: (t.sensor_stat).sid, (t.sensor_stat).state, (t.sensor_stat).crt_time  
   Filter: (t.* IS NOT NULL)  
   Rows Removed by Filter: 1  
   Buffers: shared hit=5926  
   CTE t  
     ->  Recursive Union  (cost=0.57..70.86 rows=101 width=37) (actual time=0.030..10.293 rows=1002 loops=1)  
           Buffers: shared hit=5926  
           ->  Subquery Scan on "*SELECT* 1"  (cost=0.57..0.63 rows=1 width=37) (actual time=0.029..0.029 rows=1 loops=1)  
                 Output: "*SELECT* 1".sensor_stat  
                 Buffers: shared hit=5  
                 ->  Limit  (cost=0.57..0.62 rows=1 width=49) (actual time=0.028..0.028 rows=1 loops=1)  
                       Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time  
                       Buffers: shared hit=5  
                       ->  Index Scan using idx_sensor_stat_1 on public.sensor_stat  (cost=0.57..3180100.70 rows=55369290 width=49) (actual time=0.027..0.027 rows=1 loops=1)  
                             Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time  
                             Filter: (sensor_stat.state IS TRUE)  
                             Buffers: shared hit=5  
           ->  WorkTable Scan on t t_1  (cost=0.00..6.82 rows=10 width=32) (actual time=0.010..0.010 rows=1 loops=1002)  
                 Output: (SubPlan 1)  
                 Filter: ((t_1.sensor_stat).sid IS NOT NULL)  
                 Rows Removed by Filter: 0  
                 Buffers: shared hit=5921  
                 SubPlan 1  
                   ->  Limit  (cost=0.57..0.66 rows=1 width=49) (actual time=0.009..0.009 rows=1 loops=1001)  
                         Output: t1.*, t1.sid, t1.crt_time  
                         Buffers: shared hit=5921  
                         ->  Index Scan using idx_sensor_stat_1 on public.sensor_stat t1  (cost=0.57..1746916.71 rows=18456430 width=49) (actual time=0.009..0.009 rows=1 loops=1001)  
                               Output: t1.*, t1.sid, t1.crt_time  
                               Index Cond: (t1.sid > (t_1.sensor_stat).sid)  
                               Filter: (t1.state IS TRUE)  
                               Rows Removed by Filter: 1  
                               Buffers: shared hit=5921  
 Planning time: 0.180 ms  
 Execution time: 11.083 ms  
(35 rows)  

樣例

 sid  | state |          crt_time            
------+-------+----------------------------  
    0 | t     | 2017-07-05 10:29:09.470687  
    1 | f     | 2017-07-05 10:29:09.465721  
    2 | t     | 2017-07-05 10:29:09.474216  
    3 | f     | 2017-07-05 10:29:09.473176  
    4 | t     | 2017-07-05 10:29:09.473179  
    5 | t     | 2017-07-05 10:29:09.473842  
......  
  996 | t     | 2017-07-05 10:29:09.469787  
  997 | f     | 2017-07-05 10:29:09.470983  
  998 | t     | 2017-07-05 10:29:09.47268  
  999 | t     | 2017-07-05 10:29:09.469192  
 1000 | t     | 2017-07-05 10:29:09.472195  
(1001 rows)  
  
Time: 11.067 ms  

效率很高,1.101億數據,11毫秒獲取最終在線狀態。

在線的設備為state=t的。

with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;  
 count   
-------  
   491  
(1 row)  
  
Time: 10.182 ms  

5、統計任意時間點的傳感器在線數量,如果每個設備上線的時間精確到秒(crt_time精確到秒),那麼不管有多少條記錄,一天最多需要統計86400個時間點的傳感器在線數量。

例如統計 2017-07-05 10:29:09 時間點的傳感器在線數量,加一個時間限製即可。

with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat where crt_time <= '2017-07-05 10:29:09' order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.crt_time <= '2017-07-05 10:29:09' and t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;  
  
 count   
-------  
   501  
(1 row)  
  
Time: 20.743 ms  

新增這個時間限製,會帶來一定的性能影響,特別是如果這個時間是過去很久以前的時間,過濾會越多,性能下降越嚴重。

因此,建議實時,每秒發起一次查詢請求,就不要加這個時間限製了。

6、一次性生成過去每一秒的在線數。

使用窗口查詢的幀查詢技術。(幀表示按時間排序,截止到當前記錄的區間。)

7、統計每分鍾內,最高在線數、最低在線數。

每秒查詢一次,將數據寫入結果表。

create table result (crt_time timestamp(0) default now(), state boolean, cnt int);  
create index idx_result_1 on result using brin (crt_time);  
  
insert into result (state,cnt)  
with recursive t as   
(  
  (  
    select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1  
  )  
  union all  
  (  
    select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null  
  )  
)  
select (t.sensor_stat).state, count(*) from t where t.* is not null group by 1;  
  
INSERT 0 2  
Time: 12.061 ms  
  
postgres=# select * from result ;  
      crt_time       | state | cnt   
---------------------+-------+-----  
 2017-07-05 11:11:03 | f     | 510  
 2017-07-05 11:11:03 | t     | 491  
(2 rows)  
  
Time: 0.274 ms  

由於每次查詢僅需12毫秒,每秒調用一次沒有問題。

統計某一分鍾內,最高在線數、最低在線數。

select '2017-07-05 11:11:00', min(cnt), max(cnt) from result where crt_time between '2017-07-05 11:11:00' and '2017-07-05 11:12:00';  
  
or  
  
select to_char(crt_time, 'yyyy-mm-dd hh24:mi:00'), min(cnt), max(cnt) from result where crt_time between ? and ? group by 1;  

傳感器ID很多很多時,如何優化

當傳感器ID達到10萬級別時,查詢性能會下降到250毫秒。

如果傳感器ID特別多,例如有百萬以上,那麼會下降到2.5秒。就不適合每秒查詢一次了。

因此傳感器數量特別多時,如何優化?

有一個比較好的方法是數據按傳感器ID進行哈希分布,例如每張分區表負責1萬個傳感器ID。在查詢在線數時,並發的查詢所有的分區表,從而降低RT。

小結

使用本文提到的方法(遞歸查詢),我們可以實現非常細粒度的,大量被跟蹤物的狀態實時統計。

用於繪製被跟蹤物的實時狀態圖,例如:

1、實時熱力圖

2、實時傳感器(或用戶)在線、離線數,任意滑動窗口的最大最小在線、離線值。

最後更新:2017-07-05 12:02:32

  上一篇:go  Vim技能修煉教程(4) - 基本功
  下一篇:go  PostgreSQL 10 GIN索引 鎖優化