PostgreSQL 海量時序數據(任意滑動窗口實時統計分析) - 傳感器、人群、物體等對象跟蹤
標簽
PostgreSQL , 物聯網 , feed , 網遊 , 熱力 , 商場駐留 , 人群分析 , 實時熱力圖 , 實時在線圖 , 實時分段最大最小區間圖 , 任意滑動窗口實時最高、最低在線數
背景
在現實生活中,經常有聚集分析的需求。
例如:
某個商場,每個時間點,商場的每個商鋪位置的人群駐留數量。(有技術手段可以感知人的駐留位置,當走進某個區域時,將寫入一條記錄,表示你進入了這個區域,離開時記錄一條離開的記錄,如果長時間不動,則定時寫心跳記錄)。
某個網遊,每個時間點,在線人數。(上線時寫一條上線記錄,下線時寫一條下線記錄。)
某個共享單車公司,每個時間點,在線和不在線的車輛數量。(借車時寫一條上線記錄,還車時寫一條下線記錄。同時每隔一段時間詢問車輛狀態。)
某個物聯網企業,每個分鍾單位內,最小、最大在線傳感器的數量。(傳感器上線時寫一條上線記錄,下線時寫一條下線記錄,同時每隔一段時間詢問傳感器狀態。)
這種屬於非常典型的FEED應用,要求輸出每個時間點這個世界(係統)的在線數。(如果按時間段輸出,則輸出每個時間段內的最大,最小在線數,實際上就是取range的邊界)。
設計
場景:
某個物聯網企業,有一些傳感器,傳感器上線時寫一條上線記錄,下線時寫一條下線記錄,同時每隔小時詢問傳感器狀態,也就是說1小時內沒有記錄的傳感器視為不在線。
企業需要統計每個分鍾單位內,最小、最大在線傳感器的數量。
1、表結構
create table sensor_stat(
sid int, -- 傳感器ID
state boolean, -- 傳感器狀態,true在線,false離線
crt_time timestamp -- 狀態上傳時間
);
2、索引
create index idx_sensor_stat_1 on sensor_stat(sid, crt_time desc);
寫入1.101億測試數據(我們假設這是1小時的數據寫入量,全天寫入26.424億記錄),1001個傳感器ID。
insert into sensor_stat select random()*1000, (random()*1)::int::boolean, clock_timestamp() from generate_series(1,110100000);
3、數據TTL,確保表比較瘦,隻包含心跳時間範圍內的數據。
由於每小時接收心跳,所以1小時內,必有數據,沒有數據的傳感器不計狀態。因此我們保留1小時內的狀態即可。
一種保留方法是pipelinedb,用法如下。
另一種保留方法,使用兩張表,輪詢使用即可。
create table sensor_stat1 (
sid int, -- 傳感器ID
state boolean, -- 傳感器狀態,true在線,false離線
crt_time timestamp -- 狀態上傳時間
);
create table sensor_stat2 (
sid int, -- 傳感器ID
state boolean, -- 傳感器狀態,true在線,false離線
crt_time timestamp -- 狀態上傳時間
);
類似的用法如下
《PostgreSQL 數據rotate用法介紹 - 按時間覆蓋曆史數據》
4、使用遞歸查詢,高效查詢傳感器的最終狀態
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).* from t where t.* is not null;
執行計劃如下
explain (analyze,verbose,timing,costs,buffers) with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat where state is true order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid and t1.state is true order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).* from t where t.* is not null;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CTE Scan on t (cost=70.86..72.88 rows=100 width=13) (actual time=0.037..10.975 rows=1001 loops=1)
Output: (t.sensor_stat).sid, (t.sensor_stat).state, (t.sensor_stat).crt_time
Filter: (t.* IS NOT NULL)
Rows Removed by Filter: 1
Buffers: shared hit=5926
CTE t
-> Recursive Union (cost=0.57..70.86 rows=101 width=37) (actual time=0.030..10.293 rows=1002 loops=1)
Buffers: shared hit=5926
-> Subquery Scan on "*SELECT* 1" (cost=0.57..0.63 rows=1 width=37) (actual time=0.029..0.029 rows=1 loops=1)
Output: "*SELECT* 1".sensor_stat
Buffers: shared hit=5
-> Limit (cost=0.57..0.62 rows=1 width=49) (actual time=0.028..0.028 rows=1 loops=1)
Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time
Buffers: shared hit=5
-> Index Scan using idx_sensor_stat_1 on public.sensor_stat (cost=0.57..3180100.70 rows=55369290 width=49) (actual time=0.027..0.027 rows=1 loops=1)
Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time
Filter: (sensor_stat.state IS TRUE)
Buffers: shared hit=5
-> WorkTable Scan on t t_1 (cost=0.00..6.82 rows=10 width=32) (actual time=0.010..0.010 rows=1 loops=1002)
Output: (SubPlan 1)
Filter: ((t_1.sensor_stat).sid IS NOT NULL)
Rows Removed by Filter: 0
Buffers: shared hit=5921
SubPlan 1
-> Limit (cost=0.57..0.66 rows=1 width=49) (actual time=0.009..0.009 rows=1 loops=1001)
Output: t1.*, t1.sid, t1.crt_time
Buffers: shared hit=5921
-> Index Scan using idx_sensor_stat_1 on public.sensor_stat t1 (cost=0.57..1746916.71 rows=18456430 width=49) (actual time=0.009..0.009 rows=1 loops=1001)
Output: t1.*, t1.sid, t1.crt_time
Index Cond: (t1.sid > (t_1.sensor_stat).sid)
Filter: (t1.state IS TRUE)
Rows Removed by Filter: 1
Buffers: shared hit=5921
Planning time: 0.180 ms
Execution time: 11.083 ms
(35 rows)
樣例
sid | state | crt_time
------+-------+----------------------------
0 | t | 2017-07-05 10:29:09.470687
1 | f | 2017-07-05 10:29:09.465721
2 | t | 2017-07-05 10:29:09.474216
3 | f | 2017-07-05 10:29:09.473176
4 | t | 2017-07-05 10:29:09.473179
5 | t | 2017-07-05 10:29:09.473842
......
996 | t | 2017-07-05 10:29:09.469787
997 | f | 2017-07-05 10:29:09.470983
998 | t | 2017-07-05 10:29:09.47268
999 | t | 2017-07-05 10:29:09.469192
1000 | t | 2017-07-05 10:29:09.472195
(1001 rows)
Time: 11.067 ms
效率很高,1.101億數據,11毫秒獲取最終在線狀態。
在線的設備為state=t的。
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;
count
-------
491
(1 row)
Time: 10.182 ms
5、統計任意時間點的傳感器在線數量,如果每個設備上線的時間精確到秒(crt_time精確到秒),那麼不管有多少條記錄,一天最多需要統計86400個時間點的傳感器在線數量。
例如統計 2017-07-05 10:29:09
時間點的傳感器在線數量,加一個時間限製即可。
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat where crt_time <= '2017-07-05 10:29:09' order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.crt_time <= '2017-07-05 10:29:09' and t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;
count
-------
501
(1 row)
Time: 20.743 ms
新增這個時間限製,會帶來一定的性能影響,特別是如果這個時間是過去很久以前的時間,過濾會越多,性能下降越嚴重。
因此,建議實時,每秒發起一次查詢請求,就不要加這個時間限製了。
6、一次性生成過去每一秒的在線數。
使用窗口查詢的幀查詢技術。(幀表示按時間排序,截止到當前記錄的區間。)
7、統計每分鍾內,最高在線數、最低在線數。
每秒查詢一次,將數據寫入結果表。
create table result (crt_time timestamp(0) default now(), state boolean, cnt int);
create index idx_result_1 on result using brin (crt_time);
insert into result (state,cnt)
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).state, count(*) from t where t.* is not null group by 1;
INSERT 0 2
Time: 12.061 ms
postgres=# select * from result ;
crt_time | state | cnt
---------------------+-------+-----
2017-07-05 11:11:03 | f | 510
2017-07-05 11:11:03 | t | 491
(2 rows)
Time: 0.274 ms
由於每次查詢僅需12毫秒,每秒調用一次沒有問題。
統計某一分鍾內,最高在線數、最低在線數。
select '2017-07-05 11:11:00', min(cnt), max(cnt) from result where crt_time between '2017-07-05 11:11:00' and '2017-07-05 11:12:00';
or
select to_char(crt_time, 'yyyy-mm-dd hh24:mi:00'), min(cnt), max(cnt) from result where crt_time between ? and ? group by 1;
傳感器ID很多很多時,如何優化
當傳感器ID達到10萬級別時,查詢性能會下降到250毫秒。
如果傳感器ID特別多,例如有百萬以上,那麼會下降到2.5秒。就不適合每秒查詢一次了。
因此傳感器數量特別多時,如何優化?
有一個比較好的方法是數據按傳感器ID進行哈希分布,例如每張分區表負責1萬個傳感器ID。在查詢在線數時,並發的查詢所有的分區表,從而降低RT。
小結
使用本文提到的方法(遞歸查詢),我們可以實現非常細粒度的,大量被跟蹤物的狀態實時統計。
用於繪製被跟蹤物的實時狀態圖,例如:
1、實時熱力圖
2、實時傳感器(或用戶)在線、離線數,任意滑動窗口的最大最小在線、離線值。
最後更新:2017-07-05 12:02:32