Storm可靠性及事務性相關設計: Acker及Trident State
Storm可靠性相關
Storm可靠性的設計與它的Acker有很大關係,先讓我用比較拙劣的語句簡單描述下。
Storm的tuple,被OutputCollector emit的時候——這個稱為archoring(生成新的tuples),需要指定和它相關的之前的tuple,並且要指定executor完之後ack之類的api,這樣就能建立一顆可追蹤的tuple樹。如:
public class SplitSentence extends BaseRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { _collector.emit(tuple, new Values(word)); } _collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }上麵這件事一般IBasicBolt可以罩住,更多的方法可以使用IRichBolt。
一個topology裏麵的acker數量是可以設置的,然後tuple比較多的話可以多設置幾個acker,提高效率。每個tuple有一個64位的id,acker利用這個id來追蹤tuple,且會知道這個tuple他的祖宗們,也就是隻要繼續跟蹤新的tuple就可以了,因為祖宗的id會被傳遞下去。
storm用一致性哈希來把spout-tuple-id對應給acker,因為tuple知道自己的祖宗,所以他可以算出通知哪個acker來ack(所有的根tuple是知道的,hash好了之後,以後的子tuple去對應的地方ack)。acker會維護tuple樹上的各個tuple,當他知道這個樹完成處理了,就會通知某個對應的task。
acker task不顯示跟蹤整個tuple樹,不然會占據很多內存,acker使用一個恒定的20 bytes來針對每個spout tuple。一個acker存一個spout-tuple-id的時候,存兩個值:一個是task id,用於關聯task;第二個是 ack val,一個64位的數。ack val是整個樹狀態的一個表示,把所有的tuple id異或起來,當ack val=0,就知道整棵tuple樹成功完成了,否則失敗,然後就可以通知task了。
在以上的可靠性之下,如果:
- task fail了,tuple沒有被ack。超時機製保證這個tuple以後再被重新處理
- Acker掛 。這個acker跟蹤的tuple都超時,都會重新處理
- Spout掛了。消息源重新發送消息。
以上內容可以具體參考wiki:Guaranteeing-message-processing
Acker更多設計可以參考: twitter-storm-code-analysis-acker-merchanism
Storm事務性相關
State in Trident
Trident在讀寫有狀態的數據源方麵是有著一流的抽象封裝的。狀態即可以保留在topology的內部,比如說內存和HDFS,也可以放到外部存儲當中,比如說Memcached或者Cassandra。這些都是使用同一套Trident API。
Trident以一種容錯的方式來管理狀態以至於當你在更新狀態的時候你不需要去考慮錯誤以及重試的情況。這種保證每個消息被處理有且隻有一次的原理會讓你更放心的使用Trident的topology。
在進行狀態更新時,會有不同的容錯級別。在外麵一起來討論這點之前,讓我們先通過一個例子來說明一下如果想要坐到有且隻有一次處理的必要的技巧。假定你在做一個關於某stream的計數聚合器,你想要把運行中的計數存放到一個數據庫中。如果你在數據庫中存了一個值表示這個計數,每次你處理一個tuple之後,就將數據庫存儲的計數加一。
當錯誤發生,truple會被重播。這就帶來了一個問題:當狀態更新的時候,你完全不知道你是不是在之前已經成功處理過這個tuple。也許你之前從來沒處理過這個tuple,這樣的話你就應該把count加一。另外一種可能就是你之前是成功處理過這個tuple的,但是這個在其他的步驟處理這個tuple的時候失敗了,在這種情況下,我們就不應該將count加一。再或者,你接受到過這個tuple,但是上次處理這個tuple的時候在更新數據庫的時候失敗了,這種情況你就應該去更新數據庫。
如果隻是簡單的存計數到數據庫的話,你是完全不知道這個tuple之前是否已經被處理過了的。所以你需要更多的信息來做正確的決定。Trident提供了下麵的語義來實現有且隻有一次被處理的目標。
- Tuples 是被分成小的集合被批量處理的 (see the tutorial)
- 每一批tuples被給定一個唯一ID作為事務ID (txid). 當這一批tuple被重播時, txid不變.
- 批與批之間的狀態更新時嚴格順序的。比如說第三批tuple的狀態的更新必須要等到第二批tuple的狀態更新成功之後才可以進行.
Transactional spouts
- 有著同樣txid的batch一定是一樣的。當重播一個txid對應的batch時,一定會重播和之前對應txid的batch中同樣的tuples。
- 各個batch之間是沒有交集的。每個tuple隻能屬於一個batch
- 每一個tuple都屬於一個batch,無一例外
你也許會問:為什麼我們不總是使用transactional spout?這很容易理解。一個原因是並不是所有的地方都需要容錯的。舉例來說,TransactionalTridentKafkaSpout 工作的方式是給定一個txid的batch所包含的一個屬於一個topic的來自於所有Kafka partition的tuple序列。一旦這個batch被發出,在任何時候如果這個batch被重新發出時,它必須包含原來所有的tuple以滿足 transactional spout的語義。現在我們假定一個batch被TransactionalTridentKafkaSpout所發出,這個batch沒有被成功處理,並且同時kafka的一個節點也down掉了。你就無法像之前一樣重播一個完全一樣的batch(因為kakfa的節點down掉,該topic的一部分partition可能會無法使用),整個處理會被中斷。
這也就是"opaque transactional" spouts(不透明事務spout)存在的原因- 他們對於丟失源節點這種情況是容錯的,仍然能夠幫你達到有且隻有一次處理的語義。後麵會對這種spout有所介紹。
(當然,在Kafka開啟replication功能時,transactional spout也是可以做到容錯的)
在外麵來討論"opaque transactional" spout之前,我們先來看看你應該怎樣設計一個State來實現transactional spout的有且隻有一次執行的語義。這個State的類型是"transactional state" 並且它利用了任何一個txid總是對應同樣的tuple序列這個語義。
假如說你有一個用來計算單詞出現次數的topology,你想要將單詞的出現次數以key/value對的形式存儲到數據庫中。key就是單詞,value就是這個這個單詞出現的次數。你已經看到隻是存儲一個數量是不足以知道你是否已經處理過一個batch的。你可以通過將value和txid一起存儲到數據庫中。這樣的話,當更新這個count之前,你可以先去比較數據庫中存儲的txid和現在要存儲的txid。如果一樣,就跳過什麼都不做,因為這個value之前已經被處理過了。如果不一樣,就執行存儲。這個邏輯可以工作的前提就是txid永不改變,並且Trident保證狀態的更新是在batch之間嚴格順序進行的。
考慮下麵這個例子的運行邏輯, 假定你在處理一個txid為3的包含下麵tuple的batch:
["man"] ["man"] ["dog"]
man => [count=3, txid=1] dog => [count=4, txid=3] apple => [count=10, txid=2]單詞“man”對應的txid是1. 因為當前的txid是3,你可以確定你還沒有為這個batch中的tuple更新過這個單詞的數量。所以你可以放心的給count加2並更新txid為3. 與此同時,單詞“dog”的txid和當前的txid是相同的,因此你可以跳過這次更新。此時數據庫中的數據如下:
man => [count=5, txid=3] dog => [count=4, txid=3] apple => [count=10, txid=2]接下來我們一起再來看看 opaque transactional spout已經怎樣去為這種spout設計相應的state。
Opaque transactional spouts
- 每個tuple隻在一個batch中被成功處理。然而,一個tuple在一個batch中被處理失敗後,有可能會在另外的一個batch中被成功處理
{ value = 4, prevValue = 1, txid = 2 }
{ value = 6, prevValue = 4, txid = 3 }
{ value = 3, prevValue = 1, txid = 2 }
Non-transactional spouts
Non-transactional spout(非事務spout)不確保每個batch中的tuple的規則。所以他可能是最多被處理一次的,如果tuple被處理失敗就不重發的話。同時他也可能會是至少處理一次的,如果tuple在不同的batch中被多次成功處理的時候。無論怎樣,這種spout是不可能實現有且隻有一次被成功處理的語義的。Summary of spout and state types
State和Spout類型的選擇其實是一種在容錯性和存儲消耗之間的權衡,你的應用的需要會決定那種組合更適合你。
State APIs
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6);所有管理opaque transactional state所需的邏輯都在MemcachedState.opaque方法的調用中被涵蓋了,除此之外,數據庫的更新會自動以batch的形式來進行以避免多次訪問數據庫。
public interface State { void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream void commit(Long txid); }當一個State更新開始時,以及當一個State更新結束時你都會被告知,並且會告訴你該次的txid。Trident並沒有對你的state的工作方式有任何的假定。
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } }然後你還需要提供給Trident一個StateFactory來在Trident的task中創建你的State對象。LocationDB 的 StateFactory可能會如下所示:
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } }Trident提供了一個QueryFunction接口用來實現Trident中在一個source state上查詢的功能。同時還提供了一個StateUpdater來實現Trident中更新source state的功能。比如說,讓我們寫一個查詢地址的操作,這個操作會查詢LocationDB來找到用戶的地址。讓我們以怎樣在topology中實現該功能開始,假定這個topology會接受一個用戶id作為輸入數據流。
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))接下來讓我們一起來看看QueryLocation 的實現應該是什麼樣的:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } }QueryFunction的執行分為兩部分。首先Trident收集了一個batch的read操作並把他們統一交給batchRetrieve。在這個例子中,batchRetrieve會接受到多個用戶id。batchRetrieve應該返還一個和輸入tuple數量相同的result序列。result序列中的第一個元素對應著第一個輸入tuple的結果,result序列中的第二個元素對應著第二個輸入tuple的結果,以此類推。
你可以看到,這段代碼並沒有想Trident那樣很好的利用batch的優勢,而是為每個輸入tuple去查詢了一次LocationDB。所以一種更好的操作LocationDB方式應該是這樣的:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk } }
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } }通過有效減少訪問數據庫的次數,這段代碼比上一個實現會高效的多。如何你要更新State,你需要使用StateUpdater接口。下麵是一個StateUpdater的例子用來將新的地址信息更新到LocationDB當中。
public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); } }下麵列出了你應該如何在Trident topology中使用上麵聲明的LocationUpdater:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())partitionPersist 操作會更新一個State。其內部是將 State和一批更新的tuple交給StateUpdater,由StateUpdater完成相應的更新操作。
persistentAggregate
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); }
當你在一個未經過group的stream上麵進行聚合的話,Trident會期待你的state實現Snapshottable接口:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); }
MemoryMapState 和 MemcachedState 都實現了上麵的2個接口。
Implementing Map States
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }
OpaqueMap's會用OpaqueValue的value來調用multiPut方法,TransactionalMap's會提供TransactionalValue中的value,而NonTransactionalMaps隻是簡單的把從Topology獲取的object傳遞給multiPut。
Trident還提供了一種CachedMap類來進行自動的LRU cache。
最後更新:2017-04-03 12:54:03