閱讀932 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Apache Storm 官方文檔 —— 分布式 RPC

分布式 RPC(DRPC)的設計目標是充分利用 Storm 的計算能力實現高密度的並行實時計算。Storm 接收若幹個函數參數作為輸入流,然後通過 DRPC 輸出這些函數調用的結果。嚴格來說,DRPC 並不能算作是 Storm 的一個特性,因為它隻是一種基於 Storm 原語 (Stream、Spout、Bolt、Topology) 實現的計算模式。雖然可以將 DRPC 從 Storm 中打包出來作為一個獨立的庫,但是與 Storm 集成在一起顯然更有用。

概述

DRPC 是通過一個 DRPC 服務端(DRPC server)來實現分布式 RPC 功能的。DRPC server 負責接收 RPC 請求,並將該請求發送到 Storm 中運行的 Topology,等待接收 Topology 發送的處理結果,並將該結果返回給發送請求的客戶端。因此,從客戶端的角度來說,DPRC 與普通的 RPC 調用並沒有什麼區別。例如,以下是一個使用參數 “https://twitter.com” 調用 “reach” 函數計算結果的例子:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "https://twitter.com");

下圖是 DRPC 的原理示意圖。

DRPC

客戶端通過向 DRPC 服務器發送待執行函數的名稱以及該函數的參數來獲取處理結果。實現該函數的拓撲使用一個DRPCSpout 從 DRPC 服務器中接收一個函數調用流。DRPC 服務器會為每個函數調用都標記了一個唯一的 id。隨後拓撲會執行函數來計算結果,並在拓撲的最後使用一個名為 ReturnResults 的 bolt 連接到 DRPC 服務器,根據函數調用的 id 來將函數調用的結果返回。

定義 DRPC 拓撲

可以直接使用普通的拓撲構造方法來構造 DRPC 拓撲,如下所示:

public static class ExclaimBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String input = tuple.getString(1);
        collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "result"));
    }
}
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // builder.setSpout(drpcSpout);
    // builder.setBolt(new ExclaimBolt(), 3);
    // submit(builder.createTopology());
}

本地模式 DRPC

DRPC 可以在本地模式下運行。以下是使用本地模式構造拓撲的例子:

LocalDRPC drpc = new LocalDRPC();
DRPCSpout spout = new DRPCSpout("exclamation", drpc);
builder.setSpout("drpc", spout);
builder.setBolt("exclaim", new ExclaimBolt(), 3)
        .shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 3)
        .shuffleGrouping("exclaim");

LocalCluster cluster = new LocalCluster();
Config conf = new Config();
cluster.submitTopology("drpc-demo", conf, builder.createTopology());

// local mode 測試代碼
System.out.println(drpc.execute("exclamation", "hello"));

cluster.shutdown();
drpc.shutdown();

在這種模式下,首先你會創建一個 LocalDPRC 對象,該對象會在進程中模擬一個 DRPC 服務器,其作用類似於LocalCluster 在進程中模擬 Storm 集群的功能。在定義好拓撲的各個組件之後,就可以使用 LocalCluster 來提交拓撲。在本地模式下 LocalDPRC 對象不會綁定到任何一個實際的端口,所以需要通過向 DRPCSpout 傳入參數的方式來關聯到拓撲中。

在啟動拓撲後,你可以使用 execute 方法來完成 DRPC 調用。

遠程模式 DRPC

在一個實際的集群中使用 DRPC 有以下三個步驟:

  1. 配置並啟動 DRPC 服務器;
  2. 在集群的各個服務器上配置 DRPC 服務器的地址;
  3. 將 DRPC 拓撲提交到集群運行。

可以像 Nimbus、Supervisor 那樣使用 storm 命令來啟動 DRPC 服務器(注意,此 server 的基本配置,如 nimbus,ZooKeeper 等參數應該與 Storm 集群其他機器相同):

bin/storm drpc

接下來,你需要在集群的各個服務器上配置 DRPC 服務器的地址。這是為了讓 DRPCSpout 了解從哪裏獲取函數調用的方法。可以通過編輯 storm.yaml 或者添加拓撲配置的方式實現配置。配置 storm.yaml 的方式類似於下麵這樣:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"

最後,你可以像其他拓撲一樣使用 StormSubmitter 來啟動拓撲。

以下是使用遠程模式構造拓撲的一個例子:

TopologyBuilder builder = new TopologyBuilder();

DRPCSpout spout = new DRPCSpout("exclamation");
builder.setSpout("drpc", spout, 3);
builder.setBolt("exclaim", new ExclamationBolt(), 3)
        .shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults(), 7)
        .shuffleGrouping("exclaim");

Config conf = new Config();
conf.setNumWorkers(2);

StormSubmitter.submitTopology("drpc-demo", conf, builder.createTopology());

更複雜的例子

請參考Trident 教程一文中計算指定 URL 的 Reach 數的例子。

最後更新:2017-05-22 13:32:21

  上一篇:go  Apache Storm 官方文檔 —— Storm 與 Kestrel
  下一篇:go  Apache Storm 官方文檔 —— Hooks