閱讀250 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Storm Trident 詳解

英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial

----------------

Trident是在storm基礎上,一個以realtime 計算為目標的高度抽象。 它在提供處理大吞吐量數據能力的同時,也提供了低延時分布式查詢和有狀態流式處理的能力。 如果你對Pig和Cascading這種高級批量處理工具很了解的話,那麼應該畢竟容易理解Trident,因為他們之間很多的概念和思想都是類似的。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此之外,Trident 還提供了一些專門的原語,從而在基於數據庫或者其他存儲的前提下來應付有狀態的遞增式處理。


舉例說明

讓我們一起來看一個Trident的例子。在這個例子中,我們主要做了兩件事情:

  1. 從一個流式輸入中讀取語句病計算每個單詞的個數
  2. 提供查詢給定單詞列表中每個單詞當前總數的功能
因為這隻是一個例子,我們會從如下這樣一個無限的輸入流中讀取語句作為輸入:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"),
spout.setCycle(true);

這個spout會循環輸出列出的那些語句到sentence stream當中,下麵的代碼會以這個stream作為輸入並計算每個單詞的個數:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

讓我們一起來讀一下這段代碼。我們首先創建了一個TridentTopology對象。TridentTopology類相應的接口來構造Trident計算過程中的所有內容。我們在調用了TridentTopology類的newStream方法時,傳入了一個spout對象,spout對象會從外部讀取數據並輸出到當前topology當中,從而在topology中創建了一個新的數據流。在這個例子中,我們使用了上麵定義的FixedBatchSpout對象。輸入數據源同樣也可以是如Kestrel或者Kafka這樣的隊列服務。Trident會再Zookeeper中保存一小部分狀態信息來追蹤數據的處理情況,而在代碼中我們指定的字符串“spout1”就是Zookeeper中用來存儲metadata信息的Znode節點

Trident在處理輸入stream的時候會把輸入轉換成若幹個tuple的batch來處理。比如說,輸入的sentence stream可能會被拆分成如下的batch:

Batched stream

一般來說,這些小的batch中的tuple可能會在數千或者數百萬這樣的數量級,這完全取決於你的輸入的吞吐量。

Trident提供了一係列非常成熟的批量處理的API來處理這些小batch. 這些API和你在Pig或者Cascading中看到的非常類似, 你可以做group by's, joins, aggregations, 運行 functions, 執行 filters等等。當然,獨立的處理每個小的batch並不是非常有趣的事情,所以Trident提供了很多功能來實現batch之間的聚合的結果並可以將這些聚合的結果存儲到內存,Memcached, Cassandra或者是一些其他的存儲中。同時,Trident還提供了非常好的功能來查詢實時狀態。這些實時狀態可以被Trident更新,同時它也可以是一個獨立的狀態源。

回到我們的這個例子中來,spout輸出了一個隻有單一字段“sentence”的數據流。在下一行,topology使用了Split函數來拆分stream中的每一個tuple,Split函數讀取輸入流中的“sentence”字段並將其拆分成若幹個word tuple。每一個sentence tuple可能會被轉換成多個word tuple,比如說"the cow jumped over the moon" 會被轉換成6個 "word" tuples. 下麵是Split的定義:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

如你所見,真的很簡單。它隻是簡單的根據空格拆分sentence,並將拆分出的每個單詞作為一個tuple輸出。

topology的其他部分計算單詞的個數並將計算結果保存到了持久存儲中。首先,word stream被根據“word”字段進行group操作,然後每一個group使用Count聚合器進行持久化聚合。persistentAggregate會幫助你把一個狀態源聚合的結果存儲或者更新到存儲當中。在這個例子中,單詞的數量被保持在內存中,不過我們可以很簡單的把這些數據保存到其他的存儲當中,如 Memcached, Cassandra等。如果我們要把結果存儲到Memcached中,隻是簡單的使用下麵這句話替換掉persistentAggregate就可以,這當中的"serverLocations"是Memcached cluster的主機和端口號列表:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
MemcachedState.transactional()

persistentAggregate存儲的數據就是所有batch聚合的結果。

Trident非常酷的一點就是它是完全容錯的,擁有者有且隻有一次處理的語義。這就讓你可以很輕鬆的使用Trident來進行實時數據處理。Trident會把狀態以某種形式保持起來,當有錯誤發生時,它會根據需要來恢複這些狀態。

persistentAggregate方法會把數據流轉換成一個TridentState對象。在這個例子當中,TridentState對象代表了所有的單詞的數量。我們會使用這個TridentState對象來實現在計算過程中的進行分布式查詢。

下麵這部分實現了一個低延時的單詞數量的分布式查詢。這個查詢以一個用空格分割的單詞列表為輸入,並返回這些單詞當天的個數。這些查詢是想普通的RPC調用那樣被執行的,要說不同的話,那就是他們在後台是並行執行的。下麵是執行查詢的一個例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

如你所見,除了這是並發執行在storm cluster上之外,這看上去就是一個正常的RPC調用。這樣的簡單查詢的延時通常在10毫秒左右。當然,更負責的DRPC調用可能會占用更長的時間,盡管延時很大程度上是取決於你給計算分配了多少資源。

這個分布式查詢的實現如下所示:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

我們仍然是使用TridentTopology對象來創建DRPC stream,並且我們將這個函數命名為“words”。這個函數名會作為第一個參數在使用DRPC Client來執行查詢的時候用到。

每個DRPC請求會被當做隻有一個tuple的batch來處理。在處理的過程中,以這個輸入的單一tuple來表示這個請求。這個tuple包含了一個叫做“args”的字段,在這個字段中保存了客戶端提供的查詢參數。在這個例子中,這個參數是一個以空格分割的單詞列表。

首先,我們使用Splict功能把入參拆分成獨立的單詞。然後對“word” 進行group by操作,之後就可以使用stateQuery來在上麵代碼中創建的TridentState對象上進行查詢。stateQuery接受一個數據源(在這個例子中,就是我們的topolgoy所計算的單詞的個數)以及一個用於查詢的函數作為輸入。在這個例子中,我們使用了MapGet函數來獲取每個單詞的出現個數。由於DRPC stream是使用跟TridentState完全同樣的group方式(按照“word”字段進行group),每個單詞的查詢會被路由到TridentState對象管理和更新這個單詞的分區去執行。

接下來,我們用FilterNull這個過濾器把從未出現過的單詞給去掉,並使用Sum這個聚合器將這些count累加起來。最終,Trident會自動把這個結果發送回等待的客戶端。

Trident在如何最大程度的保證執行topogloy性能方麵是非常智能的。在topology中會自動的發生兩件非常有意思的事情:

  1. 讀取和更新狀態的操作 (比如說 persistentAggregate 和 stateQuery) 會自動的是batch的形式操作狀態。 如果有20次更新需要被同步到存儲中,Trident會自動的把這些操作匯總到一起,隻做一次讀一次寫,而不是進行20次讀20次寫的操作。因此你可以在很方便的執行計算的同時,保證了非常好的性能。
  2. Trident的聚合器已經是被優化的非常好了的。Trident並不是簡單的把一個group中所有的tuples都發送到同一個機器上麵進行聚合,而是在發送之前已經進行過一次部分的聚合。打個比方,Count聚合器會先在每個partition上麵進行count,然後把每個分片count匯總到一起就得到了最終的count。這個技術其實就跟MapReduce裏麵的combiner是一個思想。

讓我們再來看一下Trident的另外一個例子。

Reach

下一個例子是一個純粹的DRPC topology。這個topology會計算一個給定URL的reach。那麼什麼事reach呢,這裏我們將reach定義為有多少個獨立用戶在Twitter上麵expose了一個給定的URL,那麼我們就把這個數量叫做這個URL的reach。要計算reach,你需要tweet過這個URL的所有人,然後找到所有follow這些人的人,並將這些follower去重,最後就得到了去重後的follower的數量。如果把計算reach的整個過程都放在一台機器上麵,就太勉強了,因為這會需要進行數千次數據庫調用以及上一次的tuple的讀取。如果使用Storm和Trident,你就可以把這些計算步驟在整個cluster中進行並發。

這個topology會讀取兩個state源。一個用來保存URL以及tweet這個URL的人的關係的數據庫。還有一個保持人和他的follower的關係的數據庫。topology的定義如下:

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
       .groupBy(new Fields("follower"))
       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

這個topology使用newStaticState方法創建了TridentState對象來代表一種外部存儲。使用這個TridentState對象,我們就可以在這個topology上麵進行動態查詢了。和所有的狀態源一樣,在數據庫上麵的查找會自動被批量執行,從而最大程度的提升效率。

這個topology的定義是非常直觀的 - 隻是一個簡單的批量處理job。首先,查詢urlToTweeters數據庫來得到tweet過這個URL的人員列表。這個查詢會返回一個列表,因此我們使用ExpandList函數來把每一個反悔的tweeter轉換成一個tuple。

接下來,我們來獲取每個tweeter的follower。我們使用shuffle來把要處理的tweeter分布到toplology運行的每一個worker中並發去處理。然後查詢follower數據庫從而的到每個tweeter的follower。你可以看到我們為topology的這部分分配了很大的並行度,這是因為這部分是整個topology中最耗資源的計算部分。

然後我們在follower上麵使用group by操作進行分組,並對每個組使用一個聚合器。這個聚合器隻是簡單的針對每個組輸出一個tuple “One”,再count “One” 從而的到不同的follower的數量。“One”聚合器的定義如下:

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }

   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }

   public Integer zero() {
       return 1;
   }        
}

這是一個"匯總聚合器", 它會在傳送結果到其他worker匯總之前進行局部匯總,從而來最大程度上提升性能。Sum也是一個匯總聚合器,因此以Sum作為topology的最終操作是非常高效的。

接下來讓我們一起來看看Trident的一些細節。

Fields and tuples

Trident的數據模型就是TridentTuple - 一個有名的值的列表。在一個topology中,tuple是在一係列的處理操作(operation)中增量生成的。operation一般以一組子彈作為輸入並輸出一組功能字段。Operation的輸入字段經常是輸入tuple的一個子集,而功能字段則是operation的輸出。

看一下如下這個例子。假定你有一個叫做“stream”的stream,它包含了“x”,"y"和"z"三個字段。為了運行一個讀取“y”作為輸入的過濾器MyFilter,你可以這樣寫:

stream.each(new Fields("y"), new MyFilter())

假定MyFilter的實現是這樣的:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}

這會保留所有“y”字段小於10的tuples。TridentTuple傳個MyFilter的輸入將隻有字段“y”。這裏需要注意的是,當選擇輸入字段時,Trident會自動投影tuple的一個子集,這個操作是非常高效的。

讓我們一起看一下“功能字段”是怎樣工作的。假定你有如下這個功能:

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}

這個函數接收兩個數作為輸入並輸出兩個新的值:“和”和“乘積”。假定你有一個stream,其中包含“x”,"y"和"z"三個字段。你可以這樣使用這個函數:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

輸出的功能字段被添加到輸入tuple中。因此這個時候,每個tuple中將會有5個字段"x", "y", "z", "added", 和 "multiplied". "added" 和"multiplied"對應於AddAndMultiply輸出的第一和第二個字段。

另外,我們可以使用聚合器來用輸出字段來替換輸入tuple。如果你有一個stream包含字段"val1"和"val2",你可以這樣做:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

output stream將會隻包含一個叫做“sum”的字段,這個sum字段就是“val2”的累積和。

在group之後的stream上,輸出將會是被group的字段以及聚合器輸出的字段。舉例如下:

stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

在這個例子中,輸出將包含字段"val1" 和 "sum".

State

在實時計算領域的一個主要問題就是怎麼樣來管理狀態並能輕鬆應對錯誤和重試。消除錯誤的影響是非常重要的,因為當一個節點死掉,或者一些其他的問題出現時,那行batch需要被重新處理。問題是-你怎樣做狀態更新來保證每一個消息被處理且隻被處理一次?

這是一個很棘手的問題,我們可以用接下來的例子進一步說明。假定你在做一個你的stream的計數聚合,並且你想要存儲運行時的count到一個數據庫中去。如果你隻是存儲這個count到數據庫中,並且想要進行一次更新,我們是沒有辦法知道同樣的狀態是不是以前已經被update過了的。這次更新可能在之前就嚐試過,並且已經成功的更新到了數據庫中,不過在後續的步驟中失敗了。還有可能是在上次更新數據庫的過程中失敗的,這些你都不知道。

Trident通過做下麵兩件事情來解決這個問題:

  1. 每一個batch被賦予一個唯一標識id“transaction id”。如果一個batch被重試,它將會擁有和之前同樣的transaction id
  2. 狀態更新是以batch為單位有序進行的。也就是說,batch 3的狀態更新必須等到batch 2的狀態更新成功之後才可以進行。

有了這2個原則,你就可以達到有且隻有一次更新的目標。你可以將transaction id和count一起以原子的方式存到數據庫中。當更新一個count的時候,需要判斷數據庫中當前batch的transaction id。如果跟要更新的transaction id一樣,就跳過這次更新。如果不同,就更新這個count。

當然,你不需要在topology中手動處理這些邏輯。這些邏輯已經被封裝在Stage的抽象中並自動進行。你的Stage object也不需要自己去實習transaction id的跟蹤操作。如果你想了解更多的關於如何實現一個Stage以及在容錯過程中的一些取舍問題,可以參照這篇文章.

一個Stage可以采用任何策略來存儲狀態。它可以存儲到一個外部的數據庫,也可以在內存中保持狀態並備份到HDFS中。Stage並不需要永久的保持狀態。比如說,你有一個內存版的Stage實現,它保存最近X個小時的數據並丟棄老的數據。可以把 Memcached integration 作為例子來看看State的實現.

Execution of Trident topologies

Trident的topology會被編譯成盡可能高效的Storm topology。隻有在需要對數據進行repartition的時候(如groupby或者shuffle)才會把tuple通過network發送出去,如果你有一個trident如下:

Compiling Trident to Storm 1

它將會被編譯成如下的storm topology:

Compiling Trident to Storm 2

Conclusion

Trident使得實時計算更加優雅。你已經看到了如何使用Trident的API來完成大吞吐量的流式計算,狀態維護,低延時查詢等等功能。Trident讓你在獲取最大性能的同時,以更自然的一種方式進行實時計算。

轉載自:https://blog.csdn.net/derekjiang/article/details/9126185 

最後更新:2017-04-03 05:40:00

  上一篇:go poj 2459 Feed Accounting
  下一篇:go 關於《Swift開發指南》背後的那些事