閱讀982 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Storm源碼結構 (來源Storm Github Wiki)

寫在前麵

本文譯自Storm Github Wiki: Structure of the codebase,有助於深入了解Storm的設計和源碼學習。本人也是參照這個進行學習的,覺得在理解Storm設計的過程中起到了重要作用,所以也帖一份放在自己博客裏。以下的模塊分析裏沒有包括Storm 0.9.0增加的Netty模塊,對應的代碼包在Storm Github下的storm-netty文件夾內,內容比較簡單,關於這塊的release note可以參考Storm 0.9.0 Released Netty Transport,這裏有一篇Storm 的新消息傳輸機製也可以參考(這篇文章的博客裏有不少分析Storm的文章,博主本人貌似是Storm的Committer,好像在阿裏工作)。此外,在Storm Github Wiki Pages裏也可以看到不少需要的基礎內容。


結構層次

Storm的源碼共分為三個不同的層次。
首先,Storm在設計之初就考慮到了兼容多語言開發。Nimbus是一個thrift服務,topologies被定義為Thrift結構體。Thrift的運用使得Storm可以被任意開發語言使用。
其次,Storm的所有接口都是Java語言來定義的。因此,盡管Storm中的很多功能實現都是Clojure代碼(說實話,第一次看Clojure代碼的時候,第一感覺是這亂七八糟的都是些什麼啊!那麼多的括號又是什麼節奏!),但是使用這些功能都必須通過Java API。這意味著Storm的所有特性對於Java來講都是可用的。
第三,Storm的很大一部分實現都是Clojure代碼。從代碼行來看,差不多是一半Java代碼,一半Clojure代碼。但是由於Clojure在表達能力上更為見長,因此,實際上絕大多數邏輯的實現都是Clojure來做的。
接下來的小節裏將會逐個詳細解釋這三個層次。


Storm.thrift

要理解Storm的代碼結構,首先需要看的是storm.thrift文件。(在storm-core/src下)

Storm使用了從這裏folk出來的Thrift版本來自動生成代碼。這個Thrift版本實際上是將所有的Java packages都重命名為"org.apache.thrift7"之後的Thrift 7。除此之外,它與Thrfit 7是完全一樣的。之所以單獨出這樣一個Thrift版本一是考慮到Thrift缺少向後兼容,而是為了避免包名衝突以滿足一些用戶在他們自己的topologies中用到其他版本的thrift。

一個topology中的任何一個spout或bolt都會被用戶指定一個唯一標識,稱為"component id"。當描述1個bolt接收其他哪些spout或bolt的輸出時需要用到這個"component id"。StormTopology結構中保存了1個map來保存"component id"到"component"的映射關係,這個映射關係包含所有的component類型(即所有的spout、bolt)。

Thrift對Spout或bolt的定義是相同的,因此我們隻需要看一下bolt的thrift定義。它包含了1個"ComponentObject"結構和1個"ComponentCommon"結構。
union ComponentObject {
  1: binary serialized_java;
  2: ShellComponent shell;
  3: JavaObject java_object;
}
"ComponentObject"即是bolt的實現實體。它可以是以下三個類型之一:
  • 1個序列化的java對象(這個對象實現IBolt接口)
  • 1個"ShellComponent"對象,意味著bolt是由其他語言實現的。如果以這種方式來定義1個bolt,Storm將會實例化1個ShellBolt對象來負責處理基於JVM的worker進程與非JVM的component(即該bolt)實現體之間的通訊。
  • 1個"JavaObject"結構,這個結構告訴Storm實例化這個bolt所需要的classname和構造函數參數。這一點在你想用非JVM語言來定義topology時比較有用。這樣,在你使用非JVM語言來定義topology時就可以做到既使用基於JVM的spout或bolt,同時又不需要創建並序列化它們的Java對象。
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component

  // component specific configuration respects:
  // topology.debug: false
  // topology.max.task.parallelism: null // can replace isDistributed with this
  // topology.max.spout.pending: null
  // topology.kryo.register // this is the only additive one
  
  // component specific configuration
  4: optional string json_conf;
}
"ComponentCommon"定義了這個component的其他所有屬性。包括:
  • 這個component發射什麼stream以及stream的元數據(是否是direct stream,stream中field的聲明)
  • 這個component接收什麼stream(被定義在1個component_id到stream_id的map裏,在stream做分組時用到)
  • 這個component的並行度
  • 這個component的配置項configuration
注意,在spout的結構中同樣有"ComponentCommon"字段,因此,spout也是可以被聲明接收其他的stream輸入。然而,Storm Java API並沒有提供一種方式指定spout接收什麼stream,同時如果你在這裏指定1個spout的輸入聲明,在提交這個topology時將會出現報錯信息。之所以這樣設計,是因為spout的輸入聲明不是讓用戶自己來使用的,而是Storm內部使用的。Storm會在內部自動向topology添加stream和bolt來構造acking framework,其中的兩個stream就是從acker bolt發出給topology中的所有spout節點的。隻要1個tuple樹被檢測到完成了或失敗了,acker就會通過這兩個stream分別發出"ack"或"fail"消息。將用戶提交的topology轉換成運行時的topology的代碼可參見這裏

Java接口

Storm的接口定義都是Java接口。主要的接口如下:
這樣定義這些接口的主要意圖在於:
  • 以Java語言來定義接口
  • 基於此接口,可以做到在不同的場合,提供出各自最適合的默認實現基類
這一策略的實際運用可以參考BaseRichSpout

Spout和bolt就是按照以上接口描述的方式被序列化到topology的Thrift定義結構中。

值得一提的一個細節是,IBolt、ISpout與IRichBolt、IRichSpout這兩對接口是有區別的。它們主要區別是在"Rich"版本裏增加了"declareOutputFields"方法。這樣設計的原因是所有的輸出stream的輸出field聲明都必須是在Thrift結構裏的(這樣就可以做到使用任何編程語言來聲明了),但是用戶又希望能夠在自己的class中來聲明stream輸出field信息。為解決這個問題,"TopologyBuilder"在構造Thrift結構時就是通過調用"declareOutputFields"方法來得到輸出field的聲明,然後將其轉換納入Thrift結構。這個轉換操作可以從"TopologyBuilder"代碼中的這一段裏看到。

接口實現

通過將Storm所有的接口都由Java語言來定義確保了Storm的所有功能對於Java來講都是可使用的。同時,Java接口的使用也使得Java用戶在使用Storm時體驗更好。

應該說,Storm主要是由Clojure語言實現的。盡管從代碼行數上看一半是Java一半是Clojure,但其實裏麵絕大多數的邏輯實現都是Clojure。有兩個值得一提的例外就是DRPC支持事務的topology,它們二者都純Java實現的。這樣做的主要目的是來展示如何基於Storm,實現Storm之上更高層次的抽象。DRPC和支持事務的topology的實現分別位於backtype.storm.coordinationbacktype.storm.transactional包裏。

這裏總結了一份主要的Java包和Clojure命名空間的內容列表:

Java

backtype.storm.coordination: 實現了DRPC和事務性topology裏用到的基於Storm的批處理功能。這個包裏最重要得類是CoordinatedBolt
backtype.storm.drpc: DRPC的更高層次抽象的具體實現
backtype.storm.generated: 自動生成的Thrift代碼(利用這裏folk出來的Thrift版本生成的,主要是把org.apache.thrift包重命名成org.apache.thrift7來避免與其他Thrift版本的衝突)
backtype.storm.grouping: 包含了用戶實現自定義stream分組類時需要用到的接口
backtype.storm.hooks: 定義了處理storm各種事件的鉤子接口,例如當task發射tuple時、當tuple被ack時。關於鉤子的手冊詳見這裏
backtype.storm.serialization: storm序列化/反序列化tuple的實現。在Kryo之上構建。
backtype.storm.spout: spout及相關接口的定義(例如"SpoutOutputCollector")。也包括了"ShellSpout"來實現非JVM語言定義spout的協議。
backtype.storm.task: bolt及相關接口的定義(例如"OutputCollector")。也包括了"ShellBolt"來實現非JVM語言定義bolt的協議。最後,"TopologyContext"也是在這裏定義的,用來在運行時供spout和bolt使用以獲取topology的執行信息。
backtype.storm.testing: 包括了storm單元測試中用到的各種測試bolt及工具。
backtype.storm.topology: 在Thrift結構之上的Java層,用以提供一個純Java API來使用Storm(用戶不需要了解Thrift的細節)。"TopologyBuilder"及不同spout和bolt的基類們也在這裏定義。稍高一層次的接口"IBasicBolt"也在這裏定義,它會使得創建某些特定類型的bolt會更加簡潔。
backtype.storm.transactional: 包括了事務性topology的實現。
backtype.storm.tuple: 包括Storm中tuple數據模型的實現。
backtype.storm.utils: 包含了Storm源碼中用到的數據結構及各種工具類。


Clojure

backtype.storm.bootstrap: 包括了1個有用的宏來引入源碼中用到的所有類及命名空間。
backtype.storm.clojure: 包括了利用Clojure為Storm定義的特定領域語言(DSL)。
backtype.storm.cluster: Storm守護進程中用到的Zookeeper邏輯都封裝在這個文件中。這部分代碼提供了API來將整個集群的運行狀態映射到Zookeeper的"文件係統"上(例如哪裏運行著怎樣的task,每個task運行的是哪個spout/bolt)。
backtype.storm.command.*: 這些命名空間包括了各種"storm xxx"開頭的客戶端命令行的命令實現。這些實現都很簡短。
backtype.storm.config: Clojure中config的讀取/解析實現。同時也包括了工具函數來告訴nimbus、supervisor等守護進程在各種情況下應該使用哪些本地目錄。例如:"master-inbox"函數會返回本地目錄告訴Nimbus應該將上傳給它的jar包保存到哪裏。
backtype.storm.daemon.acker: "acker" bolt的實現。這是Storm確保數據被完全處理的關鍵組成部分。
backtype.storm.daemon.common: Storm守護進程用到的公共函數,例如根據topology的名字獲取其id,將1個用戶定義的topology映射到真正運行的topology(真正運行的topology是在用戶定義的topology基礎上添加了ack stream及acker bolt,參見system-topology!函數),同時包括了各種心跳及Storm中其他數據結構的定義。
backtype.storm.daemon.drpc: 包括了DRPC服務器的實現,用來與DRPC topology一起使用。
backtype.storm.daemon.nimbus: 包括了Nimbus的實現。
backtype.storm.daemon.supervisor: 包括了Supervisor的實現。
backtype.storm.daemon.task: 包括了spout或bolt的task實例實現。包括了處理消息路由、序列化、為UI提供的統計集合及spout、bolt執行動作的實現。
backtype.storm.daemon.worker: 包括了worker進程(1個worker包含很多的task)的實現。包括了消息傳輸和task啟動的實現。
backtype.storm.event: 包括了1個簡單的異步函數的執行器。Nimbus和Supervisor很多場合都用到了異步函數執行器來避免資源競爭。
backtype.storm.log: 定義了用來輸出log信息給log4j的函數。
backtype.storm.messaging.*: 定義了1個高一層次的接口來實現點對點的消息通訊。工作在本地模式時Storm會使用內存中的Java隊列來模擬消息傳遞。工作在集群模式時,消息傳遞使用的是ZeroMQ。通用的接口在protocol.clj中定義。
backtype.storm.stats: 實現了向Zookeeper中寫入UI使用的統計信息時如何進行匯總。實現了不同粒度的聚合。
backtype.storm.testing: 包括了測試Storm topology的工具。包括時間仿真,運行一組固定數量的tuple然後獲得輸出快照的"complete-topology","tracker topology"可以在集群"空閑"時做更細粒度的控製操作,以及其他工具。
backtype.storm.thrift: 包括了自動生成的Thrift API的Clojure封裝以使得使用Thrift結構更加便利。
backtype.storm.timer: 實現了1個後台定時器來延遲執行函數或者定時輪詢執行。Storm不能使用Java裏的Timer類,因為為了單測Nimbus和Supervisor,必須要與時間仿真集成起來使用。
backtype.storm.ui.*: Storm UI的實現。完全獨立於其他的代碼,通過Nimbus的Thrift API來獲取需要的數據。
backtype.storm.util: 包括了Storm代碼中用到的通用工具函數。
backtype.storm.zookeeper: 包括了Clojure對Zookeeper API的封裝,同時也提供了一些高一層次的操作例如:"mkdirs"、"delete-recursive"


(全文完)

最後更新:2017-04-03 12:54:02

  上一篇:go android的listview item點擊詳解
  下一篇:go Oracle報錯:ORA-01033:ORACLE initialization or shutdown in process