Apache Storm 官方文檔 —— 分布式 RPC
分布式 RPC(DRPC)的設計目標是充分利用 Storm 的計算能力實現高密度的並行實時計算。Storm 接收若幹個函數參數作為輸入流,然後通過 DRPC 輸出這些函數調用的結果。嚴格來說,DRPC 並不能算作是 Storm 的一個特性,因為它隻是一種基於 Storm 原語 (Stream、Spout、Bolt、Topology) 實現的計算模式。雖然可以將 DRPC 從 Storm 中打包出來作為一個獨立的庫,但是與 Storm 集成在一起顯然更有用。
概述
DRPC 是通過一個 DRPC 服務端(DRPC server)來實現分布式 RPC 功能的。DRPC server 負責接收 RPC 請求,並將該請求發送到 Storm 中運行的 Topology,等待接收 Topology 發送的處理結果,並將該結果返回給發送請求的客戶端。因此,從客戶端的角度來說,DPRC 與普通的 RPC 調用並沒有什麼區別。例如,以下是一個使用參數 “https://twitter.com” 調用 “reach” 函數計算結果的例子:
DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "https://twitter.com");
下圖是 DRPC 的原理示意圖。
客戶端通過向 DRPC 服務器發送待執行函數的名稱以及該函數的參數來獲取處理結果。實現該函數的拓撲使用一個DRPCSpout
從 DRPC 服務器中接收一個函數調用流。DRPC 服務器會為每個函數調用都標記了一個唯一的 id。隨後拓撲會執行函數來計算結果,並在拓撲的最後使用一個名為 ReturnResults
的 bolt 連接到 DRPC 服務器,根據函數調用的 id 來將函數調用的結果返回。
定義 DRPC 拓撲
可以直接使用普通的拓撲構造方法來構造 DRPC 拓撲,如下所示:
public static class ExclaimBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // builder.setSpout(drpcSpout); // builder.setBolt(new ExclaimBolt(), 3); // submit(builder.createTopology()); }
本地模式 DRPC
DRPC 可以在本地模式下運行。以下是使用本地模式構造拓撲的例子:
LocalDRPC drpc = new LocalDRPC(); DRPCSpout spout = new DRPCSpout("exclamation", drpc); builder.setSpout("drpc", spout); builder.setBolt("exclaim", new ExclaimBolt(), 3) .shuffleGrouping("drpc"); builder.setBolt("return", new ReturnResults(), 3) .shuffleGrouping("exclaim"); LocalCluster cluster = new LocalCluster(); Config conf = new Config(); cluster.submitTopology("drpc-demo", conf, builder.createTopology()); // local mode 測試代碼 System.out.println(drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown();
在這種模式下,首先你會創建一個 LocalDPRC
對象,該對象會在進程中模擬一個 DRPC 服務器,其作用類似於LocalCluster
在進程中模擬 Storm 集群的功能。在定義好拓撲的各個組件之後,就可以使用 LocalCluster
來提交拓撲。在本地模式下 LocalDPRC
對象不會綁定到任何一個實際的端口,所以需要通過向 DRPCSpout
傳入參數的方式來關聯到拓撲中。
在啟動拓撲後,你可以使用 execute
方法來完成 DRPC 調用。
遠程模式 DRPC
在一個實際的集群中使用 DRPC 有以下三個步驟:
- 配置並啟動 DRPC 服務器;
- 在集群的各個服務器上配置 DRPC 服務器的地址;
- 將 DRPC 拓撲提交到集群運行。
可以像 Nimbus、Supervisor 那樣使用 storm
命令來啟動 DRPC 服務器(注意,此 server 的基本配置,如 nimbus,ZooKeeper 等參數應該與 Storm 集群其他機器相同):
bin/storm drpc
接下來,你需要在集群的各個服務器上配置 DRPC 服務器的地址。這是為了讓 DRPCSpout
了解從哪裏獲取函數調用的方法。可以通過編輯 storm.yaml
或者添加拓撲配置的方式實現配置。配置 storm.yaml
的方式類似於下麵這樣:
drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"
最後,你可以像其他拓撲一樣使用 StormSubmitter
來啟動拓撲。
以下是使用遠程模式構造拓撲的一個例子:
TopologyBuilder builder = new TopologyBuilder(); DRPCSpout spout = new DRPCSpout("exclamation"); builder.setSpout("drpc", spout, 3); builder.setBolt("exclaim", new ExclamationBolt(), 3) .shuffleGrouping("drpc"); builder.setBolt("return", new ReturnResults(), 7) .shuffleGrouping("exclaim"); Config conf = new Config(); conf.setNumWorkers(2); StormSubmitter.submitTopology("drpc-demo", conf, builder.createTopology());
更複雜的例子
請參考Trident 教程一文中計算指定 URL 的 Reach 數的例子。
最後更新:2017-05-22 13:32:21