486
阿裏雲
技術社區[雲棲]
顛覆大數據分析之Storm的設計模式
顛覆大數據分析之Storm的設計模式
譯者:吳京潤 購書
我們將要學習如何實現基於Storm的一些通用設計模式。設計模式,我們也稱之為軟件工程意識,是在給定上下文環境中,針對覺設計問題的可重用的
通常解決方案。(Gamma et al. 1995)。它們是分布式遠程過程調用(DRPCs),持續計算,以及機器學習。
分布式遠程過程調用
過程調用為單機運行的程序提供了一個傳輸控製與數據的靈巧機製。把這一概念擴展到分布式係統中,出現了遠程過程調用(RPC)——過程調用的概念可以跨越網絡邊界。客戶機發起一次RPC時發生了下述事件順序:
- 調用環境要麼掛起要麼忙等待。
- 參數被編組並通過網絡傳輸到目的機、服務器或被調用者,也就是程序將要執行的地方。
- 參數被整理後,程序在遠程節點執行。
- 遠程節點的程序執行結束時,結果被傳回客戶機或源。
- 客戶端程序就像剛從一個本地過程調用返回一樣繼續執行。
實現RPC時要解決的典型問題包括:(1)參數編組與解組,(2)調用語義或在不同地址空間的參數傳遞語義,(3)在客戶端與服務器之間的控製與數據傳輸協議,還有(4)綁定或如何發現一個服務提供者,以及如何從客戶端連接它。
類似Cedar這樣的係統這幾個問題通過五個組件實現:(1)客戶端程序,(2)存根或客戶端代理,(3)RPC運行時,後來被稱做中間件,(4)服務端存根,還有(5)服務器(以服務的方式提供過程調用)。這一分層模式從用戶的通訊細節抽象出來。從之前的第二點也可以看出來,客戶端存根實現了參數編組,而RPC運行時負責向服務器傳輸請求並收集執行後的結果。服務器存根負責服務端的參數解組以及向RPC客戶端回傳結果。
最早的RPC係統包括施樂的Cebar係統(比勒爾和納爾遜 1984);同樣來自施樂的Courier係統(施樂 1981);以及由Barabara Liskov開發的。SunRPC是廣泛應用的開源RPC係統。它可以構建與UDP或TCP協議之上,並提供“至少一次”的語義(程序至少被執行一次)。它還使用SUN的外部數據表示(XDR)作為客戶端和服務器之間的數據交換格式。它通過一個被稱作port_mapper的程序綁定,通過rpcgen程序生成客戶端與服務器存根/代理。
DRPC提供了一個在Storm之上的分布式RPC實現。基本概念是高度運算密集型的程序可以從RPC的分布式實現中獲益,因為計算過程分布到整個Storm集群了。集群通過一個DRPC服務器協調DRPC請求。DRPC服務器接收來自客戶端的RPC請求,並把它們分到Storm集群,由集群節點並行的執行程序;DRPC服務器接收來自Storm集群的結果,並用它們響應客戶端。圖4.1是一個簡單的示意圖。

圖4.1 DRPC服務器與Storm集群的連接
實現了RPC功能的拓撲使用DRPCSpout從DRPC服務器拉取函數調用數據流。DRPC服務器為每一次函數調用提供惟一性ID。叫做ReturnResults的bolt連接DRPC服務器並為特定的請求ID返回結果。DRPC服務器匹配等待這一結果的客戶端請求ID,解除客戶端阻塞,回傳結果。
Storm提供了一個內建類,LinearDRPCTopologyBuilder,自動化大部分前置任務,包括設置spout,使用ReturnResults bolt返回結果,在元組分組之間為bolts提供有限的聚合功能。下麵是使用這個類的代碼片段:
01 |
public static class StringReverserBolt extends BaseBasicBolt {
|
02 |
public void execute(Tuple current_tuple, BasicOutputCollector collector){
|
03 |
String incoming_s = current_tuple.getString( 1 );
|
04 |
collector.emit( new Values(current_tuple.getValue( 0 ),
|
05 |
new StringBuffer(incoming_s))).reverse().toString());
|
08 |
public void declareOutputFields(OutputFieldsDeclarer declarer) {
|
09 |
declarer.declare( new Fields( "id" , "result" ));
|
12 |
public static void main(String[] args) throws Exception {
|
13 |
LinearDRPCTopologyBuilder drpc_top = new LinearDRPCTopologyBuilder( "exclamation" );
|
14 |
drpc_top.addBolt( new ExclaimBolt(), 3 );
|
Storm允許像啟動Nimbus一樣啟動DRPC服務器:
bin/storm drpc
DRPC服務器的位置通過參數drpc.servers在storm.yaml指定。最終,stringReverser DRPC拓撲可以像任意其它拓撲一樣使用下述命令啟動:
1 |
storm jar path/to/allmycode.jar impetus. open .StringReverse stringToBeReversed
|
顯然從名字來看,LinearDRPCTopologyBuilder類隻有在輸入數據是線性步驟/操作序列的情況下工作。對於更複雜的DRPC場景bolt組合,我們可以使用CoordinatedBolt類並實現一個自定義的拓撲構建器。
Trident:基於Storm的實時聚合
在簡要解釋之前,Trident為Storm生態係統提供嚴格一致的一性語義,類似於Pig Latin(譯者注:一種操作Map-Reduce的語言)。Trident允許諸如聚合、過濾、連接、分組等數據流操作。下麵的代碼是使用TridentTopology的一個簡單的例子:
1 |
TridentTopology topology = new TridentTopology();
|
2 |
TridentState wordCounts = topology.newStream( "input1" ,spout)
|
3 |
.each( new Fields( "sentence" ), new Split(), new Fields( "word" ))
|
4 |
.groupBy( new Fields( "word" ))
|
5 |
.persistentAggregate(MemcachedState.transactional(serverLocations),
|
6 |
new Count(), new Fields( "count" ));
|
7 |
MemcachedState.transactional(); |
上述代碼說明了使用Trident的精髓——第一行創建拓撲的一個新實例。第二行,調用newStream方法從名為“input1”的spout讀取數據。這個spout我們假設之前已經定義過了,它可以是一個KafkaSpout或者是之前提到過的Twitter fire hose(譯者注:Twitter對自己的API的稱唿,這是我根據百度搜索的結果推斷出來的)。第三行調用Split(),把構成句子的單詞分割出來,單詞計數(單詞計數是一個聚合功能)保存在一個Memcached域中。
最後更新:2017-05-22 17:01:24