閱讀140 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Apache Storm 官方文檔 —— Trident API 概述

Trident 的核心數據模型是“流”(Stream),不過與普通的拓撲不同的是,這裏的流是作為一連串 batch 來處理的。流是分布在集群中的不同節點上運行的,並且對流的操作也是在流的各個 partition 上並行運行的。

Trident 中有 5 類操作:

  1. 針對每個小分區(partition)的本地操作,這類操作不會產生網絡數據傳輸;
  2. 針對一個數據流的重新分區操作,這類操作不會改變數據流中的內容,但是會產生一定的網絡傳輸;
  3. 通過網絡數據傳輸進行的聚合操作;
  4. 針對數據流的分組操作;
  5. 融合與聯結操作。

本地分區操作

本地分區操作是在每個分區塊上獨立運行的操作,其中不涉及網絡數據傳輸。

函數

函數負責接收一個輸入域的集合並選擇輸出或者不輸出 tuple。輸出 tuple 的域會被添加到原始數據流的輸入域中。如果一個函數不輸出 tuple,那麼原始的輸入 tuple 就會被直接過濾掉。否則,每個輸出 tuple 都會複製一份輸入 tuple 。假設你有下麵這樣的函數:

public class MyFunction extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}

再假設你有一個名為 “mystream” 的數據流,這個流中包含下麵幾個 tuple,每個 tuple 中包含有 “a”、“b”、“c” 三個域:

[1, 2, 3]
[4, 1, 6]
[3, 0, 8]

如果你運行這段代碼:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

那麼最終輸出的結果 tuple 就會包含有 “a”、“b”、“c”、“d” 4 個域,就像下麵這樣:

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]

過濾器

過濾器負責判斷輸入的 tuple 是否需要保留。以下麵的過濾器為例:

public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

通過使用這段代碼:

mystream.each(new Fields("b", "a"), new MyFilter())

就可以將下麵這樣帶有 “a”、“b”、“c” 三個域的 tuple

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

最終轉化成這樣的結果 tuple:

[2, 1, 1]

partitionAggregate

partitionAggregate 會在一批 tuple 的每個分區上執行一個指定的功能操作。與上麵的函數不同,由 partitionAggregate發送出的 tuple 會將輸入 tuple 的域替換。以下麵這段代碼為例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假如輸入流中包含有 “a”、“b” 兩個域並且有以下幾個 tuple 塊:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

經過上麵的代碼之後,輸出就會變成帶有一個名為 “sum” 的域的數據流,其中的 tuple 就是這樣的:

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]

Storm 有三個用於定義聚合器的接口:CombinerAggregatorReducerAggregator 以及 Aggregator

這是 CombinerAggregator 接口:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

CombinerAggregator 會將帶有一個域的一個單獨的 tuple 返回作為輸出。CombinerAggregator 會在每個輸入 tuple 上運行初始化函數,然後使用組合函數來組合所有輸入的值。如果在某個分區中沒有 tuple, CombinerAggregator 就會輸出zero 方法的結果。例如,下麵是 Count 的實現代碼:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

如果你使用 aggregate 方法來代替 partitionAggregate 方法,你就會發現 CombinerAggregator 的好處了。在這種情況下,Trident 會在發送 tuple 之前通過分區聚合操作來優化計算過程。

ReducerAggregator 的接口實現是這樣的:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

ReducerAggregator 會使用 init 方法來產生一個初始化的值,然後使用該值對每個輸入 tuple 進行遍曆,並最終生成並輸出一個單獨的 tuple,這個 tuple 中就包含有我們需要的計算結果值。例如,下麵是將 Count 定義為 ReducerAggregator 的代碼:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

ReducerAggregator 同樣可以用於 persistentAggregate,你會在後麵看到這一點。

最常用的聚合器接口還是下麵的 Aggregator 接口:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregator 聚合器可以生成任意數量的 tuple,這些 tuple 也可以帶有任意數量的域。聚合器可以在執行過程中的任意一點輸出tuple,他們的執行過程是這樣的:

  1. 在處理一批數據之前先調用 init 方法。init 方法的返回值是一個代表著聚合狀態的對象,這個對象接下來會被傳入 aggregate 方法和 complete 方法中。
  2. 對於一個區塊中的每個 tuple 都會調用 aggregate 方法。這個方法能夠更新狀態並且有選擇地輸出 tuple。
  3. 在區塊中的所有 tuple 都被 aggregate 方法處理之後就會調用 complete 方法。

下麵是使用 Count 作為聚合器的代碼:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

有時你可能會需要同時執行多個聚合操作。這個過程叫做鏈式處理,可以使用下麵這樣的代碼來實現:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

這段代碼會在每個分區上分別執行 Count 和 Sum 聚合器,而輸出中隻會包含一個帶有 [“count”, “sum”] 域的單獨的 tuple。

stateQuery 與 partitionPersist

stateQuery 與 partitionPersist 會分別查詢、更新 state 數據源。你可以參考 Trident State 文檔 來了解如何使用它們。

projection

projection 方法隻會保留操作中指定的域。如果你有一個帶有 [“a”, “b”, “c”, “d”] 域的數據流,通過執行這段代碼:

mystream.project(new Fields("b", "d"))

就會使得輸出數據流中隻包含有 [“b”, “d”] 域。

重分區操作

重分區操作會執行一個用來改變在不同的任務間分配 tuple 的方式的函數。在重分區的過程中分區的數量也可能會發生變化(例如,重分區之後的並行度就有可能會增大)。重分區會產生一定的網絡數據傳輸。下麵是重分區操作的幾個函數:

  1. shuffle:通過隨機輪詢算法來重新分配目標區塊的所有 tuple。
  2. broadcast:每個 tuple 都會被複製到所有的目標區塊中。這個函數在 DRPC 中很有用 —— 比如,你可以使用這個函數來獲取每個區塊數據的查詢結果。
  3. partitionBy:該函數會接收一組域作為參數,並根據這些域來進行分區操作。可以通過對這些域進行哈希化,並對目標分區的數量取模的方法來選取目標區塊。partitionBy 函數能夠保證來自同一組域的結果總會被發送到相同的目標區間。
  4. global:這種方式下所有的 tuple 都會被發送到同一個目標分區中,而且數據流中的所有的塊都會由這個分區處理。
  5. batchGlobal:同一個 batch 塊中的所有 tuple 會被發送到同一個區塊中。當然,在數據流中的不同區塊仍然會分配到不同的區塊中。
  6. partition:這個函數使用自定義的分區方法,該方法會實現 backtype.storm.grouping.CustomStreamGrouping 接口。

聚類操作

Trident 使用 aggregate 方法和 persistentAggregate 方法來對數據流進行聚類操作。其中,aggregate 方法會分別對數據流中的每個 batch 進行處理,而 persistentAggregate 方法則會對數據流中的所有 batch 執行聚類處理,並將結果存入某個 state 中。

在數據流上執行 aggregate 方法會執行一個全局的聚類操作。在你使用 ReducerAggregator 或者 Aggregator 時,數據流首先會被重新分區成一個單獨的分區,然後聚類函數就會在該分區上執行操作。而在你使用 CombinerAggregator 時,Trident 首先會計算每個分區的部分聚類結果,然後將這些結果重分區到一個單獨的分區中,最後在網絡數據傳輸完成之後結束這個聚類過程。CombinerAggregator 比其他的聚合器的運行效率更高,在聚類時應該盡可能使用CombinerAggregator

下麵是一個使用 aggregate 來獲取一個 batch 的全局計數值的例子:

mystream.aggregate(new Count(), new Fields("count"))

與 partitionAggregate 一樣,aggregate 的聚合器也可以進行鏈式處理。然而,如果你在一個處理鏈中同時使用了CombinerAggregator 和非 CombinerAggregator,Trident 就不能對部分聚類操作進行優化了。

想要了解更多使用 persistentAggregate 的方法,可以參考 Trident State 文檔 一文。

對分組數據流的操作

通過對指定的域執行 partitionBy 操作,groupBy 操作可以將數據流進行重分區,使得相同的域的 tuple 分組可以聚集在一起。例如,下麵是一個 groupBy 操作的示例:

groupBy

如果你在分組數據流上執行聚合操作,聚合器會在每個分組(而不是整個區塊)上運行。persistentAggregate 同樣可以在一個分組數據裏上運行,這種情況下聚合結果會存儲在 MapState 中,其中的 key 就是分組的域名。

和其他操作一樣,對分組數據流的聚合操作也可以以鏈式的方式執行。

融合(Merge)與聯結(join)

Trident API 的最後一部分是聯結不同的數據流的操作。聯結數據流最簡單的方式就是將所有的數據流融合到一個流中。你可以使用 TridentTopology 的 merge 方法實現該操作,比如這樣:

topology.merge(stream1, stream2, stream3);

Trident 會將融合後的新數據流的域命名為為第一個數據流的輸出域。

聯結數據流的另外一種方法是使用 join。像 SQL 那樣的標準 join 操作隻能用於有限的輸入數據集,對於無限的數據集就沒有用武之地了。Trident 中的 join 隻會應用於每個從 spout 中輸出的小 batch。

下麵是兩個流的 join 操作的示例,其中一個流含有 [“key”, “val1″, “val2″] 域,另外一個流含有 [“x”, “val1″] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上麵的例子會使用 “key” 和 “x” 作為 join 的域來聯結 stream1 和 stream2。Trident 要求先定義好新流的輸出域,因為輸入流的域可能會覆蓋新流的域名。從 join 中輸出的 tuple 中會包含:

  1. join 域的列表。在這個例子裏,輸出的 “key” 域與 stream1 的 “key” 域以及 stream2 的 “x” 域對應。
  2. 來自所有流的非 join 域的列表。這個列表是按照傳入 join 方法的流的順序排列的。在這個例子裏,“ a” 和 “b” 域與 stream1 的 “val1” 和 “val2” 域對應;而 “c” 域則與 stream2 的 “val1” 域相對應。

在對不同的 spout 發送出的流進行 join 時,這些 spout 上會按照他們發送 batch 的方式進行同步處理。也就是說,一個處理中的 batch 中含有每個 spout 發送出的 tuple。

到這裏你大概仍然會對如何進行窗口 join 操作感到困惑。窗口操作(包括平滑窗口、滾動窗口等 —— 譯者注)主要是指將當前的 tuple 與過去若幹小時時間段內的 tuple 聯結起來的過程。

你可以使用 partitionPersist 和 stateQuery 來實現這個過程。過去一段時間內的 tuple 會以 join 域為關鍵字被保存到一個 state 源中。然後就可以使用 stateQuery 查詢 join 域來實現這個“聯結”(join)的過程。

最後更新:2017-05-22 14:33:26

  上一篇:go  每年14PB數據存儲需求,海量交通安全數據如何安放?
  下一篇:go  Samba 係列(七):在 Samba AD DC 服務器上創建共享目錄並映射到 Windows/Linux 客戶