閱讀146 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Storm可靠性及事務性相關設計: Acker及Trident State

Storm可靠性相關


Storm可靠性的設計與它的Acker有很大關係,先讓我用比較拙劣的語句簡單描述下。

Storm的tuple,被OutputCollector emit的時候——這個稱為archoring(生成新的tuples),需要指定和它相關的之前的tuple,並且要指定executor完之後ack之類的api,這樣就能建立一顆可追蹤的tuple樹。如:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;
        
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }
上麵這件事一般IBasicBolt可以罩住,更多的方法可以使用IRichBolt。
一個topology裏麵的acker數量是可以設置的,然後tuple比較多的話可以多設置幾個acker,提高效率。每個tuple有一個64位的id,acker利用這個id來追蹤tuple,且會知道這個tuple他的祖宗們,也就是隻要繼續跟蹤新的tuple就可以了,因為祖宗的id會被傳遞下去。
storm用一致性哈希來把spout-tuple-id對應給acker,因為tuple知道自己的祖宗,所以他可以算出通知哪個acker來ack(所有的根tuple是知道的,hash好了之後,以後的子tuple去對應的地方ack)。acker會維護tuple樹上的各個tuple,當他知道這個樹完成處理了,就會通知某個對應的task。
acker task不顯示跟蹤整個tuple樹,不然會占據很多內存,acker使用一個恒定的20 bytes來針對每個spout tuple。一個acker存一個spout-tuple-id的時候,存兩個值:一個是task id,用於關聯task;第二個是 ack val,一個64位的數。ack val是整個樹狀態的一個表示,把所有的tuple id異或起來,當ack val=0,就知道整棵tuple樹成功完成了,否則失敗,然後就可以通知task了。
在以上的可靠性之下,如果:

  1.  task fail了,tuple沒有被ack。超時機製保證這個tuple以後再被重新處理
  2. Acker掛 。這個acker跟蹤的tuple都超時,都會重新處理
  3. Spout掛了。消息源重新發送消息。
所以,storm的可靠性機製是完全分布式的,可伸縮的並且高度容錯的。

以上內容可以具體參考wiki:Guaranteeing-message-processing

Acker更多設計可以參考: twitter-storm-code-analysis-acker-merchanism



Storm事務性相關


State in Trident

Trident在讀寫有狀態的數據源方麵是有著一流的抽象封裝的。狀態即可以保留在topology的內部,比如說內存和HDFS,也可以放到外部存儲當中,比如說Memcached或者Cassandra。這些都是使用同一套Trident API。

Trident以一種容錯的方式來管理狀態以至於當你在更新狀態的時候你不需要去考慮錯誤以及重試的情況。這種保證每個消息被處理有且隻有一次的原理會讓你更放心的使用Trident的topology。

在進行狀態更新時,會有不同的容錯級別。在外麵一起來討論這點之前,讓我們先通過一個例子來說明一下如果想要坐到有且隻有一次處理的必要的技巧。假定你在做一個關於某stream的計數聚合器,你想要把運行中的計數存放到一個數據庫中。如果你在數據庫中存了一個值表示這個計數,每次你處理一個tuple之後,就將數據庫存儲的計數加一。

當錯誤發生,truple會被重播。這就帶來了一個問題:當狀態更新的時候,你完全不知道你是不是在之前已經成功處理過這個tuple。也許你之前從來沒處理過這個tuple,這樣的話你就應該把count加一。另外一種可能就是你之前是成功處理過這個tuple的,但是這個在其他的步驟處理這個tuple的時候失敗了,在這種情況下,我們就不應該將count加一。再或者,你接受到過這個tuple,但是上次處理這個tuple的時候在更新數據庫的時候失敗了,這種情況你就應該去更新數據庫。

如果隻是簡單的存計數到數據庫的話,你是完全不知道這個tuple之前是否已經被處理過了的。所以你需要更多的信息來做正確的決定。Trident提供了下麵的語義來實現有且隻有一次被處理的目標。

  • Tuples 是被分成小的集合被批量處理的 (see the tutorial)
  • 每一批tuples被給定一個唯一ID作為事務ID (txid). 當這一批tuple被重播時, txid不變.
  • 批與批之間的狀態更新時嚴格順序的。比如說第三批tuple的狀態的更新必須要等到第二批tuple的狀態更新成功之後才可以進行.
有了這些定義,你的狀態實現可以檢測到當前這批tuple是否以前處理過,並根據不同的情況進行不同的處理。你需要才去的行動取決於你的輸入spout。有三種不同類型的可以容錯的spout: 非事務的,事務的,以及不透明事務的spout。對應的,也有3種容錯的狀態:非事務的,事務的,以及不透明事務的狀態。讓我們一起來看看每一種spout類型能夠支持什麼樣的容錯類型。


Transactional spouts

記住,Trident是以小批量(batch)的形式在處理tuple,並且每一批都會分配一個唯一的transaction id。 不同的spout會根據他們可以給予不同的批量tuple的guarantee的能力有不同的屬性。一個transactional spout會有如下這些屬性:
  1. 有著同樣txid的batch一定是一樣的。當重播一個txid對應的batch時,一定會重播和之前對應txid的batch中同樣的tuples。
  2. 各個batch之間是沒有交集的。每個tuple隻能屬於一個batch
  3. 每一個tuple都屬於一個batch,無一例外
這是一類非常容易理解的spout, tuple 流被劃分為固定的batch並且永不改變。trident-kafka 有一個transactional spout的實現。

你也許會問:為什麼我們不總是使用transactional spout?這很容易理解。一個原因是並不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txid的batch所包含的一個屬於一個topic的來自於所有Kafka partition的tuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足 transactional spout的語義。現在我們假定一個batch被TransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,並且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。

這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對於丟失源節點這種情況是容錯的,仍然能夠幫你達到有且隻有一次處理的語義。後麵會對這種spout有所介紹。

(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的)

在外麵來討論"opaque transactional" spout之前,我們先來看看你應該怎樣設計一個State來實現transactional spout的有且隻有一次執行的語義。這個State的類型是"transactional state" 並且它利用了任何一個txid總是對應同樣的tuple序列這個語義。

假如說你有一個用來計算單詞出現次數的topology,你想要將單詞的出現次數以key/value對的形式存儲到數據庫中。key就是單詞,value就是這個這個單詞出現的次數。你已經看到隻是存儲一個數量是不足以知道你是否已經處理過一個batch的。你可以通過將value和txid一起存儲到數據庫中。這樣的話,當更新這個count之前,你可以先去比較數據庫中存儲的txid和現在要存儲的txid。如果一樣,就跳過什麼都不做,因為這個value之前已經被處理過了。如果不一樣,就執行存儲。這個邏輯可以工作的前提就是txid永不改變,並且Trident保證狀態的更新是在batch之間嚴格順序進行的。

考慮下麵這個例子的運行邏輯, 假定你在處理一個txid為3的包含下麵tuple的batch:
["man"]  
["man"]  
["dog"] 
 假定數據庫中當前保存了下麵這樣的key/value 對:
man => [count=3, txid=1]  

dog => [count=4, txid=3]  

apple => [count=10, txid=2]  
單詞“man”對應的txid是1. 因為當前的txid是3,你可以確定你還沒有為這個batch中的tuple更新過這個單詞的數量。所以你可以放心的給count加2並更新txid為3. 與此同時,單詞“dog”的txid和當前的txid是相同的,因此你可以跳過這次更新。此時數據庫中的數據如下:

man => [count=5, txid=3]  

dog => [count=4, txid=3]  

apple => [count=10, txid=2] 
接下來我們一起再來看看 opaque transactional spout已經怎樣去為這種spout設計相應的state。 

Opaque transactional spouts

#xhe_tmpurl正如之前說過的,opaque transactional spout並不能確保一個txid所對應的batch的一致性。一個opaque transactional spout有如下屬性:
  • 每個tuple隻在一個batch中被成功處理。然而,一個tuple在一個batch中被處理失敗後,有可能會在另外的一個batch中被成功處理
OpaqueTridentKafkaSpout 是一個擁有這種屬性的spout,並且它是容錯的,即使Kafak的節點丟失。當OpaqueTridentKafkaSpout 發送一個batch的時候, 它會從上個batch成功結束發送的位置開始發送一個tuple序列。這就確保了永遠沒有任何一個tuple會被跳過或者被放在多個batch中被多次成功處理的情況.
使用opaque transactional spout,再使用和transactional spout相同的處理方式:判斷數據庫中存放的txid和當前txid去做對比已經不好用了。這是因為在state的更新過程之間,batch可能已經變了。
你隻能在數據庫中存儲更多的信息。除了value和txid,你還需要存儲之前的數值在數據庫中。讓我們還是用上麵的例子來說明這個邏輯。假定你當前batch中的對應count是“2”, 並且我們需要進行一次狀態更新。而當前數據庫中存儲的信息如下:
{ 
  value = 4,  

  prevValue = 1,  

  txid = 2  
} 
如果你當前的txid是3, 和數據庫中的txid不同。那麼就將value中的值設置到prevValue中,根據你當前的count增加value的值並更新txid。更新後的數據庫信息如下:
{ 
  value = 6,  

  prevValue = 4,  

  txid = 3  
}  
現在外麵再假定你的當前txid是2,和數據庫中存放的txid相同。這就說明數據庫裏麵value中的值包含了之前一個和當前txid相同的batch的更新。但是上一個batch和當前這個batch可能已經完全不同了,以至於我們需要無視它。在這種情況下,你需要在prevValue的基礎上加上當前count的值並將結果存放到value中去。數據庫中的信息如下所示:
{ 
  value = 3,  

  prevValue = 1,  

  txid = 2  
} 
因為Trident保證了batch之間的強順序性,因此這種方法是有效的。一旦Trident去處理一個新的batch,它就不會重新回到之前的任何一個batch。並且由於opaque transactional spout確保在各個batch之間是沒有共同成員的,每個tuple隻會在一個batch中被成功處理,你可以安全的在之前的值上進心更新。 

Non-transactional spouts

Non-transactional spout(非事務spout)不確保每個batch中的tuple的規則。所以他可能是最多被處理一次的,如果tuple被處理失敗就不重發的話。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實現有且隻有一次被成功處理的語義的。

Summary of spout and state types

Opaque transactional state有著最為強大的容錯性。但是這是以存儲更多的信息作為代價的。Transactional states 需要存儲較少的狀態信息,但是僅能和 transactional spouts協同工作. Finally, non-transactional state所需要存儲的信息最少,但是卻不能實現有且隻有一次被成功處理的語義。

State和Spout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更適合你。

State APIs

你已經看到一些錯綜複雜的方法來實現有且隻有一次被執行的語義。Trident這樣做的好處把所有容錯想過的邏輯都放在了State裏麵。 作為一個用戶,你並不需要自己去處理複雜的txid,存儲多餘的信息到數據庫中,或者是任何其他類似的事情。你隻需要寫如下這樣簡單的code:
TridentTopology topology = new TridentTopology();          

TridentState wordCounts =  

      topology.newStream("spout1", spout)  

        .each(new Fields("sentence"), new Split(), new Fields("word"))  

        .groupBy(new Fields("word"))  

        .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))                  

        .parallelismHint(6);  
所有管理opaque transactional state所需的邏輯都在MemcachedState.opaque方法的調用中被涵蓋了,除此之外,數據庫的更新會自動以batch的形式來進行以避免多次訪問數據庫。
State的基本接口隻包含下麵兩個方法:

public interface State {  

    void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream  

    void commit(Long txid);  

}  
當一個State更新開始時,以及當一個State更新結束時你都會被告知,並且會告訴你該次的txid。Trident並沒有對你的state的工作方式有任何的假定。
假定你自己搭了一套數據庫來存儲用戶位置信息,並且你想要在Trident中去訪問這個數據。你的state的實現應該有用戶信息的set、get方法
public class LocationDB implements State {  

    public void beginCommit(Long txid) {      

    }  

    public void commit(Long txid) {      

    }
      
    public void setLocation(long userId, String location) {  

      // code to access database and set location  

    }  

    public String getLocation(long userId) {  

      // code to get location from database  

    }  
}  
然後你還需要提供給Trident一個StateFactory來在Trident的task中創建你的State對象。LocationDB 的 StateFactory可能會如下所示:
public class LocationDBFactory implements StateFactory {  
   public State makeState(Map conf, int partitionIndex, int numPartitions) {  
      return new LocationDB();  
   }   
}  
Trident提供了一個QueryFunction接口用來實現Trident中在一個source state上查詢的功能。同時還提供了一個StateUpdater來實現Trident中更新source state的功能。比如說,讓我們寫一個查詢地址的操作,這個操作會查詢LocationDB來找到用戶的地址。讓我們以怎樣在topology中實現該功能開始,假定這個topology會接受一個用戶id作為輸入數據流。

TridentTopology topology = new TridentTopology();  

TridentState locations = topology.newStaticState(new LocationDBFactory());  

topology.newStream("myspout", spout)  
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))  
接下來讓我們一起來看看QueryLocation 的實現應該是什麼樣的:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {  

    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {  

        List<String> ret = new ArrayList();  

        for(TridentTuple input: inputs) {  

            ret.add(state.getLocation(input.getLong(0)));  

        }  
        return ret;  
    }  

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {  
        collector.emit(new Values(location));  
    }      
}  
QueryFunction的執行分為兩部分。首先Trident收集了一個batch的read操作並把他們統一交給batchRetrieve。在這個例子中,batchRetrieve會接受到多個用戶id。batchRetrieve應該返還一個和輸入tuple數量相同的result序列。result序列中的第一個元素對應著第一個輸入tuple的結果,result序列中的第二個元素對應著第二個輸入tuple的結果,以此類推。
你可以看到,這段代碼並沒有想Trident那樣很好的利用batch的優勢,而是為每個輸入tuple去查詢了一次LocationDB。所以一種更好的操作LocationDB方式應該是這樣的:
public class LocationDB implements State {  

    public void beginCommit(Long txid) {     
 
    }  

    public void commit(Long txid) {      

    }  

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {  

      // set locations in bulk  

    }  
      
    public List<String> bulkGetLocations(List<Long> userIds) {  

      // get locations in bulk  

    }  
}  
接下來,你可以這樣改寫上麵的QueryLocation:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> {  

    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {  

        List<Long> userIds = new ArrayList<Long>();  

        for(TridentTuple input: inputs) {  
            userIds.add(input.getLong(0));  
        }  

        return state.bulkGetLocations(userIds);  
    }  

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {  
        collector.emit(new Values(location));  
    }  
} 
通過有效減少訪問數據庫的次數,這段代碼比上一個實現會高效的多。如何你要更新State,你需要使用StateUpdater接口。下麵是一個StateUpdater的例子用來將新的地址信息更新到LocationDB當中。

public class LocationUpdater extends BaseStateUpdater<LocationDB> {  

    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {  

        List<Long> ids = new ArrayList<Long>();  

        List<String> locations = new ArrayList<String>();  

        for(TridentTuple t: tuples) {  

            ids.add(t.getLong(0));  

            locations.add(t.getString(1));  
        }  
        state.setLocationsBulk(ids, locations);  
    }  
}  
下麵列出了你應該如何在Trident topology中使用上麵聲明的LocationUpdater:
TridentTopology topology = new TridentTopology();  

TridentState locations =   
    topology.newStream("locations", locationsSpout)  
        .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())  
partitionPersist 操作會更新一個State。其內部是將 State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。
在這段代碼中,隻是簡單的從輸入的tuple中提取處userid和對應的location,並一起更新到State中。
partitionPersist 會返回一個TridentState對象來表示被這個Trident topoloy更新過的location db。 然後你就可以使用這個state在topology的任何地方進行查詢操作了。
同時,你也可以看到我們傳了一個TridentCollector給StateUpdaters。 emit到這個collector的tuple就會去往一個新的stream。在這個例子中,我們並沒有去往一個新的stream的需要,但是如果你在做一些事情,比如說更新數據庫中的某個count,你可以emit更新的count到這個新的stream。然後你可以通過調用TridentState#newValuesStream方法來訪問這個新的stream來進行其他的處理。

persistentAggregate

Trident有另外一種更新State的方法叫做persistentAggregate。 你在之前的word count例子中應該已經見過了,如下:
TridentTopology topology = new TridentTopology();          

TridentState wordCounts =  

      topology.newStream("spout1", spout)  

        .each(new Fields("sentence"), new Split(), new Fields("word"))  

        .groupBy(new Fields("word"))  

        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))  

persistentAggregate是在partitionPersist之上的另外一層抽象。它知道怎麼去使用一個Trident 聚合器來更新State。在這個例子當中,因為這是一個group好的stream,Trident會期待你提供的state是實現了MapState接口的。用來進行group的字段會以key的形式存在於State當中,聚合後的結果會以value的形式存儲在State當中。MapState接口看上去如下所示:
public interface MapState<T> extends State {  

    List<T> multiGet(List<List<Object>> keys);  

    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);  

    void multiPut(List<List<Object>> keys, List<T> vals);  

}  

當你在一個未經過group的stream上麵進行聚合的話,Trident會期待你的state實現Snapshottable接口:
public interface Snapshottable<T> extends State {  

    T get();  

    T update(ValueUpdater updater);  

    void set(T o);  

} 

MemoryMapState 和 MemcachedState 都實現了上麵的2個接口。

Implementing Map States

在Trident中實現MapState是非常簡單的,它幾乎幫你做了所有的事情。OpaqueMap, TransactionalMap, 和 NonTransactionalMap 類實現了所有相關的邏輯,包括容錯的邏輯。你隻需要將一個IBackingMap 的實現提供給這些類就可以了。IBackingMap接口看上去如下所示:
public interface IBackingMap<T> {  

    List<T> multiGet(List<List<Object>> keys);   

    void multiPut(List<List<Object>> keys, List<T> vals);   

}  

OpaqueMap's會用OpaqueValue的value來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps隻是簡單的把從Topology獲取的object傳遞給multiPut。
Trident還提供了一種CachedMap類來進行自動的LRU cache。
另外,Trident 提供了 SnapshottableMap 類將一個MapState 轉換成一個 Snapshottable 對象。
大家可以看看 MemcachedState的實現,從而學習一下怎樣將這些工具組合在一起形成一個高性能的MapState實現。MemcachedState是允許大家選擇使用opaque transactional, transactional, 還是 non-transactional 語義的。 


以上內容可以具體參考wiki: Trident-state


(全文完)

最後更新:2017-04-03 12:54:03

  上一篇:go 查看linux版本命令匯總
  下一篇:go C++編程規範之39:考慮將虛擬函數生命為非公用的,將公用函數聲明為非虛擬的