Apache Storm 官方文檔 —— FAQ
Storm 最佳實踐
關於配置 Storm + Trident 的建議
- worker 的數量最好是服務器數量的倍數;topology 的總並發度(parallelism)最好是 worker 數量的倍數;Kafka 的分區數(partitions)最好是 Spout(特指
KafkaSpout
)並發度的倍數 - 在每個機器(supervisor)上每個拓撲應用隻配置一個 worker
- 在拓撲最開始運行的時候設置使用較少的大聚合器,並且最好是每個 worker 進程分配一個
- 使用獨立的調度器(scheduler)來分配任務(關於Scheduler 的知識請參考 xumingming 的博客 —— 譯者注)
- 在每個 worker 上隻配置使用一個 acker —— 這是 0.9.x 版本的默認特性,不過在早期版本中有所不同
- 在配置文件中開啟 GC 日誌記錄;如果一切正常,日誌中記錄的 major GC 應該會非常少
- 將 trident 的 batch interval 配置為你的集群的端到端時延的 50% 左右
- 開始時設置一個很小的
TOPOLOGY_MAX_SPOUT_PENDING
(對於 trident 可以設置為 1,對於一般的 topology 可以設置為 executor 的數量),然後逐漸增大,直到數據流不再發生變化。這時你可能會發現結果大約等於“2 × 吞吐率(每秒收到的消息數) × 端到端時延”
(最小的額定容量的2倍)。
如何避免 worker 總是出現莫名其妙的故障的問題
- 確保 Storm 對你的日誌目錄有寫權限
- 確保你的堆內存沒有溢出
- 確保所有的 worker 上都已經正確地安裝了所有的依賴庫文件
- 確保 ZooKeeper 的 hostname 不是簡單地設置為 “localhost”
- 確保集群中的每台機器上都配置好了正確、唯一的 hostname,並且這些 hostname 需要配置在所有機器的 Storm 配置文件中
- 確保 a) 不同的 worker 之間,b) 不同的 Storm 節點之間,c) Storm 與 ZooKeeper 集群之間, d) 各個 worker 與拓撲運行所需要的 Kafka/Kestrel/其他數據庫等 之間沒有開啟防火牆或者其他安全保護機製;如果有,請使用 netstat 來為各個端口之間的通信授權
Help!Storm 使用過程中無法獲取:
-
日誌文件:日誌文件默認記錄在
$STORM_HOME/logs
目錄中。請檢查你對該目錄是否有寫權限。具體的日誌配置信息位於 logback/cluster.xml 文件中(0.9 之前的版本需要在 log4j/*.properties 配置文件中進行配置。 -
最終輸出的 JVM 設置:需要在配置文件(storm.yaml)的
childopts
配置項中添加-XX+PrintFlagsFinal
命令選項。 -
最終輸出的 Java 係統屬性信息:需要在你構建拓撲的位置添加代碼
Properties props = System.getProperties(); props.list(System.out);
我應該使用多少個 worker?
worker 的完整數量是由 supervisor 配置的。每個 supervisor 會分配到一定數量的 JVM slot,你在拓撲中設置的 worker number 就是以這個 slot 數量為依據進行分配的。
不建議為每個拓撲在每台機器上分配超過一個 worker。
假如有一個運行於三台 8 核服務器節點的拓撲,它的並行度為24,每個 bolt 在每台機器上分配有 8 個 executor(即每個 CPU 核心分配一個)。這種場景下,使用三個 worker (每個 worker 分配 8 個executor)相對於使用更多的 worker (比如使用 24 個 worker,為每個 executor 分別分配一個)有三點好處:
首先,在 worker 內部將數據流重新分區到不同的 executor 的操作(比如 shuffle 或者 group-by)就不會產生觸發到傳輸 buffer 緩衝區,tuple 會直接從發送端轉儲到接收端的 buffer 區。這一點有很大的優勢。相反,如果目標 executor 是在同一台機器的不同 worker 進程內,tuple 就需要經曆“發送 -> worker 傳輸隊列 -> 本地 socket 端口 -> 接收端 worker -> 接收端 executor”這樣一個漫長的過程。雖然這個過程並不會產生網絡級傳輸,但是在同一台機器的不同進程間的傳輸損耗也是很可觀的。
其次,三個大的聚合器帶來的大的緩存空間比 24 個小聚合器帶來的小緩存空間要有用得多。因為這回降低數據傾斜造成的影響,同時提高 LRU 的性能。
最後,更少的 worker 可以有效地降低控製流的頻繁變動。
拓撲
Trident 拓撲支持多數據流嗎
Trident 拓撲可以設計成條件路徑(if-else)的工作流形式嗎?例如,bolt0 在接收 spout 的數據流時,可以根據輸入 tuple 的值來選擇將數據流發送到 bolt1 或者 bolt2,而不是同時向兩個 bolt 發送。
Trident 的 “each” 運算符可以返回一個數據流對象,你可以將該對象存儲在某個變量中,然後你可以對同一個數據流執行多個 each 操作來分解該數據流,如下述代碼所示:
Stream s = topology.each(...).groupBy(...).aggregate(...) Stream branch1 = s.each(..., FilterA) Stream branch2 = s.each(..., FilterB)
你可以使用 join、merge 或者 multiReduce 來聯結各個數據流。
到目前為止,Trident 暫時不支持輸出多個數據流。(詳見 STORM-68)
Spout
Coordinator 是什麼,為什麼會有很多 Coordinator?
Trident spout 實際上是通過 Storm 的 bolt 運行的。MasterBatchCoordinator
(MBC)封裝了 Trident 拓撲的 spout,它負責整合 Trident 中的 batch,這一點對於你所使用的任何類型的 spout 而言都是一樣的。Trident 的 batch 就是在 MBC 向各個 spout-coordinator 分發種子 tuple 的過程中生成的。Spout-coordinator bolt 知道你所定義的 spout 是如何互相協作的 —— 實際上,在使用 Kafka 的情況下,各個 spout 就是通過 spout-coordinator 來獲取 pull 消息所需要的 partition 和 offset 信息的。
在 spout 的 metadata 記錄中能夠存儲什麼信息?
隻能存儲少量靜態數據,而且是越少越好(盡管你確實可以向其中存儲更多的信息,不過我們不推薦這樣做)。
emitPartitionBatchNew
函數是多久調用一次的?
由於在 Trident 中 MBC 才是實際運行的 spout,一個 batch 中的所有 tuple 都是 MBC 生成的 tuple 樹的節點。也就是說,Storm 的 “max spout pending” 參數實際上定義的是可以並發運行的 batch 數量。MBC 在滿足以下兩個條件下會發送出一個新的 batch:首先,掛起的 tuple 數需要小於 “max pending” 參數;其次,距離上一個 batch 的發送已經過去了至少一個trident batch interval 的間隔時間。
如果沒有數據發送,Trident 會降低發送頻率嗎?
是的,Storm 中有一個可選的 “spout 等待策略”,默認配置是 sleep 一段指定的配置時間。
Trident batch interval 參數有什麼用?
你知道 486 時代的計算機上麵為什麼有個 trubo button 嗎?這個參數的作用和這個按鈕有點像。
實際上,trident batch interval 有兩個用處。首先,它可以用於減緩 spout 從遠程數據源獲取數據的速度,但這不會影響數據處理的效率。例如,對於一個從給定的 S3 存儲區中讀取批量上傳文件並按行發送數據的 spout,我們就不希望它經常觸發 S3 的閾值,因為文件要隔幾分鍾才會上傳一次,而且每個 batch 也需要花費一定的時間來執行。
另一個用處是限製啟動期間或者突發數據負載情況下內部消息隊列的負載壓力。如果 spout 突然活躍起來,並向係統中擠入了 10 個 batch 的記錄,那麼可能會有從 batch7 開始的大量不緊急的 tuple 堵塞住傳輸緩衝區,並且阻塞了從 batch3 中的 tuple(甚至可能包含 batch3 中的部分舊 tuple)的 commit 過程#。對於這種情況,我們的解決方法就是將 trident batch interval 設置為正常的端到端處理時延的一半左右 —— 也就是說如果需要花費 600 ms 的時間處理一個 batch,那麼就可以每 300 ms 處理一個 batch。
注意,這個 300 ms 僅僅是一個上限值,而不是額外增加的延時時間,如果你的 batch 需要花費 258 ms 來運行,那麼 Trident 就隻會延時等待 42 ms。
如何設置 batch 大小?
Trident 本身不會對 batch 進行限製。不過如果使用 Kafka 的相關 spout,那麼就可以使用 max fetch bytes 大小除以 平均 record 大小來計算每個子 batch 分區的有效 record 大小。
怎樣重新設置 batch 的大小?
Trident 的 batch 在某種意義上是一種過載的設施。batch 大小與 partition 的數量均受限於或者是可以用於定義#:
- 事務安全單元(一段時間內存在風險的 tuple);
- 相對於每個 partition,一個用於窗口數據流分析的有效窗口機製;
- 相對於每個 partition,使用 partitionQuery,partitionPersist 等命令時能夠同時進行的查詢操作數量;
- 相對於每個 partition,spout 能夠同時分配的 record 數量。
不能在 batch 生成之後更改 batch 的大小,不過可以通過 shuffle 操作以及修改並行度的方式來改變 partition 的數量。
時間相關問題
怎樣基於指定時間聚合數據
對於帶有固定時間戳的 records,如果需要對他們執行計數、求均值或者聚合操作,並將結果整合到離散的時間桶(time bucket)中,Trident 是一個很好的具有可擴展性的解決方案。
這種情況下可以寫一個 each
函數來將時間戳置入一個時間桶中:如果桶的大小是以“小時”為單位的,那麼時間戳 2013-08-08 12:34:56
就會被匹配到 2013-08-08 12:00:00
桶中,其他的 12 時到 13 時之間的時間也一樣。然後可以使用persistentAggregate
來對時間桶分組。persistentAggregate
會使用一個基於數據存儲的本地 cacheMap。這些包含有大量 records 的 group 會使用高效的批量讀取/寫入方式對數據存儲區進行操作,所以並不會對數據存儲區進行大量的讀操作;隻要你的數據傳送足夠快捷,Trident 就可以高效地使用內存與網絡。即使某台服務器宕機了一天,需要重新快速地發送一整天的數據,舊有的結果也可以靜默地獲取到並進行更新,並且這並不會影響當前結果的計算過程。
怎麼才能知道某個時間桶中已經收到了所有需要的 record?
很遺憾,你不會知道什麼時候所有的 event 都已經采集到了 —— 這是一個認識論問題,而不是一個分布式係統的問題。你可以:
- 使用域相關知識來設定時間限製。
- 引入標記機製:對於一個指定時間窗,確定某個 record 會處在所有的 record 的最後位置。Trident 使用這個機製來判斷一個 batch 是否結束。例如,你收到一組傳感器采集到的 records,每個傳感器都是按順序發送數據的,那麼一旦所有的傳感器都發送出一個 “3:02:xx” 的數據,你就可以知道可以開始處理這個時間窗了。
- 如果可以的話,盡量使你的處理過程增量化:每個新來的值都會使結果越來越準確。Trident ReducerAggregator 就是一個可以通過一個舊有的結果以及一組新數據來返回一個更新的結果的運算符。這使得結果可以被緩存並序列化到一個數據庫中;如果某台服務器宕機了一天,在恢複運行之後需要重新快速地發送一整天的數據,舊有的結果也可以靜默地獲取到並進行更新。
- 使用 Lambda 架構:將所有收到的事件數據歸檔到存儲區中(S3,HBase,HDFS)。在快速處理層,一旦時間窗複位,就對對應的時間桶進行處理來獲取有效結果,並且在處理過程中跳過所有比早於該時間窗的過期數據。定期地執行全局聚合操作就可以計算出一個較“正確”的結果。
附注
# 此處譯文可能不夠準確,有疑問的讀者請參考原文對應內容。
最後更新:2017-05-22 15:02:14