PostgreSQL "物聯網"應用 - 1 實時流式數據處理案例(萬億每天)
物聯網的一個特點是萬物聯網,會產生大量的數據。
例如 :
一盒藥,從生產,到運輸,到藥店,到售賣。每流經一個節點,都會記錄它的信息。
又如 :
健康手環,兒童防丟手表,一些動物遷徙研究的傳感器(如中華鱘),水紋監測,電網監測,煤氣管道監測,氣象監測等等這些信息。
股價的實時預測。
車流實時數據統計,車輛軌跡實時合並。
商場人流實時統計。
數據監控實時處理,例如數據庫的監控,服務器的監控,操作係統的監控等。
等等。。。。。。
傳感器種類繁多,采集的數據量已經達到了海量。
這些數據比電商雙十一的量有過之而不及,怎樣才能處理好這些數據呢?如何做到實時的流式數據處理?
PostgreSQL提供了一個很好的基於流的數據處理產品,實時計算能力達到了單機10W記錄/s(普通X86服務器)。
下麵是應用CASE。
下載並安裝pipelineDB,它是基於PostgreSQL改進的流式數據處理數據庫。
# wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.8.5-centos6-x86_64.rpm
# rpm -ivh pipelinedb-0.8.5-centos6-x86_64.rpm --prefix=/home/digoal/pipelinedb
Configure an environment variable script
$vi env_pipe.sh
export PS1="$USER@`/bin/hostname -s`-> "
export PGPORT=1922
export PGDATA=/disk1/digoal/pipe/pg_root
export LANG=en_US.utf8
export PGHOME=/home/digoal/pipelinedb
export LD_LIBRARY_PATH=/home/digoal/scws/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH
export DATE=`date +"%Y%m%d%H%M"`
export PATH=/home/digoal/scws/bin:$PGHOME/bin:$PATH:.
export MANPATH=$PGHOME/share/man:$MANPATH
export PGHOST=$PGDATA
export PGUSER=postgres
export PGDATABASE=pipeline
alias rm='rm -i'
alias ll='ls -lh'
unalias vi
$ . ./env_pipe.sh
Initialize the database
$ pipeline-init -D $PGDATA -U postgres -E UTF8 --locale=C -W
Configure parameters
$ cd $PGDATA
$ vi pipelinedb.conf
listen_addresses = '0.0.0.0' # what IP address(es) to listen on;
port = 1922 # (change requires restart)
max_connections = 200 # (change requires restart)
unix_socket_directories = '.' # comma-separated list of directories
shared_buffers = 8GB # min 128kB
maintenance_work_mem = 640MB # min 1MB
dynamic_shared_memory_type = posix # the default is the first option
synchronous_commit = off # synchronization level;
wal_buffers = 16MB # min 32kB, -1 sets based on shared_buffers
wal_writer_delay = 10ms # 1-10000 milliseconds
checkpoint_segments = 400 # in logfile segments, min 1, 16MB each
log_destination = 'csvlog' # Valid values are combinations of
logging_collector = on # Enable capturing of stderr and csvlog
log_timezone = 'PRC'
datestyle = 'iso, mdy'
timezone = 'PRC'
lc_messages = 'C' # locale for system error message
lc_monetary = 'C' # locale for monetary formatting
lc_numeric = 'C' # locale for number formatting
lc_time = 'C' # locale for time formatting
default_text_search_config = 'pg_catalog.english'
continuous_query_combiner_work_mem = 1GB
continuous_query_batch_size = 100000
continuous_query_num_combiners = 8
continuous_query_num_workers = 4
continuous_queries_enabled = on
Start the database
$ pipeline-ctl start
Create streams (rows are written to a stream just as they are inserted into a table, and consumed from it in real time)
Example application scenarios:
.1. Suppose each sensor uploads three fields: sensor ID, timestamp, and the sampled value:
gid, crt_time, val
The application needs real-time per-minute, per-hour, and per-day statistics of the max, min, average, and count of the values each sensor uploads.
Create three continuous views, one per statistical dimension, as follows:
pipeline=# CREATE CONTINUOUS VIEW sv01 AS SELECT gid::int,date_trunc('min',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('min',crt_time);
pipeline=# CREATE CONTINUOUS VIEW sv02 AS SELECT gid::int,date_trunc('hour',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('hour',crt_time);
pipeline=# CREATE CONTINUOUS VIEW sv03 AS SELECT gid::int,date_trunc('day',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('day',crt_time);
Activate the streams
pipeline=# activate;
ACTIVATE
Insert a test row
pipeline=# insert into stream01(gid,val,crt_time) values (1,1,now());
INSERT 0 1
pipeline=# select * from sv01;
gid | date_trunc | max | min | avg | count
-----+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 13:44:00 | 1 | 1 | 1.00000000000000000000 | 1
(1 row)
pipeline=# select * from sv02;
gid | date_trunc | max | min | avg | count
-----+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 13:00:00 | 1 | 1 | 1.00000000000000000000 | 1
(1 row)
pipeline=# select * from sv03;
gid | date_trunc | max | min | avg | count
-----+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 00:00:00 | 1 | 1 | 1.00000000000000000000 | 1
(1 row)
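Once rows are flowing in, a continuous view is queried exactly like an ordinary table. A minimal dashboard-style sketch (the time column is named date_trunc because the views above do not alias that expression):
-- Per-minute aggregates for sensor 1 over the last 10 minutes (sketch)
SELECT date_trunc, max, min, avg, count
FROM sv01
WHERE gid = 1
AND date_trunc >= date_trunc('min', now() - interval '10 min')
ORDER BY date_trunc;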
Stress test:
Suppose there are 100,000 sensors, each uploading values in the range 1 to 100.
$ vi test.sql
\setrandom gid 1 100000
\setrandom val 1 100
insert into stream01(gid,val,crt_time) values (:gid,:val,now());
./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100
progress: 5.0 s, 95949.9 tps, lat 0.247 ms stddev 0.575
progress: 10.0 s, 98719.9 tps, lat 0.240 ms stddev 0.549
progress: 15.0 s, 100207.8 tps, lat 0.237 ms stddev 0.573
progress: 20.0 s, 101596.4 tps, lat 0.234 ms stddev 0.517
progress: 25.0 s, 102830.4 tps, lat 0.231 ms stddev 0.492
progress: 30.0 s, 103055.0 tps, lat 0.230 ms stddev 0.488
progress: 35.0 s, 102786.0 tps, lat 0.231 ms stddev 0.482
progress: 40.0 s, 99066.3 tps, lat 0.240 ms stddev 0.578
progress: 45.0 s, 102912.5 tps, lat 0.231 ms stddev 0.494
progress: 50.0 s, 100398.2 tps, lat 0.236 ms stddev 0.530
progress: 55.0 s, 105719.8 tps, lat 0.224 ms stddev 0.425
progress: 60.0 s, 99041.0 tps, lat 0.240 ms stddev 0.617
progress: 65.0 s, 97087.0 tps, lat 0.245 ms stddev 0.619
progress: 70.0 s, 95312.6 tps, lat 0.249 ms stddev 0.653
progress: 75.0 s, 98768.3 tps, lat 0.240 ms stddev 0.593
progress: 80.0 s, 106203.8 tps, lat 0.223 ms stddev 0.435
progress: 85.0 s, 103423.1 tps, lat 0.229 ms stddev 0.480
progress: 90.0 s, 106143.5 tps, lat 0.223 ms stddev 0.429
progress: 95.0 s, 103514.5 tps, lat 0.229 ms stddev 0.478
progress: 100.0 s, 100222.8 tps, lat 0.237 ms stddev 0.547
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 24
number of threads: 24
duration: 100 s
number of transactions actually processed: 10114821
latency average: 0.235 ms
latency stddev: 0.530 ms
tps = 101089.580065 (including connections establishing)
tps = 101101.483296 (excluding connections establishing)
statement latencies in milliseconds:
0.003051 \setrandom gid 1 100000
0.000866 \setrandom val 1 100
0.230430 insert into stream01(gid,val,crt_time) values (:gid,:val,now());
About 100,000 records are processed per second; the statistical dimensions are those of the continuous-view SQL above.
After several rounds of testing:
pipeline=# select sum(count) from sv03;
sum
----------
53022588
(1 row)
pipeline=# select * from sv01 limit 10;
gid | date_trunc | max | min | avg | count
-------+---------------------+-----+-----+------------------------+-------
1 | 2015-12-15 13:44:00 | 1 | 1 | 1.00000000000000000000 | 1
53693 | 2015-12-15 13:47:00 | 68 | 1 | 28.0000000000000000 | 6
588 | 2015-12-15 13:47:00 | 88 | 11 | 47.6250000000000000 | 8
60154 | 2015-12-15 13:47:00 | 95 | 1 | 40.9090909090909091 | 11
38900 | 2015-12-15 13:47:00 | 90 | 17 | 57.2000000000000000 | 5
12784 | 2015-12-15 13:47:00 | 93 | 13 | 64.1250000000000000 | 8
79782 | 2015-12-15 13:47:00 | 60 | 16 | 43.1666666666666667 | 6
5122 | 2015-12-15 13:47:00 | 100 | 3 | 46.8333333333333333 | 12
97444 | 2015-12-15 13:47:00 | 98 | 9 | 59.5833333333333333 | 12
34209 | 2015-12-15 13:47:00 | 86 | 13 | 52.2857142857142857 | 7
(10 rows)
pipeline=# select * from sv02 limit 10;
gid | date_trunc | max | min | avg | count
-------+---------------------+-----+-----+---------------------+-------
91065 | 2015-12-15 14:00:00 | 100 | 0 | 51.4299065420560748 | 321
24081 | 2015-12-15 14:00:00 | 100 | 0 | 52.1649831649831650 | 297
29013 | 2015-12-15 14:00:00 | 100 | 0 | 50.9967213114754098 | 305
13134 | 2015-12-15 14:00:00 | 100 | 0 | 49.6968750000000000 | 320
84691 | 2015-12-15 14:00:00 | 100 | 0 | 49.5547445255474453 | 274
91059 | 2015-12-15 14:00:00 | 100 | 1 | 47.7536764705882353 | 272
50115 | 2015-12-15 14:00:00 | 100 | 1 | 49.4219269102990033 | 301
92610 | 2015-12-15 14:00:00 | 100 | 0 | 50.1197183098591549 | 284
36616 | 2015-12-15 14:00:00 | 100 | 1 | 48.8750000000000000 | 312
46390 | 2015-12-15 14:00:00 | 99 | 0 | 48.3246268656716418 | 268
(10 rows)
pipeline=# select * from sv03 limit 10;
gid | date_trunc | max | min | avg | count
-------+---------------------+-----+-----+---------------------+-------
68560 | 2015-12-15 00:00:00 | 100 | 0 | 51.2702702702702703 | 555
42241 | 2015-12-15 00:00:00 | 100 | 0 | 49.5266903914590747 | 562
64946 | 2015-12-15 00:00:00 | 100 | 0 | 48.2409177820267686 | 523
2451 | 2015-12-15 00:00:00 | 100 | 0 | 49.8153564899451554 | 547
11956 | 2015-12-15 00:00:00 | 100 | 0 | 51.2382739212007505 | 533
21578 | 2015-12-15 00:00:00 | 100 | 0 | 49.2959558823529412 | 544
36451 | 2015-12-15 00:00:00 | 100 | 0 | 51.1292035398230088 | 565
62380 | 2015-12-15 00:00:00 | 100 | 0 | 48.9099437148217636 | 533
51946 | 2015-12-15 00:00:00 | 100 | 0 | 51.0318091451292247 | 503
35084 | 2015-12-15 00:00:00 | 100 | 0 | 49.3613766730401530 | 523
(10 rows)
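Coarser aggregates can also be derived from sv01 alone, without touching the stream again; the one trap is that averages must be recombined from sums and counts rather than averaged directly. A sketch:
-- Roll per-minute rows up to hourly statistics (sketch)
SELECT gid,
       date_trunc('hour', date_trunc) AS hr,
       max(max) AS max,
       min(min) AS min,
       sum(avg * count) / sum(count) AS avg,
       sum(count) AS count
FROM sv01
GROUP BY gid, hr;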
.2. Suppose vehicles upload position information at intervals while on the road:
gid, crt_time, poi
The application needs to draw each vehicle's daily path (aggregating many points into a path type, an array type, a string, etc.).
Suppose there are 10 million vehicles, and each upload carries one coordinate plus a timestamp (or a batch of them).
Application requirements:
.2.1. Draw each vehicle's path per day.
.2.2. Count, per hour, how many vehicles pass through each region (a sketch for this follows the view below).
Create the stream for .2.1 (here we assume the point has already been binary-encoded into a single INT8, which simplifies the stress test):
CREATE CONTINUOUS VIEW sv04 AS SELECT gid::int,date_trunc('day',crt_time::timestamp),array_agg(poi::int8||' -> '||crt_time) FROM stream02 group by gid,date_trunc('day',crt_time);
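Requirement .2.2 is not covered by sv04. A minimal sketch for it, assuming each poi can be crudely mapped to a region by integer division (the mapping and view name are purely illustrative; PipelineDB evaluates count(DISTINCT ...) approximately, via HyperLogLog):
CREATE CONTINUOUS VIEW sv05 AS
SELECT (poi::int8 / 10000000) AS region_id,
       date_trunc('hour', crt_time::timestamp),
       count(DISTINCT gid::int) AS vehicles
FROM stream02
GROUP BY poi::int8 / 10000000, date_trunc('hour', crt_time);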
Stress test
$ vi test.sql
\setrandom gid 1 10000000
\setrandom poi 1 1000000000
insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());
./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100
progress: 5.0 s, 106005.0 tps, lat 0.223 ms stddev 0.370
progress: 10.0 s, 109884.8 tps, lat 0.216 ms stddev 0.347
progress: 15.0 s, 111122.1 tps, lat 0.213 ms stddev 0.368
progress: 20.0 s, 111987.0 tps, lat 0.212 ms stddev 0.353
progress: 25.0 s, 111835.4 tps, lat 0.212 ms stddev 0.363
progress: 30.0 s, 111759.7 tps, lat 0.212 ms stddev 0.366
progress: 35.0 s, 112110.4 tps, lat 0.211 ms stddev 0.358
progress: 40.0 s, 112185.4 tps, lat 0.211 ms stddev 0.352
progress: 45.0 s, 113080.0 tps, lat 0.210 ms stddev 0.345
progress: 50.0 s, 113205.4 tps, lat 0.209 ms stddev 0.353
progress: 55.0 s, 113415.1 tps, lat 0.209 ms stddev 0.352
progress: 60.0 s, 113519.8 tps, lat 0.209 ms stddev 0.342
progress: 65.0 s, 112683.6 tps, lat 0.210 ms stddev 0.358
progress: 70.0 s, 112748.3 tps, lat 0.210 ms stddev 0.360
progress: 75.0 s, 112158.9 tps, lat 0.211 ms stddev 0.373
progress: 80.0 s, 112580.8 tps, lat 0.210 ms stddev 0.355
progress: 85.0 s, 111895.5 tps, lat 0.212 ms stddev 0.370
progress: 90.0 s, 112229.2 tps, lat 0.211 ms stddev 0.442
progress: 95.0 s, 104915.8 tps, lat 0.226 ms stddev 2.852
progress: 100.0 s, 103079.9 tps, lat 0.230 ms stddev 2.054
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 24
number of threads: 24
duration: 100 s
number of transactions actually processed: 11112035
latency average: 0.213 ms
latency stddev: 0.836 ms
tps = 111106.652772 (including connections establishing)
tps = 111118.651135 (excluding connections establishing)
statement latencies in milliseconds:
0.002939 \setrandom gid 1 10000000
0.000887 \setrandom poi 1 1000000000
0.209177 insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());
pipeline=# select * from sv04 limit 3;
   gid   |     date_trunc      |                                       array_agg
---------+---------------------+----------------------------------------------------------------------------------------
  448955 | 2015-12-15 00:00:00 | {"306029686 -> 2015-12-15 14:53:01.273121","885962518 -> 2015-12-15 14:53:03.352406"}
 7271368 | 2015-12-15 00:00:00 | {"615447469 -> 2015-12-15 14:53:01.2616","944473391 -> 2015-12-15 14:53:04.543387"}
 8613957 | 2015-12-15 00:00:00 | {"473349491 -> 2015-12-15 14:53:01.288332","125413709 -> 2015-12-15 14:53:08.742894"}
(3 rows)
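Pulling one vehicle's path for the current day is then a plain query (a sketch; note that array_agg in sv04 has no ORDER BY, so point order inside the array is not guaranteed):
SELECT * FROM sv04
WHERE gid = 448955
AND date_trunc = date_trunc('day', now());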
.3. Per traffic camera, aggregate the vehicle information each camera captures.
For example:
.3.1 Per vehicle, collect the camera positions it passes and string them into trajectory data.
.3.2 Per camera, count the traffic flow at each intersection (assuming one camera per intersection).
The first requirement is the same as the vehicle-trajectory example above; for intersection flow, the statistics are simply grouped by camera ID instead.
The usage is much the same; a bare sketch of .3.2 follows.
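A minimal sketch of .3.2, assuming each camera uploads (cid, crt_time) into a hypothetical stream03 (all names are illustrative):
CREATE CONTINUOUS VIEW camera_flow AS
SELECT cid::int,
       date_trunc('hour', crt_time::timestamp),
       count(*) AS cars
FROM stream03
GROUP BY cid, date_trunc('hour', crt_time);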
.4. Real-time stock price prediction.
This can be combined with MADlib or PL/R for multiple regression: pick the model with the best R², then use its intercept and slope to project the next set of prices.
It requires UDFs; for the details, refer to my earlier articles.
Only a rough sketch is given here.
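A rough, hedged sketch of the idea using MADlib's linear regression module (the table ticks, columns price and x, and the predicted point are all illustrative; check the MADlib documentation for the exact interfaces):
-- Train a linear model over recent ticks (illustrative names)
SELECT madlib.linregr_train('ticks', 'ticks_model', 'price', 'ARRAY[1, x]');
-- Inspect fit quality (R2), then project the next point
SELECT r2 FROM ticks_model;
SELECT madlib.linregr_predict(coef, ARRAY[1, 101]::float8[]) FROM ticks_model;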
.5. Real-time statistics from shopping mall WiFi sensors.
From the position information WiFi provides, compute in real time each store's foot traffic, the average dwell time per visitor, and the total dwell time, as sketched below.
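A minimal sketch of the per-store visitor count, assuming each WiFi probe reports (shop_id, mac, crt_time) into a hypothetical wifi_stream (all names are illustrative; dwell times would additionally need per-device min/max timestamps):
CREATE CONTINUOUS VIEW shop_traffic AS
SELECT shop_id::int,
       date_trunc('hour', crt_time::timestamp),
       count(DISTINCT mac::text) AS visitors
FROM wifi_stream
GROUP BY shop_id, date_trunc('hour', crt_time);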
.6. What if your data processing scenario needs something PG's existing functions cannot do? No problem: PG exposes a whole series of APIs for custom UDFs, data types, operators, index methods, and more. You can build whatever the business requires on top of them.
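As a taste of that extensibility, here is a hypothetical helper UDF (illustration only) that clamps a sample into a valid range before aggregation:
CREATE OR REPLACE FUNCTION clamp_val(v int, lo int DEFAULT 0, hi int DEFAULT 100)
RETURNS int AS $$
  SELECT greatest(lo, least(hi, v));
$$ LANGUAGE sql IMMUTABLE;
-- usable inside a continuous view, e.g. max(clamp_val(val::int))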
There are many more uses than can be enumerated here.
Next, let's combine this with a currently very popular message queue: PipelineDB can pull data from the queue and compute over it in real time.
Example:
Start an nginx server locally and use siege to simulate HTTP requests; nginx records these requests as JSON to a log file.
Start a Kafka server locally and use kafkacat to continuously push nginx's access log into Kafka.
In PipelineDB, subscribe to the Kafka messages and process them in real time into the desired statistics (visitors per web page, latency, and so on).
Install Kafka
https://kafka.apache.org/07/quickstart.html
# wget https://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
# tar -zxvf kafka_2.10-0.8.2.2.tgz
# git clone https://github.com/edenhill/librdkafka.git
# cd librdkafka
./configure
make
make install
# git clone https://github.com/lloyd/yajl.git
# cd yajl
./configure
make
make install
# vi /etc/ld.so.conf
/usr/local/lib
# ldconfig
# git clone https://github.com/edenhill/kafkacat.git
# cd kafkacat
./configure
make
make install
Install siege and nginx
# yum install -y siege nginx
Create an nginx configuration file that writes the access log to /tmp/access.log in JSON format
cd /tmp
cat <<EOF > nginx.conf
worker_processes 4;
pid $PWD/nginx.pid;
events {}
http {
log_format json
'{'
'"ts": "\$time_iso8601", '
'"user_agent": "\$http_user_agent", '
'"url": "\$request_uri", '
'"latency": "\$request_time", '
'"user": "\$arg_user"'
'}';
access_log $PWD/access.log json;
error_log $PWD/error.log;
server {
location ~ ^/ {
return 200;
}
}
}
EOF
Start nginx
nginx -c $PWD/nginx.conf -p $PWD/
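A quick sanity check (assuming nginx listens on the default port 80 when run as root; any path works because the server answers 200 for everything):
curl -s "http://localhost/page1/path1?user=42" >/dev/null
tail -1 /tmp/access.log    # one JSON log line should appear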
Configure the hostname
# hostname
digoal.org
# vi /etc/hosts
127.0.0.1 digoal.org
Start Kafka
cd /opt/soft_bak/kafka_2.10-0.8.2.2
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
Generate a file of random URLs
for x in {0..1000000}; do echo "https://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done
Use siege to simulate requests to these URLs; nginx writes the access log to /tmp/access.log
siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1
Sample entries from /tmp/access.log, in JSON format:
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002", "user": "18583"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003", "user": "24827"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003", "user": "3988"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003", "user": "18433"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001", "user": "10801"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001", "user": "4915"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001", "user": "5367"}
Pipe the access log into kafkacat, pushing it to the Kafka message system under the topic logs_topic:
( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &
The raw way of consuming it looks like this:
# cd /opt/soft_bak/kafka_2.10-0.8.2.2
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning
# Ctrl+C
Next we use PipelineDB to consume these messages in real time and turn them into the statistics we need.
CREATE EXTENSION pipeline_kafka;
SELECT kafka_add_broker('localhost:9092'); -- add a Kafka broker (one node of the Kafka cluster)
CREATE STREAM logs_stream (payload json); -- create a stream mapped onto the Kafka message system
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream; -- create a continuous view that consumes and processes the Kafka messages in real time
SELECT kafka_consume_begin('logs_topic', 'logs_stream'); -- start consuming the given topic, logs_topic
kafka_consume_begin
------------------
success
(1 row)
Querying the continuous view gives the running nginx access statistics:
SELECT * FROM message_count;
count
--------
24
(1 row)
SELECT * FROM message_count;
count
--------
36
(1 row)
Next, a deeper real-time analysis: per-URL visit counts, unique users, and the latency under which 99% of requests complete.
/*
* This function will strip away any query parameters from each url,
* as we're not interested in them.
*/
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')
RETURNS text
AS 'textregexreplace_noopt' -- textregexreplace_noopt@src/backend/utils/adt/regexp.c
LANGUAGE internal;
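A quick check that the helper strips query parameters as intended:
SELECT url('/page1/path2?user=7');   -- expected: /page1/path2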
CREATE CONTINUOUS VIEW url_stats AS
SELECT
url, -- the URL
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99, -- latency under which 99% of requests to this URL complete
count(DISTINCT user) AS uniques, -- unique users
count(*) total_visits -- total visits
FROM
(SELECT
url(payload->>'url'), -- the URL
payload->>'user' AS user, -- user ID
(payload->>'latency')::float * 1000 AS latency_ms, -- request latency
arrival_timestamp
FROM logs_stream) AS unpacked
WHERE arrival_timestamp > clock_timestamp() - interval '1 day'
GROUP BY url;
CREATE CONTINUOUS VIEW user_stats AS
SELECT
day(arrival_timestamp),
payload->>'user' AS user,
sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,
sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,
count(DISTINCT url(payload->>'url')) AS unique_urls,
count(*) AS total_visits
FROM logs_stream GROUP BY payload->>'user', day;
-- What are the top-10 most visited urls?
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;
url | total_visits
---------------+--------------
/page62/path4 | 10182
/page51/path4 | 10181
/page24/path5 | 10180
/page93/path3 | 10180
/page81/path0 | 10180
/page2/path5 | 10180
/page75/path2 | 10179
/page28/path3 | 10179
/page40/path2 | 10178
/page74/path0 | 10176
(10 rows)
-- What is the 99th percentile latency across all urls?
SELECT combine(p99) FROM url_stats;
combine
------------------
6.95410494731137
(1 row)
-- What is the average conversion rate each day for the last month?
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;
day | avg
------------------------+----------------------------
2015-09-15 00:00:00-07 | 1.7455000000000000000000000
(1 row)
-- How many unique urls were visited each day for the last week?
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;
day | combine
------------------------+---------
2015-09-15 00:00:00-07 | 100000
(1 row)
-- Is there a relationship between the number of unique urls visited and the highest conversion rates?
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats
GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;
unique_urls | conversion_rate
-------------+-------------------
41 | 2.67121005785842
36 | 2.02713894173361
34 | 2.02034637010851
31 | 2.01958418072859
27 | 2.00045348712296
24 | 1.99714899522942
19 | 1.99438839453606
16 | 1.98083502184886
15 | 1.87983011139079
14 | 1.84906254929873
(10 rows)
With PipelineDB + Kafka, the range of application scenarios becomes richer still.
How do we build a larger real-time message processing cluster?
Plan the data sharding rules carefully (to avoid cross-node statistics); where cross-node access is genuinely needed, a dimension table on each node can cover it (a routing sketch follows below).
For example, what if a trillion messages must be processed every day?
From the stress tests above, one machine averages 100,000 records/s (8.64 billion per day), so about 116 PostgreSQL nodes are needed for a trillion per day. Quite manageable, isn't it?
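A minimal sketch of such a sharding rule, routing by sensor ID so that all rows for one gid land on the same node and per-gid aggregates never cross nodes (the function name and node count are illustrative):
-- Hypothetical routing function for the ingest layer
CREATE OR REPLACE FUNCTION route_node(gid int, nodes int DEFAULT 116)
RETURNS int AS $$
  SELECT gid % nodes;   -- the same gid always maps to the same PostgreSQL node
$$ LANGUAGE sql IMMUTABLE;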
A diagram of the architecture:
Every layer can be scaled out:
from LVS, to HAProxy, to Kafka, to PostgreSQL, to offline analytics with HAWQ.