阿裏雲大數據利器之-使用sql實現流計算做實時展現業務( flume故障轉移版 )
實時業務處理的需求越來越多,也有各種處理方案,比如storm,spark等都可以。那以數據流的方向可以總結成數據源-數據搜集-緩存隊列-實時處理計算-數據展現。本文就用阿裏雲產品簡單實現了一個實時處理的方案。
按照數據流向
數據采集:flume(配置故障轉移)
緩存隊列:datahub
https://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v
數據計算:阿裏流計算(StreamCompute)
https://help.aliyun.com/video_list/54212.html?spm=5176.7618386.3.2.COgP6l
數據落地:rds(mysql)
https://help.aliyun.com/document_detail/26092.html?spm=5176.7841871.6.539.9FTjxU
二,搭建過程
1,flume配置搭建
flume在數據采集的開源框架中還是比較常用的,但是在采集輸送到datahub中有可能網絡斷了或者服務器掛了。那這裏配置了故障轉移,如圖,其中sink1和sink2為上麵架構中的agentA和agentB.把agentA和agentB分別部署在兩台服務器上。
在搭建flume時需要安裝DatahubSink插件,參考https://help.aliyun.com/knowledge_detail/42843.html
那看下配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source這裏監控一個文件變化,寫了一個定時腳本每秒插入一條
a1.sources.r1.type = exec
a1.sources.r1.channels=c1
a1.sources.r1.command=tail -F /usr/local/shangdan/test.txt
#define sinkgroups,在這裏配置故障轉移的sink組
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10//這裏設置sink的優先級,優先發送到級別高的sink裏
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000
#define the sink 1,發送到agentA
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=agentA的ip
a1.sinks.k1.port=5555
#define the sink 2 ,發送到agentB
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=agentB的ip
a1.sinks.k2.port=5555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
~
agentA和agentB的配置文件出了ip地址不一樣,其他完全一致,這裏貼其中一個
A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels=c1
a1.sources.r1.bind= agentA的ip
a1.sources.r1.port= 5555
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = ******
a1.sinks.k1.datahub.accessKey = **********
a1.sinks.k1.datahub.endPoint = https://dh-cn-hangzhou.aliyun-inc.com
a1.sinks.k1.datahub.project = shangdantest
a1.sinks.k1.datahub.topic = databubtest
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,//這裏配置數據的分隔符
a1.sinks.k1.serializer.fieldnames = line//配置數據的字段
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
三台服務配置完成後啟動flume(先啟動agentA和agentB)預期結果是agent1發送數據到agentA(優先級高的),如果停止agentA服務,會自動轉換發送到agentB。重啟agegtA的服務後,再次切回到agentA。
如圖:正常啟動數據正常傳輸經過agent1-agentB-datahub
2,datahub創建,
在datahub控製台創建項目和topic,
設置分片和生命周期,具體方法見鏈接
https://help.aliyun.com/document_detail/47448.html?spm=5176.doc47443.6.584.UrSX1A;
datahub中看到有flume傳過來的數據
3,配置阿裏流計算
登錄阿裏流計算控製台
注冊數據源datahub/rds(也支持阿裏其他類型數據源)-編寫流計算腳本-調試-上線-啟動
如圖先注冊數據源供腳本使用。必須要有數據來源表和數據結果表。
在編寫腳本時,可以直接引用表,會自動插入表結構和配置信息,非常方便
那開始編寫腳本必須包括三部分
1,創建數據來源表,這裏是datahub表
2,創建數據結果表,這裏是rds表
3,將來源表數據寫入結果表,並進行計算
腳本編寫完畢,點擊上方【調試】,可以自己先準備一些數據上傳測試。也可以直接線上測試,點擊上麵【上線】,上線成功後在【運維】中能看到項目,點擊啟動,項目啟動幾秒就工作了如圖:
然後可以看到監控狀態,計算延遲,數據是否傾斜等指標,也有更詳細的鏈路可以查看
好神奇,幾句sql數據就源源不斷的流過來,那麼前端或者其他業務層可以過來拿數據展示了,如果有複雜邏輯計算的,可以申請開通流計算的udf功能,這樣看來,學好sql和java,走遍天下都不怕。
數據可視化部分可以參考使用阿裏雲產品dataV,實現類似雙十一大屏效果,也可以使用產品Quick-BI做實時報表。
最後更新:2017-07-26 09:04:15