閱讀386 返回首頁    go 微軟 go Office


Apache Storm 官方文檔 —— 常用模式

本文列出了 Storm 拓撲中使用的一些常見模式,包括:

  1. 數據流的 join
  2. 批處理
  3. BasicBolt
  4. 內存緩存與域分組的結合
  5. Top N 流式計算
  6. TimeCacheMap
  7. CoordinatedBolt 與 KeyedFairBolt

Joins

數據流的 join 一般指的是通過共有的域來聚合兩個或多個數據流的過程。與一般的數據庫中 join 操作要求有限的輸入與清晰的語義不同,數據流 join 的輸入往往是無限的數據集,而且並不具備明確的語義。

join 的類型一般是由應用的需求決定的。有些應用需要將兩個流在某個固定時間內的所有 tuple 進行 join,另外一些應用卻可能要求對每個 join 域的 join 操作過程的兩側隻保留一個 tuple,而其他的應用也許還有一些其他需求。不過這些 join 類型一般都會有一個基本的模式,那就是將多個輸入流進行分區。Storm 可以很容易地使用域分組的方法將多個輸入流聚集到一個聯結 bolt 中,比如下麵這樣:

builder.setBolt("join", new MyJoiner(), parallelism)
  .fieldsGrouping("1", new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping("2", new Fields("joinfield1", "joinfield2"))
  .fieldsGrouping("3", new Fields("joinfield1", "joinfield2"));

當然,上麵的代碼隻是個例子,實際上不同的流完全可以具有不同的輸入域。

批處理

通常由於效率或者其他方麵的原因,你需要使用將 tuple 們組合成 batch 來處理,而不是一個個分別處理它們。比如,在做數據庫更新操作或者流聚合操作時,你就會需要這樣的批處理形式。

要確保數據處理的可靠性,正確的方式是在 bolt 進行批處理之前將 tuple 們緩存在一個實例變量中。在完成批處理操作之後,你就可以一起 ack 所有的緩存的 tuple 了。

如果這個批處理 bolt 還需要繼續向下遊發送 tuple,你可能還需要使用多錨定(multi-anchoring)來確保可靠性。具體怎麼做取決於應用的需求。想要了解更多關於可靠性的工作機製的內容請參考消息的可靠性保障一文。

BasicBolt

Bolt 處理 tuple 的一種基本模式是在 execute 方法中讀取輸入 tuple、發送出基於輸入 tuple 的新 tuple,然後在方法末尾對 tuple 進行應答(ack)。符合這種模式的 bolt 一般是一種函數或者過濾器。對於這種基本的處理模式,Storm 提供了IBasicBolt 接口來自動實現這個過程。更多內容請參考消息的可靠性保障一文。

內存緩存與域分組的結合

在 Storm 的 bolt 中保存一定的緩存也是一種比較常見的方式。尤其是在於域分組結合的時候,緩存的作用特別顯著。例如,假如你有一個用於將短鏈接(short URLs,例如 bit.ly, t.co,等等)轉化成長鏈接(龍 URLs)的 bolt。你可以通過一個將短鏈接映射到長鏈接的 LRU 緩存來提高係統的性能,避免反複的 HTTP 請求操作。假如現在有一個名為 “urls” 的組件用於發送短鏈接,另外有一個 “expand” 組件用於將短鏈接擴展為長鏈接,並且在 “expand” 內部保留一個緩存。讓我們來看看下麵兩段代碼有什麼不同:

builder.setBolt("expand", new ExpandUrl(), parallelism)
  .shuffleGrouping(1);
builder.setBolt("expand", new ExpandUrl(), parallelism)
  .fieldsGrouping("urls", new Fields("url"));

由於域分組可以使得相同的 URL 永遠被發往同一個 task,第二段代碼會比第一段代碼高效得多。這樣可以避免在不同的 task 的緩存中的複製動作,並且看上去短 URL 可以更好地在命中緩存。

Top N

Storm 中一種常見的連續計算模式是計算數據流中某種形式的 Top N 結果。假如現在有一個可以以 [“value”, “count”] 的形式發送 tuple 的 bolt,並且你需要一個可以根據 count 計算結果輸出前 N 個 tuple 的 bolt。實現這個操作的最簡單的方法就是使用一個對數據流進行全局分組的 bolt,並且在內存中維護一個包含 top N 結果的列表。

這種方法並不適用於大規模數據流,因為整個數據流都會發往同一個 task,會造成該 task 的內存負載過高。更好的做法是將數據流分區,同時對每個分區計算 top N 結果,然後將這些結果匯總來得到最終的全局 top N 結果。下麵是這個模式的代碼:

builder.setBolt("rank", new RankObjects(), parallelism)
  .fieldsGrouping("objects", new Fields("value"));
builder.setBolt("merge", new MergeObjects())
  .globalGrouping("rank");

這個方法之所以可行是因為第一個 bolt 的域分組操作確保了每個小分區在語義上的正確性。你可以在 storm-starter 裏看到使用這個模式的一個例子。

當然,如果待處理的數據集存在較嚴重的數據傾斜,那麼還是應該使用 partialKeyGrouping 來代替 fieldsGrouping,因為 partialKeyGrouping 可以通過兩個下遊 bolt 分散每個 key 的負載。

builder.setBolt("count", new CountObjects(), parallelism)
  .partialKeyGrouping("objects", new Fields("value"));
builder.setBolt("rank" new AggregateCountsAndRank(), parallelism)
  .fieldsGrouping("count", new Fields("key"))
builder.setBolt("merge", new MergeRanksObjects())
  .globalGrouping("rank");

這個拓撲中需要一個中間層來聚合來自上遊 bolt 數據流的分區計數結果,但這一層僅僅會做一個簡單的聚合處理,這樣 bolt 就不會受到由於數據傾斜帶來的負載壓力。你可以在 storm-starter 中看到使用這個模式的一個例子。

支持 LRU 的 TimeCacheMap

有時候你可能會需要一個能夠保留“活躍的”數據並且能夠使得超時的“非活躍的”數據自動失效的緩存。TimeCacheMap 是一個可以高效地實現此功能的數據結構。它還提供了一個鉤子用於實現在數據失效後的回調操作。

用於分布式 RPC 的 CoordinatedBolt 與 KeyedFairBolt

在構建 Storm 上層的分布式 RPC 應用時,通常會用到兩種常用的模式。現在這兩種模式已經被封裝為 CoordinatedBolt 和KeyedFairBolt,並且已經加入了 Storm 標準庫中。

CoordinatedBolt 將你的處理邏輯 bolt 包裝起來,並且在你的 bolt 收到了指定請求的所有 tuple 之後發出通知。CoordinatedBolt 中大量使用了直接數據流組來實現此功能。

KeyedFairBolt 同樣包裝了你的處理邏輯 bolt,並且可以讓你的拓撲同時處理多個 DRPC 調用,而不是每次隻執行一個。

如果需要了解更多內容請參考分布式 RPC一文。

最後更新:2017-05-22 14:01:37

  上一篇:go  Apache Storm 官方文檔 —— 配置開發環境
  下一篇:go  Apache Storm 官方文檔 —— Storm 與 Kestrel