閱讀783 返回首頁    go 技術社區[雲棲]


Storm專題二:Storm Trident API 使用詳解

一、概述
     Storm Trident中的核心數據模型就是“Stream”,也就是說,Storm Trident處理的是Stream,但是實際上Stream是被成批處理的,Stream被切分成一個個的Batch分布到集群中,所有應用在Stream上的函數最終會應用到每個節點的Batch中,實現並行計算,具體如下圖所示:
     
在Trident中有五種操作類型:
  1. Apply Locally:本地操作,所有操作應用在本地節點數據上,不會產生網絡傳輸     
  2. Repartitioning:數據流重定向,單純的改變數據流向,不會改變數據內容,這部分會有網絡傳輸
  3. Aggragation:聚合操作,會有網絡傳輸
  4. Grouped streams上的操作
  5. Merge和Join
小結:上麵提到了Trident實際上是通過把函數應用到每個節點的Batch上的數據以實現並行,而應用的這些函數就是TridentAPI,下麵我們就具體介紹一下TridentAPI的各種操作。  

二、Trident五種操作詳解

2.1 Apply Locally本地操作:操作都應用在本地節點的Batch上,不會產生網絡傳輸

2.1.1 Functions:函數操作

     函數的作用是接收一個tuple(需指定接收tuple的哪個字段),輸出0個或多個tuples。輸出的新字段值會被追加到原始輸入tuple的後麵,如果一個function不輸出tuple,那就意味這這個tuple被過濾掉了,下麵舉例說明:
  • 定義一個Function:
    public class MyFunction extends BaseFunction {
      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
           for ( int i = 0; i < tuple.getInteger(0); i++) {
              collector.emit( new Values(i));
          }
     }
   }
     小結:Function實際上就是對經過Function函的tuple做一些操作以改變其內容。
  • 比如我們處理一個“mystream”的數據流,它有三個字段分別是[“a”, “b”, “c”] ,數據流中tuple的內容是:
     [1, 2, 3] [4, 1, 6] [3, 0, 8]
  • 我們運行我們的Function:  
 java mystream.each(new Fields("b"), new MyFunction(), new Fields("d")));
     它意思是接收輸入的每個tuple “b”字段得值,把函數結算結果做為新字段“d”追加到每個tuple後麵,然後發射出去。
  • 最終運行結果會是每個tuple有四個字段[“a”, “b”, “c”, “d”],每個tuple的內容變成了:
     [1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]
    小結:我們注意到,如果一個function發射多個tuple時,每個發射的新tuple中仍會保留原來老tuple的數據。

2.1.2 Filters:過濾操作
  • Filters很簡單,接收一個tuple並決定是否保留這個tuple。舉個例子,定義一個Filter:
 public class MyFilter extends BaseFilter {
     public boolean isKeep(TridentTuple tuple) {
           return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
     }
   }
  • 假設我們的tuples有這個幾個字段 [“a”, “b”, “c”]: 
     [1, 2, 3] [2, 1, 1] [2, 3, 4]
  • 然後運行我們的Filter:
 java mystream.each(new Fields("b", "a"), new MyFilter());
  • 則最終得到的tuple是 :
     [2, 1, 1]

     說明第一個和第三個不滿足條件,都被過濾掉了。

     小結:Filter就是一個過濾器,它決定是否需要保留當前tuple。

2.1.3 PartitionAggregate
    PartitionAggregate的作用對每個Partition中的tuple進行聚合,與前麵的函數在原tuple後麵追加數據不同,PartitionAggregate的輸出會直接替換掉輸入的tuple,僅數據PartitionAggregate中發射的tuple。下麵舉例說明:
  • 定義一個累加的PartitionAggregate:
java mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));
  • 假設我們的Stream包含兩個字段 [“a”, “b”],各個Partition的tuple內容是:
     ``` Partition 0: [“a”, 1] [“b”, 2]

     Partition 1: [“a”, 3] [“c”, 8]

     Partition 2: [“e”, 1] [“d”, 9] [“d”, 10] ```
  • 輸出的內容隻有一個字段“sum”,值是:
     ``` Partition 0: [3]

     Partition 1: [11]

     Partition 2: [20] ```

    TridentAPI提供了三個聚合器的接口:CombinerAggregator, ReducerAggregator, and Aggregator.

我們先看一下CombinerAggregator接口:   
 public interface CombinerAggregator <T> extends Serializable {
         T init(TridentTuple tuple);
         T combine(T val1, T val2);
         T zero();
    }
    CombinerAggregator接口隻返回一個tuple,並且這個tuple也隻包含一個field。init方法會先執行,它負責預處理每一個接收到的tuple,然後再執行combine函數來計算收到的tuples直到最後一個tuple到達,當所有tuple處理完時,CombinerAggregator會發射zero函數的輸出,舉個例子:
  • 定義一個CombinerAggregator實現來計數:  
 public class CombinerCount implements CombinerAggregator<Integer>{
     @Override
     public Integer init(TridentTuple tuple) {
           return 1;
     }
     @Override
     public Integer combine(Integer val1, Integer val2) {
          
           return val1 + val2;
     }
     @Override
     public Integer zero() {
           return 0;
     }
   }
     小結:當你使用aggregate 方法代替PartitionAggregate時,CombinerAggregator的好處就體現出來了,因為Trident會自動優化計算,在網絡傳輸tuples之前做局部聚合。

我們再看一下ReducerAggregator:
 public interface ReducerAggregator <T> extends Serializable {
         T init();
         T reduce(T curr, TridentTuple tuple);
     }
     ReducerAggregator通過init方法提供一個初始值,然後為每個輸入的tuple迭代這個值,最後生產處一個唯一的tuple輸出,下麵舉例說明:
  • 定義一個ReducerAggregator接口實現技術器的例子:
 public class ReducerCount implements ReducerAggregator<Long>{
     @Override
     public Long init() {
           return 0L;
     }
     @Override
     public Long reduce(Long curr, TridentTuple tuple) {
           return curr + 1;
     }
 }
最後一個是Aggregator接口,它是最通用的聚合器,它的形式如下:
  
  public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T val, TridentTuple tuple, TridentCollector collector);
        void complete(T val, TridentCollector collector);
   }
    Aggregator接口可以發射含任意數量屬性的任意數據量的tuples,並且可以在執行過程中的任何時候發射:
  1. init:在處理數據之前被調用,它的返回值會作為一個狀態值傳遞給aggregate和complete方法
  2. aggregate:用來處理每一個輸入的tuple,它可以更新狀態值也可以發射tuple
  3. complete:當所有tuple都被處理完成後被調用     
    下麵舉例說明:
  • 定義一個實現來完成一個計數器:

   public class CountAgg extends BaseAggregator<CountState>{
     static class CountState { long count = 0; }
     @Override
     public CountState init(Object batchId, TridentCollector collector) {
           return new CountState();
     }
     @Override
     public void aggregate(CountState val, TridentTuple tuple, TridentCollector collector) {
          val. count+=1;
     }
     @Override
     public void complete(CountState val, TridentCollector collector) {
          collector.emit( new Values(val. count));
     }
  }
    有時候我們需要同時執行多個聚合器,這在Trident中被稱作chaining,使用方法如下:

java mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd();
    這點代碼會在每個Partition上運行count和sum函數,最終輸出一個tuple:[“count”, “sum”]
projection:投影操作
     投影操作作用是僅保留Stream指定字段的數據,比如有一個Stream包含如下字段: [“a”, “b”, “c”, “d”]
     運行如下代碼:   

java mystream.project(new Fields("b", "d"))
    則輸出的流僅包含 [“b”, “d”]字段。
2.2 Repartitioning重定向操作
     重定向操作是如何在各個任務間對tuples進行分區。分區的數量也有可能改變重定向的結果。重定向需要網絡傳輸,下麵介紹下重定向函數:
  1. shuffle:通過隨機分配算法來均衡tuple到各個分區
  2. broadcast:每個tuple都被廣播到所有的分區,這種方式在drcp時非常有用,比如在每個分區上做stateQuery
  3. partitionBy:根據指定的字段列表進行劃分,具體做法是用指定字段列表的hash值對分區個數做取模運算,確保相同字段列表的數據被劃分到同一個分區
  4. global:所有的tuple都被發送到一個分區,這個分區用來處理整個Stream
  5. batchGlobal:一個Batch中的所有tuple都被發送到同一個分區,不同的Batch會去往不同的分區
  6. Partition:通過一個自定義的分區函數來進行分區,這個自定義函數實現了 backtype.storm.grouping.CustomStreamGrouping
2.3 Aggragation聚合操作
     Trident有aggregate和 persistentAggregate方法來做聚合操作。aggregate是獨立的運行在Stream的每個Batch上的,而persistentAggregate則是運行在Stream的所有Batch上並把運算結果存儲在state source中。
     運行aggregate方法做全局聚合。當你用到  ReducerAggregator或Aggregator時,Stream首先被重定向到一個分區中,然後其中的聚合函數便在這個分區上運行。當你用到CombinerAggregator時,Trident會首先在每個分區上做局部聚合,然後把局部聚合後的結果重定向到一個分區,因此使用CombinerAggregator會更高效,可能的話我們需要優先考慮使用它。
     下麵舉個例子來說明如何用aggregate進行全局計數:
java mystream.aggregate(new Count(), new Fields("count"));
和paritionAggregate一樣,aggregators的聚合也可以串聯起來,但是如果你把一個 CombinerAggregator和一個非CombinerAggregator串聯在一起,Trident是無法完成局部聚合優化的。
2.4 grouped streams
      GroupBy操作是根據特定的字段對流進行重定向的,還有,在一個分區內部,每個相同字段的tuple也會被Group到一起,下麵這幅圖描述了這個場景:
     如果你在grouped Stream上麵運行aggregators,聚合操作會運行在每個Group中而不是整個Batch。persistentAggregate也能運行在GroupedSteam上,不過結果會被保存在MapState中,其中的key便是分組的字段。
     當然,aggregators在GroupedStreams上也可以串聯。
2.5 Merge和Joins:
api的最後一部分便是如何把各種流匯聚到一起。最簡單的方式就是把這些流匯聚成一個流。我們可以這麼做:   
java topology.merge(stream1, stream2, stream3);
另一種合並流的方式就是join。一個標準的join就像是一個sql,必須有標準的輸入,因此,join隻針對符合條件的Stream。join應用在來自Spout的每一個小Batch中。join時候的tuple會包含:  
 1. join的字段,如Stream1中的key和Stream2中的x    
 2. 所有非join的字段,根據傳入join方法的順序,a和b分別代表steam1的val1和val2,c代表Stream2的val1          
     當join的是來源於不同Spout的stream時,這些Spout在發射數據時需要同步,一個Batch所包含的tuple會來自各個Spout。    


關於Trident State 相關內容請參考【Trident State詳解】 
關於TridentAPI實踐方麵,請參考【Trident API實踐】

最後更新:2017-04-03 05:40:01

  上一篇:go 我們的首要之務,並不是遙望模煳的遠方,而是專心處理眼前的事務
  下一篇:go 程序員生存定律-打造屬於自己的稀缺性