Storm源碼結構 (來源Storm Github Wiki)
寫在前麵
本文譯自Storm Github Wiki: Structure of the codebase,有助於深入了解Storm的設計和源碼學習。本人也是參照這個進行學習的,覺得在理解Storm設計的過程中起到了重要作用,所以也帖一份放在自己博客裏。以下的模塊分析裏沒有包括Storm 0.9.0增加的Netty模塊,對應的代碼包在Storm Github下的storm-netty文件夾內,內容比較簡單,關於這塊的release note可以參考Storm 0.9.0 Released Netty Transport,這裏有一篇Storm 的新消息傳輸機製也可以參考(這篇文章的博客裏有不少分析Storm的文章,博主本人貌似是Storm的Committer,好像在阿裏工作)。此外,在Storm Github Wiki Pages裏也可以看到不少需要的基礎內容。
結構層次
Storm的源碼共分為三個不同的層次。
首先,Storm在設計之初就考慮到了兼容多語言開發。Nimbus是一個thrift服務,topologies被定義為Thrift結構體。Thrift的運用使得Storm可以被任意開發語言使用。
其次,Storm的所有接口都是Java語言來定義的。因此,盡管Storm中的很多功能實現都是Clojure代碼(說實話,第一次看Clojure代碼的時候,第一感覺是這亂七八糟的都是些什麼啊!那麼多的括號又是什麼節奏!),但是使用這些功能都必須通過Java API。這意味著Storm的所有特性對於Java來講都是可用的。
第三,Storm的很大一部分實現都是Clojure代碼。從代碼行來看,差不多是一半Java代碼,一半Clojure代碼。但是由於Clojure在表達能力上更為見長,因此,實際上絕大多數邏輯的實現都是Clojure來做的。
接下來的小節裏將會逐個詳細解釋這三個層次。
Storm.thrift
Storm使用了從這裏folk出來的Thrift版本來自動生成代碼。這個Thrift版本實際上是將所有的Java packages都重命名為"org.apache.thrift7"之後的Thrift 7。除此之外,它與Thrfit 7是完全一樣的。之所以單獨出這樣一個Thrift版本一是考慮到Thrift缺少向後兼容,而是為了避免包名衝突以滿足一些用戶在他們自己的topologies中用到其他版本的thrift。
一個topology中的任何一個spout或bolt都會被用戶指定一個唯一標識,稱為"component id"。當描述1個bolt接收其他哪些spout或bolt的輸出時需要用到這個"component id"。StormTopology結構中保存了1個map來保存"component id"到"component"的映射關係,這個映射關係包含所有的component類型(即所有的spout、bolt)。
Thrift對Spout或bolt的定義是相同的,因此我們隻需要看一下bolt的thrift定義。它包含了1個"ComponentObject"結構和1個"ComponentCommon"結構。
union ComponentObject { 1: binary serialized_java; 2: ShellComponent shell; 3: JavaObject java_object; }"ComponentObject"即是bolt的實現實體。它可以是以下三個類型之一:
- 1個序列化的java對象(這個對象實現IBolt接口)
- 1個"ShellComponent"對象,意味著bolt是由其他語言實現的。如果以這種方式來定義1個bolt,Storm將會實例化1個ShellBolt對象來負責處理基於JVM的worker進程與非JVM的component(即該bolt)實現體之間的通訊。
- 1個"JavaObject"結構,這個結構告訴Storm實例化這個bolt所需要的classname和構造函數參數。這一點在你想用非JVM語言來定義topology時比較有用。這樣,在你使用非JVM語言來定義topology時就可以做到既使用基於JVM的spout或bolt,同時又不需要創建並序列化它們的Java對象。
struct ComponentCommon { 1: required map<GlobalStreamId, Grouping> inputs; 2: required map<string, StreamInfo> streams; //key is stream id 3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component // component specific configuration respects: // topology.debug: false // topology.max.task.parallelism: null // can replace isDistributed with this // topology.max.spout.pending: null // topology.kryo.register // this is the only additive one // component specific configuration 4: optional string json_conf; }
- 這個component發射什麼stream以及stream的元數據(是否是direct stream,stream中field的聲明)
- 這個component接收什麼stream(被定義在1個component_id到stream_id的map裏,在stream做分組時用到)
- 這個component的並行度
- 這個component的配置項configuration
Java接口
這樣定義這些接口的主要意圖在於:
- 以Java語言來定義接口
- 基於此接口,可以做到在不同的場合,提供出各自最適合的默認實現基類
Spout和bolt就是按照以上接口描述的方式被序列化到topology的Thrift定義結構中。
值得一提的一個細節是,IBolt、ISpout與IRichBolt、IRichSpout這兩對接口是有區別的。它們主要區別是在"Rich"版本裏增加了"declareOutputFields"方法。這樣設計的原因是所有的輸出stream的輸出field聲明都必須是在Thrift結構裏的(這樣就可以做到使用任何編程語言來聲明了),但是用戶又希望能夠在自己的class中來聲明stream輸出field信息。為解決這個問題,"TopologyBuilder"在構造Thrift結構時就是通過調用"declareOutputFields"方法來得到輸出field的聲明,然後將其轉換納入Thrift結構。這個轉換操作可以從"TopologyBuilder"代碼中的這一段裏看到。
接口實現
應該說,Storm主要是由Clojure語言實現的。盡管從代碼行數上看一半是Java一半是Clojure,但其實裏麵絕大多數的邏輯實現都是Clojure。有兩個值得一提的例外就是DRPC和支持事務的topology,它們二者都純Java實現的。這樣做的主要目的是來展示如何基於Storm,實現Storm之上更高層次的抽象。DRPC和支持事務的topology的實現分別位於backtype.storm.coordination和backtype.storm.transactional包裏。
這裏總結了一份主要的Java包和Clojure命名空間的內容列表:
Java
backtype.storm.drpc: DRPC的更高層次抽象的具體實現
backtype.storm.generated: 自動生成的Thrift代碼(利用這裏folk出來的Thrift版本生成的,主要是把org.apache.thrift包重命名成org.apache.thrift7來避免與其他Thrift版本的衝突)
backtype.storm.grouping: 包含了用戶實現自定義stream分組類時需要用到的接口
backtype.storm.hooks: 定義了處理storm各種事件的鉤子接口,例如當task發射tuple時、當tuple被ack時。關於鉤子的手冊詳見這裏
backtype.storm.serialization: storm序列化/反序列化tuple的實現。在Kryo之上構建。
backtype.storm.spout: spout及相關接口的定義(例如"SpoutOutputCollector")。也包括了"ShellSpout"來實現非JVM語言定義spout的協議。
backtype.storm.task: bolt及相關接口的定義(例如"OutputCollector")。也包括了"ShellBolt"來實現非JVM語言定義bolt的協議。最後,"TopologyContext"也是在這裏定義的,用來在運行時供spout和bolt使用以獲取topology的執行信息。
backtype.storm.testing: 包括了storm單元測試中用到的各種測試bolt及工具。
backtype.storm.topology: 在Thrift結構之上的Java層,用以提供一個純Java API來使用Storm(用戶不需要了解Thrift的細節)。"TopologyBuilder"及不同spout和bolt的基類們也在這裏定義。稍高一層次的接口"IBasicBolt"也在這裏定義,它會使得創建某些特定類型的bolt會更加簡潔。
backtype.storm.transactional: 包括了事務性topology的實現。
backtype.storm.tuple: 包括Storm中tuple數據模型的實現。
backtype.storm.utils: 包含了Storm源碼中用到的數據結構及各種工具類。
Clojure
backtype.storm.clojure: 包括了利用Clojure為Storm定義的特定領域語言(DSL)。
backtype.storm.cluster: Storm守護進程中用到的Zookeeper邏輯都封裝在這個文件中。這部分代碼提供了API來將整個集群的運行狀態映射到Zookeeper的"文件係統"上(例如哪裏運行著怎樣的task,每個task運行的是哪個spout/bolt)。
backtype.storm.command.*: 這些命名空間包括了各種"storm xxx"開頭的客戶端命令行的命令實現。這些實現都很簡短。
backtype.storm.config: Clojure中config的讀取/解析實現。同時也包括了工具函數來告訴nimbus、supervisor等守護進程在各種情況下應該使用哪些本地目錄。例如:"master-inbox"函數會返回本地目錄告訴Nimbus應該將上傳給它的jar包保存到哪裏。
backtype.storm.daemon.acker: "acker" bolt的實現。這是Storm確保數據被完全處理的關鍵組成部分。
backtype.storm.daemon.common: Storm守護進程用到的公共函數,例如根據topology的名字獲取其id,將1個用戶定義的topology映射到真正運行的topology(真正運行的topology是在用戶定義的topology基礎上添加了ack stream及acker bolt,參見system-topology!函數),同時包括了各種心跳及Storm中其他數據結構的定義。
backtype.storm.daemon.drpc: 包括了DRPC服務器的實現,用來與DRPC topology一起使用。
backtype.storm.daemon.nimbus: 包括了Nimbus的實現。
backtype.storm.daemon.supervisor: 包括了Supervisor的實現。
backtype.storm.daemon.task: 包括了spout或bolt的task實例實現。包括了處理消息路由、序列化、為UI提供的統計集合及spout、bolt執行動作的實現。
backtype.storm.daemon.worker: 包括了worker進程(1個worker包含很多的task)的實現。包括了消息傳輸和task啟動的實現。
backtype.storm.event: 包括了1個簡單的異步函數的執行器。Nimbus和Supervisor很多場合都用到了異步函數執行器來避免資源競爭。
backtype.storm.log: 定義了用來輸出log信息給log4j的函數。
backtype.storm.messaging.*: 定義了1個高一層次的接口來實現點對點的消息通訊。工作在本地模式時Storm會使用內存中的Java隊列來模擬消息傳遞。工作在集群模式時,消息傳遞使用的是ZeroMQ。通用的接口在protocol.clj中定義。
backtype.storm.stats: 實現了向Zookeeper中寫入UI使用的統計信息時如何進行匯總。實現了不同粒度的聚合。
backtype.storm.testing: 包括了測試Storm topology的工具。包括時間仿真,運行一組固定數量的tuple然後獲得輸出快照的"complete-topology","tracker topology"可以在集群"空閑"時做更細粒度的控製操作,以及其他工具。
backtype.storm.thrift: 包括了自動生成的Thrift API的Clojure封裝以使得使用Thrift結構更加便利。
backtype.storm.timer: 實現了1個後台定時器來延遲執行函數或者定時輪詢執行。Storm不能使用Java裏的Timer類,因為為了單測Nimbus和Supervisor,必須要與時間仿真集成起來使用。
backtype.storm.ui.*: Storm UI的實現。完全獨立於其他的代碼,通過Nimbus的Thrift API來獲取需要的數據。
backtype.storm.util: 包括了Storm代碼中用到的通用工具函數。
backtype.storm.zookeeper: 包括了Clojure對Zookeeper API的封裝,同時也提供了一些高一層次的操作例如:"mkdirs"、"delete-recursive"
最後更新:2017-04-03 12:54:02