閱讀379 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Apache Storm 官方文檔 —— 源碼組織結構

Strom 的代碼有三個層次:

第一,Storm 在一開始就是按照兼容多語言的目的來設計的。Nimbus 是一個 Thrift 服務,拓撲也被定義為 Thrift 架構。Thrift 的使用使得 Storm 可以用於任何一種語言。

第二,所有的 Storm 接口都設計為 Java 接口。所以,盡管 Storm 核心代碼中有大量的 Clojure 實現,所有的訪問都必須經過 Java API。這就意味著 Storm 的每個特性都可以通過 Java 來實現。

第三,Storm 的實現中大量使用了 Clojure。可以說,Storm 的代碼結構大概是一半的 Java 代碼加上一半的 Clojure 代碼。但是由於 Clojure 更具有表現力,所以實際上 Storm 的核心邏輯大多是采用 Clojure 來實現的。

下麵詳細說明了每個層次的細節信息。

storm.thrift

要理解 Storm 的代碼架構,首先需要了解 storm.thrift 文件。

Storm 使用這個 fork 版本的 Thrift(“storm” 分支)來生成代碼。這個 “fork” 版本實際上就是 Thrift7,其中所有的 Java package 也都重命名成了 org.apache.thrift7。在其他方麵,它與 Thrift7 完全相同。這個 fork 主要是為了解決 Thrift 缺乏向後兼容的機製的問題,同時,也可以讓用戶在自己的 Storm 拓撲中使用其他版本的 Thrift。

拓撲中的每個 spout 或者 bolt 都有一個特定的標識,這個標識稱為“組件 id”。組件 id 主要為了從拓撲中 spout 和 bolt 的輸出流中選擇一個或多個流作為某個 bolt 訂閱的輸入流。Storm 拓撲中就包含有一個組件 id 與每種類型的組件(spout 與 bolt)相關聯的 map。

Spout 和 Bolt 有相同的 Thrift 定義。我們來看看 Bolt 的 Thrift 定義。它包含一個 ComponentObject 結構和一個ComponentCommon 結構。

ComponentObject 定義了 bolt 的實現,這個實現可以是以下三種類型中的一種:

  1. 一個 Java 序列化對象(實現了 IBolt 接口的對象)。
  2. 一個用於表明其他語言的實現的 ShellComponent 對象。以這種方式指定一個 bolt 會讓 Storm 實例化一個 ShellBolt 對象來處理基於 JVM 的 worker 進程與組件的非 JVM 實現之間的通信。
  3. 一個帶有類名與構造器參數的 Java 對象結構,Storm 可以使用這個結構來實例化 bolt。如果你需要定義一個非 JVM 語言的拓撲這個類型會很有用。使用這種方式,你可以在不創建並且序列化一個 Java 對象的情況下使用基於 JVM 的 spout 與 bolt。

ComponentCommon 定義了組件的其他方麵特性,包括:

  1. 該組件的輸出流以及每個流的 metadata(無論是一個直接流還是基於域定義的流);
  2. 該組件消費的輸入流(使用流分組所定義的一個將組件 id 與流 id 相關聯的 map 來指定);
  3. 該組件的並行度;
  4. 該組件的組件級配置。

注意,spout 的結構也有一個 ComponentCommon 域,所以理論上說 spout 也可以聲明一個輸入流。然而 Storm 的 Java API 並沒有為 spout 提供消費其他的流的方法,並且如果你為 spout 聲明了輸入流,在提交拓撲的時候也會報錯。這是因為 spout 的輸入流聲明不是為了用戶的使用,而是為了 Storm 內部的使用。Storm 會為拓撲添加隱含的流與 bolt 來設置應答框架(acking framework)。這些隱含的流中就有兩個流用於從 acker bolt 向拓撲中的每個 spout 發送消息。在發現 tuple 樹完成或者失敗之後,acker 就會通過這些隱含的流發送 “ack” 或者 “fail” 消息。將用戶的拓撲轉化為運行時拓撲的代碼在這裏

Java 接口

Storm 的對外接口基本上為 Java 接口,主要的幾個接口有:

  1. IRichBolt
  2. IRichSpout
  3. TopologyBuilder

大部分接口的策略為:

  1. 使用一個 Java 接口來定義接口;
  2. 實現一個具有適當的默認實現的 Base 類。

你可以從 BaseRichSpout 類中觀察到這種策略的工作機製。

如上所述,Spout 和 Bolt 都已經根據拓撲的 Thrift 定義進行了序列化。

在這些接口中,IBoltISpout 與 IRichBoltIRichSpout 之間存在著一些細微的差別。其中最主要的區別是帶有 “Rich” 的接口中增加了 declareOutputFields 方法。這種區別的原因主要在於每個輸出流的輸出域聲明必須是 Thrift 結構的一部分(這樣才能實現跨語言操作),而用戶本身隻需要將流聲明為自己的類的一部分即可。TopologyBuilder 在構造 Thrift 結構時所做的就是調用 declareOutputFields 方法來獲取聲明並將其轉化為 Thrift 結構。這種轉化過程可以在TopologyBuilder 的源碼中看到。

實現

通過 Java 接口來詳細說明所有的功能可以確保 Storm 的每個特征都是有效的。更重要的是,關注 Java 接口可以讓有 Java 使用經驗的用戶更易上手。

另一方麵,Storm 的核心架構主要是通過 Clojure 實現的。盡管按照一般的計數規則來說代碼庫中 Java 與 Clojure 各占 50%,但是大部分邏輯實現還是基於 Clojure 的。不過也有兩個例外,分別是 DRPC 和事務型拓撲的實現。這兩個部分是完全使用 Java 實現的。這是為了說明在 Storm 中如何實現高級抽象。DRPC 和事務型拓撲的實現分別位於backtype.storm.coordinationbacktype.storm.drpc 和 backtype.storm.transactional 包中。

以下是主要的 Java 包和 Clojure 命名空間的總結。

Java packages

backtype.storm.coordination: 實現了用於將批處理整合到 Storm 上層的功能,DRPC 和事務型拓撲都需要這個功能。CoordinatedBolt 是其中最重要的類。

backtype.storm.drpc: DRPC 高級抽象的實現。

backtype.storm.generated: 為 Storm 生成的 Thrift 代碼(使用了這個 fork 版本的 Thrift,其中僅僅將包名重命名為 org.apache.thrift7 來避免與其他 Thrift 版本的衝突)。

backtype.storm.grouping: 包含自定義流分組的接口。

backtype.storm.hooks: 用於在 Storm 中添加事件鉤子的接口,這些事件包括任務發送 tuple、tuple 被 ack 等等。

backtype.storm.serialization: Storm 序列化/反序列化 tuple 的接口。這是在 Kryo 的基礎上構建的。

backtype.storm.spout: Spout 與一些關聯接口的定義(例如 SpoutOutputCollector)。其中也包含有用於實現非 JVM 語言 spout 的協議的 ShellSpout

backtype.storm.task: Bolt 與關聯接口的定義(例如 OutputCollector)。其中也包含有用於實現非 JVM 語言 bolt 的協議的 ShellBolt。最後,TopologyContext 也是在這裏定義的,該類可以用於在拓撲運行時為 spout 和 bolt 提供拓撲以及他們自身執行的相關信息。

backtype.storm.testing: 包含很多 bolt 測試類以及用於 Storm 單元測試的工具類。

backtype.storm.topology: 在 Thrift 結構上層的 Java 層,用於為 Storm 提供完全的 Java API(用戶不必了解 Thrift)。TopologyBuilder 和一些為不同的 spout 和 bolt 提供幫助的基礎類都在這裏。稍微高級一點的 IBasicBolt 接口也在這裏,該接口是一種實現基本的 bolt 的簡單方式。

backtype.storm.transactional: 事務型拓撲的實現。

backtype.storm.tuple: Storm tuple 數據模型的實現。

backtype.storm.utils: 整個代碼庫中通用的數據結構和各種工具類。

Clojure namespaces

譯者注:Clojure 部分內容暫不提供翻譯。

 

最後更新:2017-05-22 13:31:55

  上一篇:go  ThreadLocal使用
  下一篇:go  Java構造器必知必會