Apache Storm 官方文檔 —— 基礎概念
Storm 係統中包含以下幾個基本概念:
- 拓撲(Topologies)
- 流(Streams)
- 數據源(Spouts)
- 數據流處理組件(Bolts)
- 數據流分組(Stream groupings)
- 可靠性(Reliability)
- 任務(Tasks)
- 工作進程(Workers)
譯者注:由於 Storm 的幾個基礎概念無論是直譯還是意譯均不夠清晰,而且還會讓習慣了 Storm 編程模型的讀者感到困惑,因此後文在提及這些概念時大多還會以英文原文出現,希望大家能夠諒解。
拓撲(Topologies)
Storm 的拓撲是對實時計算應用邏輯的封裝,它的作用與 MapReduce 的任務(Job)很相似,區別在於 MapReduce 的一個 Job 在得到結果之後總會結束,而拓撲會一直在集群中運行,直到你手動去終止它。拓撲還可以理解成由一係列通過數據流(Stream Grouping)相互關聯的 Spout 和 Bolt 組成的的拓撲結構。Spout 和 Bolt 稱為拓撲的組件(Component)。我們會在後文中給出這些概念的解釋。
相關資料
- TopologyBuilder:在 Java 中使用此類構造拓撲
- 在生產環境中運行拓撲
- 本地模式:通過本文學習如何在本地模式中開發、測試拓撲
數據流(Streams)
數據流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分布式環境中並行創建、處理的一組元組(tuple)的無界序列。數據流可以由一種能夠表述數據流中元組的域(fields)的模式來定義。在默認情況下,元組(tuple)包含有整型(Integer)數字、長整型(Long)數字、短整型(Short)數字、字節(Byte)、雙精度浮點數(Double)、單精度浮點數(Float)、布爾值以及字節數組等基本類型對象。當然,你也可以通過定義可序列化的對象來實現自定義的元組類型。
在聲明數據流的時候需要給數據流定義一個有效的 id。不過,由於在實際應用中使用最多的還是單一數據流的 Spout 與 Bolt,這種場景下不需要使用 id 來區分數據流,因此可以直接使用 OutputFieldsDeclarer來定義“無 id”的數據流。實際上,係統默認會給這種數據流定義一個名為“default”的 id。
相關資料
- 元組(Tuple):數據流由多個元組構成
- OutputFieldsDeclarer:用於聲明數據流和數據流對應的模式
- 序列化(Serialization):關於 Storm 元組的動態類型以及聲明自定義序列化模型的相關內容
- ISerialization:自定義的序列化模型必須實現該接口
- CONFIG.TOPOLOGY_SERIALIZATIONS:自定義的序列化模型可以通過這個配置項實現注冊
數據源(Spouts)
數據源(Spout)是拓撲中數據流的來源。一般 Spout 會從一個外部的數據源讀取元組然後將他們發送到拓撲中。根據需求的不同,Spout 既可以定義為可靠的數據源,也可以定義為不可靠的數據源。一個可靠的 Spout 能夠在它發送的元組處理失敗時重新發送該元組,以確保所有的元組都能得到正確的處理;相對應的,不可靠的 Spout 就不會在元組發送之後對元組進行任何其他的處理。
一個 Spout 可以發送多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream
方法來聲明定義不同的數據流,然後在發送數據時在 SpoutOutputCollector 的 emit
方法中將數據流 id 作為參數來實現數據發送的功能。
Spout 中的關鍵方法是 nextTuple
。顧名思義,nextTuple
要麼會向拓撲中發送一個新的元組,要麼會在沒有可發送的元組時直接返回。需要特別注意的是,由於 Storm 是在同一個線程中調用所有的 Spout 方法,nextTuple
不能被 Spout 的任何其他功能方法所阻塞,否則會直接導致數據流的中斷(關於這一點,阿裏的 JStorm 修改了 Spout 的模型,使用不同的線程來處理消息的發送,這種做法有利有弊,好處在於可以更加靈活地實現 Spout,壞處在於係統的調度模型更加複雜,如何取舍還是要看具體的需求場景吧——譯者注)。
Spout 中另外兩個關鍵方法是 ack
和 fail
,他們分別用於在 Storm 檢測到一個發送過的元組已經被成功處理或處理失敗後的進一步處理。注意,ack
和 fail
方法僅僅對上述“可靠的” Spout 有效。
相關資料
- IRichSpout:這是實現 Spout 的接口
- 消息的可靠性處理
數據流處理組件(Bolts)
拓撲中所有的數據處理均是由 Bolt 完成的。通過數據過濾(filtering)、函數處理(functions)、聚合(aggregations)、聯結(joins)、數據庫交互等功能,Bolt 幾乎能夠完成任何一種數據處理需求。
一個 Bolt 可以實現簡單的數據流轉換,而更複雜的數據流變換通常需要使用多個 Bolt 並通過多個步驟完成。例如,將一個微博數據流轉換成一個趨勢圖像的數據流至少包含兩個步驟:其中一個 Bolt 用於對每個圖片的微博轉發進行滾動計數,另一個或多個 Bolt 將數據流輸出為“轉發最多的圖片”結果(相對於使用2個Bolt,如果使用3個 Bolt 你可以讓這種轉換具有更好的可擴展性)。
與 Spout 相同,Bolt 也可以輸出多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream
方法來聲明定義不同的數據流,然後在發送數據時在 OutputCollector 的 emit
方法中將數據流 id 作為參數來實現數據發送的功能。
在定義 Bolt 的輸入數據流時,你需要從其他的 Storm 組件中訂閱指定的數據流。如果你需要從其他所有的組件中訂閱數據流,你就必須要在定義 Bolt 時分別注冊每一個組件。對於聲明為默認 id(即上文中提到的“default”——譯者注)的數據流,InputDeclarer支持訂閱此類數據流的語法糖。也就是說,如果需要訂閱來自組件“1”的數據流,declarer.shuffleGrouping("1")
與 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)
兩種聲明方式是等價的。
Bolt 的關鍵方法是 execute
方法。execute
方法負責接收一個元組作為輸入,並且使用 OutputCollector 對象發送新的元組。如果有消息可靠性保障的需求,Bolt 必須為它所處理的每個元組調用 OutputCollector
的 ack
方法,以便 Storm 能夠了解元組是否處理完成(並且最終決定是否可以響應最初的 Spout 輸出元組樹)。一般情況下,對於每個輸入元組,在處理之後可以根據需要選擇不發送還是發送多個新元組,然後再響應(ack)輸入元組。IBasicBolt 接口能夠實現元組的自動應答。
在 Bolt 中啟動新線程來進行異步處理是一種非常好的方式,因為 OutputCollector 是線程安全的對象,可以在任意時刻被調用(此處譯者保留意見,由於 Storm 的並發設計和集群的彈性擴展機製,在 Bolt 中新建的線程可能存在一定的不可控風險——譯者注)。
請注意 OutputCollector 不是線程安全的對象,所有的 emit、ack 和 fail 操作都需要在同一個線程中進行處理。更多信息請參考問題與解決一文。
相關資料
- IRichBolt:用於定義 Bolt 的基本接口
- IBasicBolt: 用於定義帶有過濾或者其他簡單的函數操作功能的 Bolt 的簡便接口
- OutputCollector:Bolt 使用此類來發送數據流
- 消息的可靠性處理
數據流分組(Stream groupings)
為拓撲中的每個 Bolt 的確定輸入數據流是定義一個拓撲的重要環節。數據流分組定義了在 Bolt 的不同任務(tasks)中劃分數據流的方式。
在 Storm 中有八種內置的數據流分組方式(原文有誤,現在已經已經有八種分組模型——譯者注),而且你還可以通過CustomStreamGrouping 接口實現自定義的數據流分組模型。這八種分組分時分別為:
- 隨機分組(Shuffle grouping):這種方式下元組會被盡可能隨機地分配到 Bolt 的不同任務(tasks)中,使得每個任務所處理元組數量能夠能夠保持基本一致,以確保集群的負載均衡。
- 域分組(Fields grouping):這種方式下數據流根據定義的“域”來進行分組。例如,如果某個數據流是基於一個名為“user-id”的域進行分組的,那麼所有包含相同的“user-id”的元組都會被分配到同一個任務中,這樣就可以確保消息處理的一致性。
- 部分關鍵字分組(Partial Key grouping):這種方式與域分組很相似,根據定義的域來對數據流進行分組,不同的是,這種方式會考慮下遊 Bolt 數據處理的均衡性問題,在輸入數據源關鍵字不平衡時會有更好的性能1。感興趣的讀者可以參考這篇論文,其中詳細解釋了這種分組方式的工作原理以及它的優點。
- 完全分組(All grouping):這種方式下數據流會被同時發送到 Bolt 的所有任務中(也就是說同一個元組會被複製多份然後被所有的任務處理),使用這種分組方式要特別小心。
- 全局分組(Global grouping):這種方式下所有的數據流都會被發送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。
- 非分組(None grouping):使用這種方式說明你不關心數據流如何分組。目前這種方式的結果與隨機分組完全等效,不過未來 Storm 社區可能會考慮通過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個線程中執行。
- 直接分組(Direct grouping):這是一種特殊的分組方式。使用這種方式意味著元組的發送者可以指定下遊的哪個任務可以接收這個元組。隻有在數據流被聲明為直接數據流時才能夠使用直接分組方式。使用直接數據流發送元組需要使用 OutputCollector 的其中一個 emitDirect 方法。Bolt 可以通過 TopologyContext 來獲取它的下遊消費者的任務 id,也可以通過跟蹤 OutputCollector 的
emit
方法(該方法會返回它所發送元組的目標任務的 id)的數據來獲取任務 id。 - 本地或隨機分組(Local or shuffle grouping):如果在源組件的 worker 進程裏目標 Bolt 有一個或更多的任務線程,元組會被隨機分配到那些同進程的任務中。換句話說,這與隨機分組的方式具有相似的效果。
相關資料
- TopologyBuilder:使用此類構造拓撲
-
InputDeclarer:在
TopologyBuilder
中調用setBolt
方法時會返回這個對象的實例,通過該對象就可以定義 Bolt 的輸入數據流以及數據流的分組方式 - CoordinatedBolt:這個 Bolt 主要用於分布式 RPC 拓撲,其中大量使用了直接數據流與直接分組模型
可靠性(Reliability)
Storm 可以通過拓撲來確保每個發送的元組都能得到正確處理。通過跟蹤由 Spout 發出的每個元組構成的元組樹可以確定元組是否已經完成處理。每個拓撲都有一個“消息延時”參數,如果 Storm 在延時時間內沒有檢測到元組是否處理完成,就會將該元組標記為處理失敗,並會在稍後重新發送該元組。
為了充分利用 Storm 的可靠性機製,你必須在元組樹創建新結點的時候以及元組處理完成的時候通知 Storm。這個過程可以在 Bolt 發送元組時通過 OutputCollector 實現:在 emit
方法中實現元組的錨定(Anchoring),同時使用 ack
方法表明你已經完成了元組的處理。
關於可靠性保障的更多內容可以參考這篇文章:消息的可靠性處理。
任務(Tasks)
在 Storm 集群中每個 Spout 和 Bolt 都由若幹個任務(tasks)來執行。每個任務都與一個執行線程相對應。數據流分組可以決定如何由一組任務向另一組任務發送元組。你可以在 TopologyBuilder 的 setSpout
方法和 setBolt
方法中設置 Spout/Bolt 的並行度。
工作進程(Workers)
拓撲是在一個或多個工作進程(worker processes)中運行的。每個工作進程都是一個實際的 JVM 進程,並且執行拓撲的一個子集。例如,如果拓撲的並行度定義為300,工作進程數定義為50,那麼每個工作進程就會執行6個任務(進程內部的線程)。Storm 會在所有的 worker 中分散任務,以便實現集群的負載均衡。
相關資料
- Config.TOPOLOGY_WORKERS:這個配置項用於設置拓撲的工作進程數
1 Partial Key grouping 方式目前僅支持開發版,尚未加入 Storm 的正式發行版,不過可以通過
CustomStreamGrouping
間接實現該分組功能,具體的實現可以參考PartialKeyGrouping
源代碼。
最後更新:2017-05-22 15:03:08