閱讀486 返回首頁    go 阿裏雲 go 技術社區[雲棲]


顛覆大數據分析之Storm的設計模式

顛覆大數據分析之Storm的設計模式

譯者:吳京潤    購書

我們將要學習如何實現基於Storm的一些通用設計模式。設計模式,我們也稱之為軟件工程意識,是在給定上下文環境中,針對覺設計問題的可重用的通常解決方案。(Gamma et al. 1995)。它們是分布式遠程過程調用(DRPCs),持續計算,以及機器學習。

分布式遠程過程調用

過程調用為單機運行的程序提供了一個傳輸控製與數據的靈巧機製。把這一概念擴展到分布式係統中,出現了遠程過程調用(RPC)——過程調用的概念可以跨越網絡邊界。客戶機發起一次RPC時發生了下述事件順序:

  1. 調用環境要麼掛起要麼忙等待。
  2. 參數被編組並通過網絡傳輸到目的機、服務器或被調用者,也就是程序將要執行的地方。
  3. 參數被整理後,程序在遠程節點執行。
  4. 遠程節點的程序執行結束時,結果被傳回客戶機或源。
  5. 客戶端程序就像剛從一個本地過程調用返回一樣繼續執行。

實現RPC時要解決的典型問題包括:(1)參數編組與解組,(2)調用語義或在不同地址空間的參數傳遞語義,(3)在客戶端與服務器之間的控製與數據傳輸協議,還有(4)綁定或如何發現一個服務提供者,以及如何從客戶端連接它。

類似Cedar這樣的係統這幾個問題通過五個組件實現:(1)客戶端程序,(2)存根或客戶端代理,(3)RPC運行時,後來被稱做中間件,(4)服務端存根,還有(5)服務器(以服務的方式提供過程調用)。這一分層模式從用戶的通訊細節抽象出來。從之前的第二點也可以看出來,客戶端存根實現了參數編組,而RPC運行時負責向服務器傳輸請求並收集執行後的結果。服務器存根負責服務端的參數解組以及向RPC客戶端回傳結果。

最早的RPC係統包括施樂的Cebar係統(比勒爾和納爾遜 1984);同樣來自施樂的Courier係統(施樂 1981);以及由Barabara Liskov開發的。SunRPC是廣泛應用的開源RPC係統。它可以構建與UDP或TCP協議之上,並提供“至少一次”的語義(程序至少被執行一次)。它還使用SUN的外部數據表示(XDR)作為客戶端和服務器之間的數據交換格式。它通過一個被稱作port_mapper的程序綁定,通過rpcgen程序生成客戶端與服務器存根/代理。

DRPC提供了一個在Storm之上的分布式RPC實現。基本概念是高度運算密集型的程序可以從RPC的分布式實現中獲益,因為計算過程分布到整個Storm集群了。集群通過一個DRPC服務器協調DRPC請求。DRPC服務器接收來自客戶端的RPC請求,並把它們分到Storm集群,由集群節點並行的執行程序;DRPC服務器接收來自Storm集群的結果,並用它們響應客戶端。圖4.1是一個簡單的示意圖。

圖4.1  DRPC服務器與Storm集群的連接

實現了RPC功能的拓撲使用DRPCSpout從DRPC服務器拉取函數調用數據流。DRPC服務器為每一次函數調用提供惟一性ID。叫做ReturnResults的bolt連接DRPC服務器並為特定的請求ID返回結果。DRPC服務器匹配等待這一結果的客戶端請求ID,解除客戶端阻塞,回傳結果。

Storm提供了一個內建類,LinearDRPCTopologyBuilder,自動化大部分前置任務,包括設置spout,使用ReturnResults bolt返回結果,在元組分組之間為bolts提供有限的聚合功能。下麵是使用這個類的代碼片段:

01 public static class StringReverserBolt extends BaseBasicBolt {
02     public void execute(Tuple current_tuple, BasicOutputCollector collector){
03         String incoming_s = current_tuple.getString(1);
04         collector.emit(new Values(current_tuple.getValue(0),
05             new StringBuffer(incoming_s))).reverse().toString());
06     }
07  
08     public void declareOutputFields(OutputFieldsDeclarer declarer) {
09         declarer.declare(new Fields("id","result"));
10     }
11  
12     public static void main(String[] args) throws Exception {
13         LinearDRPCTopologyBuilder drpc_top = new LinearDRPCTopologyBuilder("exclamation");
14         drpc_top.addBolt(new ExclaimBolt(),3);
15         //..
16     }
17 }

Storm允許像啟動Nimbus一樣啟動DRPC服務器:

bin/storm drpc

DRPC服務器的位置通過參數drpc.servers在storm.yaml指定。最終,stringReverser DRPC拓撲可以像任意其它拓撲一樣使用下述命令啟動:

1 storm jar path/to/allmycode.jar impetus.open.StringReverse stringToBeReversed

顯然從名字來看,LinearDRPCTopologyBuilder類隻有在輸入數據是線性步驟/操作序列的情況下工作。對於更複雜的DRPC場景bolt組合,我們可以使用CoordinatedBolt類並實現一個自定義的拓撲構建器。

Trident:基於Storm的實時聚合

在簡要解釋之前,Trident為Storm生態係統提供嚴格一致的一性語義,類似於Pig Latin(譯者注:一種操作Map-Reduce的語言)。Trident允許諸如聚合、過濾、連接、分組等數據流操作。下麵的代碼是使用TridentTopology的一個簡單的例子:

1 TridentTopology topology = new TridentTopology();
2 TridentState wordCounts = topology.newStream("input1",spout)
3      .each(new Fields("sentence"), new Split(), new Fields("word"))
4      .groupBy(new Fields("word"))
5      .persistentAggregate(MemcachedState.transactional(serverLocations),
6      new Count(), new Fields("count"));
7 MemcachedState.transactional();

上述代碼說明了使用Trident的精髓——第一行創建拓撲的一個新實例。第二行,調用newStream方法從名為“input1”的spout讀取數據。這個spout我們假設之前已經定義過了,它可以是一個KafkaSpout或者是之前提到過的Twitter fire hose(譯者注:Twitter對自己的API的稱唿,這是我根據百度搜索的結果推斷出來的)。第三行調用Split(),把構成句子的單詞分割出來,單詞計數(單詞計數是一個聚合功能)保存在一個Memcached域中。

最後更新:2017-05-22 17:01:24

  上一篇:go  並發編程圖書
  下一篇:go  域名行業將帶來高達98億美元的巨大商機