

PostgreSQL "IoT" Applications - 1: A Real-Time Stream Processing Case (a Trillion Records per Day)

A defining characteristic of the Internet of Things is that everything is connected, producing massive volumes of data.
For example:
A box of medicine, from production, to transport, to the pharmacy, to the sale: at every node it passes through, its information is recorded.
Other examples:
Health wristbands, child anti-loss watches, sensors used in animal migration research (such as the Chinese sturgeon), hydrological monitoring, power grid monitoring, gas pipeline monitoring, weather monitoring, and so on.
Real-time stock price prediction.
Real-time traffic statistics and real-time merging of vehicle trajectories.
Real-time foot traffic statistics in shopping malls.
Real-time processing of monitoring data, such as database, server, and operating system monitoring.
And so on.
Sensors come in many varieties, and the volume of collected data has become enormous.
This data volume matches or even exceeds that of an e-commerce Double 11 event. How can it be handled well? How can real-time stream processing be achieved?

The PostgreSQL ecosystem offers an excellent stream-based data processing product, with real-time computation reaching 100,000 records/s on a single machine (an ordinary x86 server).

Below is an application case.

Download and install PipelineDB, a stream-processing database built on top of PostgreSQL.

# wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.8.5-centos6-x86_64.rpm  
# rpm -ivh pipelinedb-0.8.5-centos6-x86_64.rpm  --prefix=/home/digoal/pipelinedb  

Create an environment variable script

$ vi env_pipe.sh   

export PS1="$USER@`/bin/hostname -s`-> "  
export PGPORT=1922  
export PGDATA=/disk1/digoal/pipe/pg_root  
export LANG=en_US.utf8  
export PGHOME=/home/digoal/pipelinedb  
export LD_LIBRARY_PATH=/home/digoal/scws/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib:$LD_LIBRARY_PATH  
export DATE=`date +"%Y%m%d%H%M"`  
export PATH=/home/digoal/scws/bin:$PGHOME/bin:$PATH:.  
export MANPATH=$PGHOME/share/man:$MANPATH  
export PGHOST=$PGDATA  
export PGUSER=postgres  
export PGDATABASE=pipeline  
alias rm='rm -i'  
alias ll='ls -lh'  
unalias vi  

$ . ./env_pipe.sh  

Initialize the database

$ pipeline-init -D $PGDATA -U postgres -E UTF8 --locale=C -W  

Configure the parameters

$ cd $PGDATA  
$ vi pipelinedb.conf  
listen_addresses = '0.0.0.0'            # what IP address(es) to listen on;  
port = 1922                            # (change requires restart)  
max_connections = 200                   # (change requires restart)  
unix_socket_directories = '.'   # comma-separated list of directories  
shared_buffers = 8GB                    # min 128kB  
maintenance_work_mem = 640MB            # min 1MB  
dynamic_shared_memory_type = posix      # the default is the first option  
synchronous_commit = off                # synchronization level;  
wal_buffers = 16MB                      # min 32kB, -1 sets based on shared_buffers  
wal_writer_delay = 10ms         # 1-10000 milliseconds  
checkpoint_segments = 400               # in logfile segments, min 1, 16MB each  
log_destination = 'csvlog'              # Valid values are combinations of  
logging_collector = on          # Enable capturing of stderr and csvlog  
log_timezone = 'PRC'  
datestyle = 'iso, mdy'  
timezone = 'PRC'  
lc_messages = 'C'                       # locale for system error message  
lc_monetary = 'C'                       # locale for monetary formatting  
lc_numeric = 'C'                        # locale for number formatting  
lc_time = 'C'                           # locale for time formatting  
default_text_search_config = 'pg_catalog.english'  
continuous_query_combiner_work_mem = 1GB  
continuous_query_batch_size = 100000  
continuous_query_num_combiners = 8  
continuous_query_num_workers = 4  
continuous_queries_enabled = on  

Start the database

$ pipeline-ctl start  

Create streams (data is consumed from them much like tables)
Example application scenarios:

.1. Suppose each sensor uploads three fields: sensor ID, time, and sampled value:
gid, crt_time, val
The application needs real-time per-minute, per-hour, and per-day statistics of the max, min, average, and count of the values uploaded by each sensor.
Create three continuous views, one per statistics dimension.
As follows:

pipeline=# CREATE CONTINUOUS VIEW sv01  AS SELECT gid::int,date_trunc('min',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('min',crt_time);   

pipeline=# CREATE CONTINUOUS VIEW sv02  AS SELECT gid::int,date_trunc('hour',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('hour',crt_time);   

pipeline=# CREATE CONTINUOUS VIEW sv03  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),max(val::int),min(val),avg(val),count(val) FROM stream01 group by gid,date_trunc('day',crt_time);   

Activate the streams

pipeline=# activate;  
ACTIVATE  

Test with an insert

pipeline=# insert into stream01(gid,val,crt_time) values (1,1,now());  
INSERT 0 1  
pipeline=# select * from sv01;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

pipeline=# select * from sv02;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 13:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

pipeline=# select * from sv03;  
 gid |     date_trunc      | max | min |          avg           | count   
-----+---------------------+-----+-----+------------------------+-------  
   1 | 2015-12-15 00:00:00 |   1 |   1 | 1.00000000000000000000 |     1  
(1 row)  

Stress test:
Assume 100,000 sensors, with uploaded values in the range 1 to 100.

$ vi test.sql  
\setrandom gid 1 100000  
\setrandom val 1 100  
insert into stream01(gid,val,crt_time) values (:gid,:val,now());  

./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 95949.9 tps, lat 0.247 ms stddev 0.575  
progress: 10.0 s, 98719.9 tps, lat 0.240 ms stddev 0.549  
progress: 15.0 s, 100207.8 tps, lat 0.237 ms stddev 0.573  
progress: 20.0 s, 101596.4 tps, lat 0.234 ms stddev 0.517  
progress: 25.0 s, 102830.4 tps, lat 0.231 ms stddev 0.492  
progress: 30.0 s, 103055.0 tps, lat 0.230 ms stddev 0.488  
progress: 35.0 s, 102786.0 tps, lat 0.231 ms stddev 0.482  
progress: 40.0 s, 99066.3 tps, lat 0.240 ms stddev 0.578  
progress: 45.0 s, 102912.5 tps, lat 0.231 ms stddev 0.494  
progress: 50.0 s, 100398.2 tps, lat 0.236 ms stddev 0.530  
progress: 55.0 s, 105719.8 tps, lat 0.224 ms stddev 0.425  
progress: 60.0 s, 99041.0 tps, lat 0.240 ms stddev 0.617  
progress: 65.0 s, 97087.0 tps, lat 0.245 ms stddev 0.619  
progress: 70.0 s, 95312.6 tps, lat 0.249 ms stddev 0.653  
progress: 75.0 s, 98768.3 tps, lat 0.240 ms stddev 0.593  
progress: 80.0 s, 106203.8 tps, lat 0.223 ms stddev 0.435  
progress: 85.0 s, 103423.1 tps, lat 0.229 ms stddev 0.480  
progress: 90.0 s, 106143.5 tps, lat 0.223 ms stddev 0.429  
progress: 95.0 s, 103514.5 tps, lat 0.229 ms stddev 0.478  
progress: 100.0 s, 100222.8 tps, lat 0.237 ms stddev 0.547  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 10114821  
latency average: 0.235 ms  
latency stddev: 0.530 ms  
tps = 101089.580065 (including connections establishing)  
tps = 101101.483296 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003051        \setrandom gid 1 100000  
        0.000866        \setrandom val 1 100  
        0.230430        insert into stream01(gid,val,crt_time) values (:gid,:val,now());  

Roughly 100,000 records are processed per second, aggregated along the dimensions defined by the continuous views above.

After several rounds of testing:

pipeline=# select sum(count) from sv03;  
   sum      
----------  
 53022588  
(1 row)  

pipeline=# select * from sv01 limit 10;  
  gid  |     date_trunc      | max | min |          avg           | count   
-------+---------------------+-----+-----+------------------------+-------  
     1 | 2015-12-15 13:44:00 |   1 |   1 | 1.00000000000000000000 |     1  
 53693 | 2015-12-15 13:47:00 |  68 |   1 |    28.0000000000000000 |     6  
   588 | 2015-12-15 13:47:00 |  88 |  11 |    47.6250000000000000 |     8  
 60154 | 2015-12-15 13:47:00 |  95 |   1 |    40.9090909090909091 |    11  
 38900 | 2015-12-15 13:47:00 |  90 |  17 |    57.2000000000000000 |     5  
 12784 | 2015-12-15 13:47:00 |  93 |  13 |    64.1250000000000000 |     8  
 79782 | 2015-12-15 13:47:00 |  60 |  16 |    43.1666666666666667 |     6  
  5122 | 2015-12-15 13:47:00 | 100 |   3 |    46.8333333333333333 |    12  
 97444 | 2015-12-15 13:47:00 |  98 |   9 |    59.5833333333333333 |    12  
 34209 | 2015-12-15 13:47:00 |  86 |  13 |    52.2857142857142857 |     7  
(10 rows)  

pipeline=# select * from sv02 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 91065 | 2015-12-15 14:00:00 | 100 |   0 | 51.4299065420560748 |   321  
 24081 | 2015-12-15 14:00:00 | 100 |   0 | 52.1649831649831650 |   297  
 29013 | 2015-12-15 14:00:00 | 100 |   0 | 50.9967213114754098 |   305  
 13134 | 2015-12-15 14:00:00 | 100 |   0 | 49.6968750000000000 |   320  
 84691 | 2015-12-15 14:00:00 | 100 |   0 | 49.5547445255474453 |   274  
 91059 | 2015-12-15 14:00:00 | 100 |   1 | 47.7536764705882353 |   272  
 50115 | 2015-12-15 14:00:00 | 100 |   1 | 49.4219269102990033 |   301  
 92610 | 2015-12-15 14:00:00 | 100 |   0 | 50.1197183098591549 |   284  
 36616 | 2015-12-15 14:00:00 | 100 |   1 | 48.8750000000000000 |   312  
 46390 | 2015-12-15 14:00:00 |  99 |   0 | 48.3246268656716418 |   268  
(10 rows)  

pipeline=# select * from sv03 limit 10;  
  gid  |     date_trunc      | max | min |         avg         | count   
-------+---------------------+-----+-----+---------------------+-------  
 68560 | 2015-12-15 00:00:00 | 100 |   0 | 51.2702702702702703 |   555  
 42241 | 2015-12-15 00:00:00 | 100 |   0 | 49.5266903914590747 |   562  
 64946 | 2015-12-15 00:00:00 | 100 |   0 | 48.2409177820267686 |   523  
  2451 | 2015-12-15 00:00:00 | 100 |   0 | 49.8153564899451554 |   547  
 11956 | 2015-12-15 00:00:00 | 100 |   0 | 51.2382739212007505 |   533  
 21578 | 2015-12-15 00:00:00 | 100 |   0 | 49.2959558823529412 |   544  
 36451 | 2015-12-15 00:00:00 | 100 |   0 | 51.1292035398230088 |   565  
 62380 | 2015-12-15 00:00:00 | 100 |   0 | 48.9099437148217636 |   533  
 51946 | 2015-12-15 00:00:00 | 100 |   0 | 51.0318091451292247 |   503  
 35084 | 2015-12-15 00:00:00 | 100 |   0 | 49.3613766730401530 |   523  
(10 rows)  

.2. Suppose vehicles upload their position at intervals while driving:
gid, crt_time, poi
The application needs to draw each vehicle's route per day (aggregating the points into a path type, an array type, a string, ...).

Assume 10 million vehicles, each upload carrying one coordinate and timestamp (or a batch of them).
Application requirements:
.2.1. Draw each vehicle's route per day.
.2.2. Count, per hour, how many vehicles pass through each region.

Create the stream. (Here the point is assumed to have been binary-encoded into a single INT8, to simplify the stress test.)

CREATE CONTINUOUS VIEW sv04  AS SELECT gid::int,date_trunc('day',crt_time::timestamp),array_agg(poi::int8||' -> '||crt_time) FROM stream02 group by gid,date_trunc('day',crt_time);  
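
Requirement .2.2 can be sketched along the same lines. In the hedged example below, `(poi::int8 % 1000)` is only a placeholder for a real point-to-region lookup, which this article does not define:

```sql
-- Sketch for requirement .2.2: vehicles per region per hour.
-- The modulo mapping is a stand-in for a real point-to-region function.
CREATE CONTINUOUS VIEW sv05 AS
SELECT (poi::int8 % 1000) AS region,
       date_trunc('hour', crt_time::timestamp),
       count(DISTINCT gid::int) AS vehicles   -- distinct vehicles seen in the region
FROM stream02
GROUP BY (poi::int8 % 1000), date_trunc('hour', crt_time);
```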

Stress test

$ vi test.sql  
\setrandom gid 1 10000000  
\setrandom poi 1 1000000000  
insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  

./pgsql9.5/bin/pgbench -M prepared -n -r -f ./test.sql -P 5 -c 24 -j 24 -T 100  
progress: 5.0 s, 106005.0 tps, lat 0.223 ms stddev 0.370  
progress: 10.0 s, 109884.8 tps, lat 0.216 ms stddev 0.347  
progress: 15.0 s, 111122.1 tps, lat 0.213 ms stddev 0.368  
progress: 20.0 s, 111987.0 tps, lat 0.212 ms stddev 0.353  
progress: 25.0 s, 111835.4 tps, lat 0.212 ms stddev 0.363  
progress: 30.0 s, 111759.7 tps, lat 0.212 ms stddev 0.366  
progress: 35.0 s, 112110.4 tps, lat 0.211 ms stddev 0.358  
progress: 40.0 s, 112185.4 tps, lat 0.211 ms stddev 0.352  
progress: 45.0 s, 113080.0 tps, lat 0.210 ms stddev 0.345  
progress: 50.0 s, 113205.4 tps, lat 0.209 ms stddev 0.353  
progress: 55.0 s, 113415.1 tps, lat 0.209 ms stddev 0.352  
progress: 60.0 s, 113519.8 tps, lat 0.209 ms stddev 0.342  
progress: 65.0 s, 112683.6 tps, lat 0.210 ms stddev 0.358  
progress: 70.0 s, 112748.3 tps, lat 0.210 ms stddev 0.360  
progress: 75.0 s, 112158.9 tps, lat 0.211 ms stddev 0.373  
progress: 80.0 s, 112580.8 tps, lat 0.210 ms stddev 0.355  
progress: 85.0 s, 111895.5 tps, lat 0.212 ms stddev 0.370  
progress: 90.0 s, 112229.2 tps, lat 0.211 ms stddev 0.442  
progress: 95.0 s, 104915.8 tps, lat 0.226 ms stddev 2.852  
progress: 100.0 s, 103079.9 tps, lat 0.230 ms stddev 2.054  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 24  
number of threads: 24  
duration: 100 s  
number of transactions actually processed: 11112035  
latency average: 0.213 ms  
latency stddev: 0.836 ms  
tps = 111106.652772 (including connections establishing)  
tps = 111118.651135 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002939        \setrandom gid 1 10000000  
        0.000887        \setrandom poi 1 1000000000  
        0.209177        insert into stream02(gid,poi,crt_time) values (:gid,:poi,now());  

pipeline=# select * from sv04 limit 3;  
  448955 | 2015-12-15 00:00:00 | {"306029686 -> 2015-12-15 14:53:01.273121","885962518 -> 2015-12-15 14:53:03.352406"}  
 7271368 | 2015-12-15 00:00:00 | {"615447469 -> 2015-12-15 14:53:01.2616","944473391 -> 2015-12-15 14:53:04.543387"}  
 8613957 | 2015-12-15 00:00:00 | {"473349491 -> 2015-12-15 14:53:01.288332","125413709 -> 2015-12-15 14:53:08.742894"}  

.3. Per traffic camera, aggregate the vehicle information each camera captures.
For example:
.3.1 Per vehicle, aggregate the camera positions it passed and string them into trajectory data.
.3.2 Per camera, count the traffic at each intersection (assuming one camera per intersection).

The first requirement matches the vehicle-trajectory example above; for intersection traffic, simply aggregate by camera ID instead.
The usage is much the same, so no full example is repeated here.
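
Still, for concreteness, a minimal sketch of .3.2, assuming a hypothetical camera_stream carrying (camera_id, gid, crt_time) readings:

```sql
-- Hypothetical sketch: hourly traffic per camera (i.e. per intersection).
CREATE CONTINUOUS VIEW sv06 AS
SELECT camera_id::int,
       date_trunc('hour', crt_time::timestamp),
       count(*) AS passes,                 -- total passes recorded
       count(DISTINCT gid::int) AS cars    -- distinct vehicles seen
FROM camera_stream
GROUP BY camera_id, date_trunc('hour', crt_time);
```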

.4. Real-time stock price prediction.
Combined with MADlib or PL/R, you can run multiple regressions, pick the best R², and use the corresponding intercept and slope to predict the next price.
This requires UDFs; the specific usage is covered in my earlier articles.
No full example here.
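
That said, the shape of such a view can be sketched with PostgreSQL's built-in linear regression aggregates alone. This is only a sketch: stock_stream with (code, price, crt_time) is an assumed schema, and a single linear fit stands in for the multiple-regression/UDF approach described above:

```sql
-- Per-stock linear fit of price against time; regr_r2 indicates fit
-- quality, while slope/intercept can extrapolate the next price.
CREATE CONTINUOUS VIEW stock_trend AS
SELECT code::text,
       regr_r2(price::float8, extract(epoch from crt_time::timestamp)) AS r2,
       regr_slope(price::float8, extract(epoch from crt_time::timestamp)) AS slope,
       regr_intercept(price::float8, extract(epoch from crt_time::timestamp)) AS intercept
FROM stock_stream
GROUP BY code;
```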

.5. Real-time statistics from shopping mall WiFi sensors.
Using the location information WiFi provides, compute in real time each shop's foot traffic, the average dwell time per visitor, and the total dwell time.
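
A hedged sketch of what that could look like, assuming a hypothetical wifi_stream of (shop_id, mac, crt_time) probe events:

```sql
-- Per shop, per device, per day: dwell time between first and last sighting.
-- Foot traffic per shop is then the row count per (shop_id, day).
CREATE CONTINUOUS VIEW shop_dwell AS
SELECT shop_id::int,
       mac::text,
       date_trunc('day', crt_time::timestamp),
       max(crt_time::timestamp) - min(crt_time::timestamp) AS dwell
FROM wifi_stream
GROUP BY shop_id, mac, date_trunc('day', crt_time);
```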

.6. What if your processing scenario cannot be handled by PostgreSQL's existing functions? No problem: PG provides a whole set of APIs for user-defined functions, data types, operators, index access methods, and more. You can build on these to meet your business needs.
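
As a trivial illustration of that extension API (a made-up helper, not part of any product): a SQL UDF that buckets raw sensor values, usable directly inside a continuous view:

```sql
-- Bucket a raw value into ranges of `width`; IMMUTABLE so the planner
-- can treat it as a pure function.
CREATE FUNCTION val_bucket(v int, width int DEFAULT 10)
RETURNS int AS $$ SELECT v / width; $$ LANGUAGE sql IMMUTABLE;

-- e.g.: ... GROUP BY gid, val_bucket(val::int) ... in a continuous view
```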

There are many more uses; they cannot all be enumerated here.

Next, pair this with a message queue that is currently very popular: PipelineDB can pull data from the queue and compute over it in real time.
Example:
Start an nginx server locally and use siege to simulate HTTP requests; nginx records these accesses as JSON in a log file.
Start a Kafka server locally and use kafkacat to continuously push nginx's access log into Kafka.
In PipelineDB, subscribe to the Kafka topic and process the messages in real time into the desired statistics (page visitor counts, latency, and so on).

Install Kafka

https://kafka.apache.org/07/quickstart.html  

# wget https://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz  
# tar -zxvf kafka_2.10-0.8.2.2.tgz  

# git clone https://github.com/edenhill/librdkafka.git  
# cd librdkafka  
./configure  
make  
make install  

# git clone https://github.com/lloyd/yajl.git  
# cd yajl  
./configure  
make  
make install  

# vi /etc/ld.so.conf  
/usr/local/lib  
# ldconfig  

# git clone https://github.com/edenhill/kafkacat.git  
# cd kafkacat  
./configure  
make  
make install  

Install siege and nginx

# yum install -y siege nginx  

Create an nginx configuration that writes the access log in JSON format to /tmp/access.log

cd /tmp  

cat <<EOF > nginx.conf  
worker_processes 4;  
pid $PWD/nginx.pid;  
events {}  
http {  

    log_format json   
    '{'  
        '"ts": "\$time_iso8601", '  
        '"user_agent": "\$http_user_agent", '  
        '"url": "\$request_uri", '  
        '"latency": "\$request_time",  '  
        '"user": "\$arg_user"'  
    '}';  

    access_log $PWD/access.log json;  
    error_log $PWD/error.log;  

    server {  
        location ~ ^/ {  
            return 200;  
        }  
    }  
}  
EOF  

Start nginx

nginx -c $PWD/nginx.conf -p $PWD/  

Configure the hostname

# hostname  
digoal.org  
# vi /etc/hosts  
127.0.0.1 digoal.org  

Start Kafka

cd /opt/soft_bak/kafka_2.10-0.8.2.2  
bin/zookeeper-server-start.sh config/zookeeper.properties &  
bin/kafka-server-start.sh config/server.properties &  

Generate a file of random URLs

for x in {0..1000000}; do echo "https://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done  

Use siege to request these URLs; nginx writes the accesses to /tmp/access.log

siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1  

A sample of /tmp/access.log (JSON format):
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002",  "user": "18583"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003",  "user": "24827"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003",  "user": "3988"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003",  "user": "18433"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001",  "user": "10801"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001",  "user": "4915"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001",  "user": "5367"}  

Tail the access log into kafkacat, pushing it to the Kafka messaging system under the topic logs_topic.

( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &  

The raw way of consuming the topic looks like this:

# cd /opt/soft_bak/kafka_2.10-0.8.2.2  
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning  
# Ctrl+C  

Next, use PipelineDB to consume these messages in real time and turn them into the desired statistics.

CREATE EXTENSION pipeline_kafka;  
SELECT kafka_add_broker('localhost:9092');  -- add a Kafka broker (one node of the Kafka cluster)  
CREATE STREAM logs_stream (payload json);  -- create a stream mapped to the Kafka messages  
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;   -- create a continuous view that consumes and processes the Kafka messages in real time  
SELECT kafka_consume_begin('logs_topic', 'logs_stream');  -- start consuming the topic logs_topic  
 kafka_consume_begin   
------------------  
 success  
(1 row)  

Query the continuous view to get nginx's current access statistics.

SELECT * FROM message_count;  
 count   
--------  
  24  
(1 row)  

SELECT * FROM message_count;  
 count  
--------  
  36  
(1 row)  

Next, a deeper real-time analysis: per URL, the number of visits, the number of unique users, and the latency that 99% of accesses stay under.

/*   
 * This function will strip away any query parameters from each url,  
 * as we're not interested in them.  
 */  
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')  
    RETURNS text  
AS 'textregexreplace_noopt'    -- textregexreplace_noopt@src/backend/utils/adt/regexp.c  
LANGUAGE internal;  

CREATE CONTINUOUS VIEW url_stats AS  
    SELECT  
        url,  -- URL  
        percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99,  -- latency that 99% of accesses stay under  
        count(DISTINCT user) AS uniques,  -- unique users  
        count(*) AS total_visits  -- total visits  
    FROM  
        (SELECT  
            url(payload->>'url'),  -- URL, query string stripped  
            payload->>'user' AS user,  -- user ID  
            (payload->>'latency')::float * 1000 AS latency_ms,  -- request latency in ms  
            arrival_timestamp  
        FROM logs_stream) AS unpacked  
    WHERE arrival_timestamp > clock_timestamp() - interval '1 day'  
    GROUP BY url;  

CREATE CONTINUOUS VIEW user_stats AS  
    SELECT  
        day(arrival_timestamp),  
        payload->>'user' AS user,  
        sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,  
        sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,  
        count(DISTINCT url(payload->>'url')) AS unique_urls,  
        count(*) AS total_visits  
    FROM logs_stream GROUP BY payload->>'user', day;  

-- What are the top-10 most visited urls?  
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;  
      url      | total_visits   
---------------+--------------  
 /page62/path4 |        10182  
 /page51/path4 |        10181  
 /page24/path5 |        10180  
 /page93/path3 |        10180  
 /page81/path0 |        10180  
 /page2/path5  |        10180  
 /page75/path2 |        10179  
 /page28/path3 |        10179  
 /page40/path2 |        10178  
 /page74/path0 |        10176  
(10 rows)  


-- What is the 99th percentile latency across all urls?  
SELECT combine(p99) FROM url_stats;  
     combine        
------------------  
 6.95410494731137  
(1 row)  

-- What is the average conversion rate each day for the last month?  
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;  
          day           |            avg               
------------------------+----------------------------  
 2015-09-15 00:00:00-07 | 1.7455000000000000000000000  
(1 row)  

-- How many unique urls were visited each day for the last week?  
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;  
          day           | combine   
------------------------+---------  
 2015-09-15 00:00:00-07 |  100000  
(1 row)  

-- Is there a relationship between the number of unique urls visited and the highest conversion rates?  
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats  
    GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;  
 unique_urls |  conversion_rate    
-------------+-------------------  
          41 |  2.67121005785842  
          36 |  2.02713894173361  
          34 |  2.02034637010851  
          31 |  2.01958418072859  
          27 |  2.00045348712296  
          24 |  1.99714899522942  
          19 |  1.99438839453606  
          16 |  1.98083502184886  
          15 |  1.87983011139079  
          14 |  1.84906254929873  
(10 rows)  

With PipelineDB plus Kafka, the range of application scenarios becomes even richer.

How do you build a larger real-time message processing cluster?
Plan the data sharding rules well (to avoid cross-node aggregation); if cross-node access is required, dimension tables on each node can support it.
For example, what if a trillion messages must be processed per day?
Based on the stress tests above, one machine averages 100,000 records/s (8.64 billion per day), so about 116 PostgreSQL nodes are needed. Quite manageable, isn't it?
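
That sizing arithmetic can be sanity-checked with a one-line query (plain arithmetic; any PostgreSQL instance will do):

```sql
-- int8 cast avoids int4 overflow at 8.64 billion
SELECT 100000::int8 * 86400 AS rows_per_node_per_day,        -- 8,640,000,000
       ceil(1e12 / (100000::int8 * 86400)) AS nodes_needed;  -- 116
```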
A diagram of the layout: every layer scales out,
from LVS, to HAProxy, to Kafka, to PostgreSQL, to offline analytics with HAWQ.


Last updated: 2017-04-01 13:38:49
