Apache Storm 官方文檔 —— Trident State
Trident 中含有對狀態化(stateful)的數據源進行讀取和寫入操作的一級抽象封裝工具。這個所謂的狀態(state)既可以保存在拓撲內部(保存在內存中並通過 HDFS 來實現備份),也可以存入像 Memcached 或者 Cassandra 這樣的外部數據庫中。而對於 Trident API 而言,這兩種機製並沒有任何區別。
Trident 使用一種容錯性的方式實現對 state 的管理,這樣,即使在發生操作失敗或者重試的情況下狀態的更新操作仍然是冪等的。基於這個機製,每條消息都可以看作被恰好處理了一次,然後你就可以很容易地推斷出 Trident 拓撲的狀態。
State 的更新過程支持多級容錯性保證機製。在討論這一點之前,我們先來看一個例子,這個例子展示了如何實現恰好一次的語義的技術。假如你正在對數據流進行一個計數聚合操作,並打算將計數結果存入數據庫中。在這個例子裏,你存入數據庫的就是一個對應計數結果的值,每次處理新 tuple 的時候就會增加這個值。
考慮到可能存在的處理失敗情況,tuple 有可能需要重新處理。這樣就給 state 的更新操作帶來了一個問題(或者其他的副作用)—— 你無法知道當前的這個 tuple 的更新操作是否已經處理過了。也許你之前沒有處理過這個 tuple,那麼你現在就需要增加計數結果;也許你之前已經處理過 tuple 了並且成功地增加了計數結果,但是在後續操作過程中 tuple 的處理失敗了,並由此引發了 tuple 的重新處理操作,這時你就不能再增加計數結果了;還有可能你之前在使用這個 tuple 更新數據庫的時候出錯了,也就是說計數值的更新操作並未成功,此時在 tuple 的重新處理過程中你仍然需要更新數據庫。
所以說,如果隻是向數據庫中簡單地存入計數值,你確實無法知道 tuple 是否已經被處理過。因此,你需要一些更多的信息來做決定。Trident 提供了一種支持恰好一次處理的語義,如下所述:
- 通過小數據塊(batch)的方式來處理 tuple(可以參考Trident 教程一文)
- 為每個 batch 提供一個唯一的 id,這個 id 稱為 “事務 id”(transaction id,txid)。如果需要對 batch 重新處理,這個 batch 上仍然會賦上相同的 txid。
- State 的更新操作是按照 batch 的順序進行的。也就是說,在 batch 2 完成處理之前,batch 3 的狀態更新操作不會進行。
基於這幾個基本性質,你的 State 的實現就可以檢測到 tuple 的 batch 是否已經被處理過,並根據檢測結果選擇合適的 state 更新操作。你具體采用的操作取決於你的輸入 spout 提供的語義,這個語義對每個 batch 都是有效的。有三類支持容錯性的 spout:“非事務型”(non-transactional)、“事務型”(transactional)以及“模煳事務型”(opaque transactional)。接下來我們來分析下每種 spout 類型的容錯性語義。
事務型 spout(Transactional spouts)
記住一點,Trident 是通過小數據塊(batch)的方式來處理 tuple 的,而且每個 batch 都會有一個唯一的 txid。spout 的特性是由他們所提供的容錯性保證機製決定的,而且這種機製也會對每個 batch 發生作用。事務型 spout 包含以下特性:
- 每個 batch 的 txid 永遠不會改變。對於某個特定的 txid,batch 在執行重新處理操作時所處理的 tuple 集和它的第一次處理操作完全相同。
- 不同 batch 中的 tuple 不會出現重複的情況(某個 tuple 隻會出現在一個 batch 中,而不會同時出現在多個 batch 中)。
- 每個 tuple 都會放入一個 batch 中(處理操作不會遺漏任何的 tuple)。
這是一種很容易理解的 spout,其中的數據流會被分解到固定的 batches 中。Storm-contrib 項目中提供了一種基於 Kafka 的事務型 spout 實現。
看到這裏,你可能會有這樣的疑問:為什麼不在拓撲中完全使用事務型 spout 呢?這個原因很好理解。一方麵,有些時候事務型 spout 並不能提供足夠可靠的容錯性保障,所以不需要使用事務型 spout。比如,TransactionalTridentKafkaSpout
的工作方式就是使得帶有某個 txid 的 batch 中包含有來自一個 Kafka topic 的所有 partition 的 tuple。一旦一個 batch 被發送出去,在將來無論重新發送這個 batch 多少次,batch 中都會包含有完全相同的 tuple 集,這是由事務型 spout 的語義決定的。現在假設 TransactionalTridentKafkaSpout
發送出的某個 batch 處理失敗了,而與此同時,Kafka 的某個節點因為故障下線了。這時你就無法重新處理之前的 batch 了(因為 Kafka 的節點故障,Kafka topic 必然有一部分 partition 無法獲取到),這個處理過程也會因此終止。
這就是要有“模煳事務型” spout 的原因了 —— 模煳事務型 spout 支持在數據源節點丟失的情況下仍然可以實現恰好一次的處理語義。我們會在下一節討論這類 spout。
順便提一點,如果 Kafka 支持數據複製,那麼就可以放心地使用事務型 spout 提供的容錯性機製了,因為這種情況下某個節點的故障不會導致數據丟失,不過 Kafka 暫時還不支持該特性。(本文的寫作時間應該較早,Kakfa 早就已經可以支持複製的機製了 —— 譯者注)。
在討論“模煳事務型” spout 之前,讓我們先來看看如何為事務型 spout 設計一種支持恰好一次語義的 State。這個 State 就稱為 “事務型 state”,它支持對於特定的 txid 永遠隻與同一組 tuple 相關聯的特性。
假如你的拓撲需要計算單詞數,而且你準備將計數結果存入一個 K-V 型數據庫中。這裏的 key 就是單詞,value 對應於單詞數。從上麵的討論中你應該已經明白了僅僅存儲計數結果是無法確定某個 batch 中的tuple 是否已經被處理過的。所以,現在你應該將 txid 作為一種原子化的值與計數值一起存入數據庫。隨後,在更新計數值的時候,你就可以將數據庫中的 txid 與當前處理的 batch 的 txid 進行比對。如果兩者相同,你就可以跳過更新操作 —— 由於 Trident 的強有序性處理機製,可以確定數據庫中的值是對應於當前的 batch 的。如果兩者不同,你就可以放心地增加計數值。由於一個 batch 的 txid 永遠不會改變,而且 Trident 能夠保證 state 的更新操作完全是按照 batch 的順序進行的,所以,這樣的處理邏輯是完全可行的。
下麵來看一個例子。假如你正在處理 txid 3,其中包含有以下幾個 tuple:
["man"]
["man"]
["dog"]
假如數據庫中有以下幾個 key-value 對:
man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
其中與 “man” 相關聯的 txid 為 1。由於當前處理的 txid 為 3,你就可以確定當前處理的 batch 與數據庫中存儲的值無關,這樣你就可以放心地將 “man” 的計數值加上 2 並更新 txid 為 3。另一方麵,由於 “dog” 的 txid 與當前的 txid 相同,所以,“dog” 的計數是之前已經處理過的,現在不能再對數據庫中的計數值進行更新操作。這樣,在結束 txid3 的更新操作之後,數據庫中的結果就會變成這樣:
man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]
現在我們再來討論一下“模煳事務型” spout。
模煳事務型 spout(Opaque transactional spouts)
前麵已經提到過,模煳事務型 spout 不能保證一個 txid 對應的 batch 中包含的 tuple 完全一致。模煳事務型 spout 有以下的特性:
- 每個 tuple 都會通過某個 batch 處理完成。不過,在 tuple 處理失敗的時候,tuple 有可能繼續在另一個 batch 中完成處理,而不一定是在原先的 batch 中完成處理。
OpaqueTridentKafkaSpout 就具有這樣的特性,同時它對 Kafka 節點的丟失問題具有很好的容錯性。OpaqueTridentKafkaSpout
在發送一個 batch 的時候總會總上一個 batch 結束的地方開始發送新 tuple。這一點可以保證 tuple 不會被遺漏,而且也不會被多個 batch 處理。
不過,模煳事務型 spout 的缺點就在於不能通過 txid 來識別數據庫中的 state 是否是已經處理過的。這是因為在 state 的更新的過程中,batch 有可能會發生變化。
在這種情況下,你應該在數據庫中存儲更多的 state 信息。除了一個結果值和 txid 之外,你還應該存入前一個結果值。我們再以上麵的計數值的例子來分析以下這個問題。假如你的 batch 的部分計數值是 “2”,現在你需要應用一個更新操作。假定現在數據庫中的值是這樣的:
{ value = 4,
prevValue = 1,
txid = 2
}
- 情形1:假如當前處理的 txid 為 3,這與數據庫中的 txid 不同。這時可以將 “prevValue” 的值設為 “value” 的值,再為 “value” 的值加上部分計數的結果並更新 txid。執行完這一係列操作之後的數據庫中的值就會變成這樣:
{ value = 6,
prevValue = 4,
txid = 3
}
- 情形2:如果當前處理的 txid 為 2,也就是和數據庫中存儲的 txid 一致,這種情況下的處理邏輯與上麵的 txid 不一致的情況又有所不同。因為此時你會知道數據庫中的更新操作是由上一個擁有相同 txid 的batch 做出的。不過那個 batch 有可能與當前的 batch 並不相同,所以你需要忽略它的操作。這個時候,你應該將 “prevValue” 加上 batch 中的部分計數值來計算新的 “value”。在這個操作之後數據庫中的值就會變成這樣:
{ value = 3,
prevValue = 1,
txid = 2
}
這種方法之所以可行是因為 Trident 具有強順序性處理的特性。一旦 Trident 開始處理一個新的 batch 的狀態更新操作,它永遠不會回到過去的 batch 的處理上。同時,由於模煳事務型 spout 會保證 batch 之間不會存在重複 —— 每個 tuple 隻會被某一個 batch 完成處理 —— 所以你可以放心地使用 prevValue 來更新 value。
非事務型 spout(Non-transactional spouts)
非事務型 spout 不能為 batch 提供任何的安全性保證。非事務型 spout 有可能提供一種“至多一次”的處理模型,在這種情況下 batch 處理失敗後 tuple 並不會重新處理;也有可能提供一種“至少一次”的處理模型,在這種情況下可能會有多個 batch 分別處理某個 tuple。總之,此類 spout 不能提供“恰好一次”的語義。
不同類型的 Spout 與 State 的總結
下圖顯示了不同的 spout/state 的組合是否支持恰好一次的消息處理語義:
模煳事務型 state 具有最好的容錯性特征,不過這是以在數據庫中存儲更多的內容為代價的(一個 txid 和兩個 value)。事務型 state 要求的存儲空間相對較小,但是它的缺點是隻對事務型 spout 有效。相對的,非事務型要求的存儲空間最少,但是它也不能提供任何的恰好一次的消息執行語義。
你選擇 state 與 spout 的時候必須在容錯性與存儲空間占用之間權衡。可以根據你的應用的需求來確定哪種組合最適合你。
State API
從上文的描述中你已經了解到了恰好一次的消息執行語義的原理是多麼的複雜。不過作為用戶你並不需要處理這些複雜的 txid 比對、多值存儲等操作,Trident 已經在 State 中封裝了所有的容錯性處理邏輯,你隻需要像下麵這樣寫代碼即可:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count")) .parallelismHint(6);
所有處理模煳事務型 state 的邏輯已經封裝在 MemcachedState.opaque
的調用中了。另外,狀態更新都會自動調整為批處理操作,這樣可以減小與數據庫的反複交互的資源損耗。
基本的 State
接口隻有兩個方法:
public interface State { void beginCommit(Long txid); // 對於類似於在 DRPC 流上進行 partitionPersist 的操作,此方法可以為空 void commit(Long txid); }
前麵已經說過,state 更新操作的開始時和結束時都會獲取一個 txid。對於你的 state 怎麼工作,你在其中使用什麼樣的方法執行更新操作,或者使用什麼樣的方法從 state 中讀取數據,Trident 並不關心。
假如你有一個包含有用戶的地址信息的定製數據庫,你需要使用 Trident 與該數據庫交互。你的 State 的實現就會包含有用於獲取與設置用戶信息的方法,比如下麵這樣:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocation(long userId, String location) { // code to access database and set location } public String getLocation(long userId) { // code to get location from database } }
接著你就可以為 Trident 提供一個 StateFactory 來創建 Trident 任務內部的 State 對象的實例。對應於你的數據庫(LocationDB)的 StateFactory 大概是這樣的:
public class LocationDBFactory implements StateFactory { public State makeState(Map conf, int partitionIndex, int numPartitions) { return new LocationDB(); } }
Trident 提供了一個用於查詢 state 數據源的 QueryFunction
接口,以及一個用於更新 state 數據源的 StateUpdater
接口。例如,我們可以寫一個查詢 LocationDB 中的用戶地址信息的 “QueryLocation”。讓我們從你在拓撲中使用這個操作的方式開始。假如在拓撲中需要讀取輸入流中的 userid 信息:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStaticState(new LocationDBFactory()); topology.newStream("myspout", spout) .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
這裏的 QueryLocation
的實現可能是這樣的:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<String> ret = new ArrayList(); for(TridentTuple input: inputs) { ret.add(state.getLocation(input.getLong(0))); } return ret; } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } }
QueryFunction
的執行包含兩個步驟。首先,Trident 會將讀取的一些數據中匯總為一個 batch 傳入 batchRetrieve 方法中。在這個例子中,batchRetrieve 方法會收到一些用戶 id。然後 batchRetrieve 會返回一個與輸入 tuple 列表大小相同的隊列。結果隊列的第一個元素與第一個輸入 tuple 對應,第二個元素與第二個輸入 tuple 相對應,以此類推。
你會發現這段代碼並沒有發揮出 Trident 批處理的優勢,因為這段代碼僅僅一次查詢一下 LocationDB。所以,實現 LocationDB 的更好的方式應該是這樣的:
public class LocationDB implements State { public void beginCommit(Long txid) { } public void commit(Long txid) { } public void setLocationsBulk(List<Long> userIds, List<String> locations) { // set locations in bulk } public List<String> bulkGetLocations(List<Long> userIds) { // get locations in bulk } }
然後,你可以這樣實現 QueryLocation
方法:
public class QueryLocation extends BaseQueryFunction<LocationDB, String> { public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) { List<Long> userIds = new ArrayList<Long>(); for(TridentTuple input: inputs) { userIds.add(input.getLong(0)); } return state.bulkGetLocations(userIds); } public void execute(TridentTuple tuple, String location, TridentCollector collector) { collector.emit(new Values(location)); } }
這段代碼大幅減少了域數據庫的IO,具有更高的執行效率。
你需要使用 StateUpdater
接口來更新 state。下麵是一個更新 LocationDB 的地址信息的 StateUpdater 實現:
public class LocationUpdater extends BaseStateUpdater<LocationDB> { public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) { List<Long> ids = new ArrayList<Long>(); List<String> locations = new ArrayList<String>(); for(TridentTuple t: tuples) { ids.add(t.getLong(0)); locations.add(t.getString(1)); } state.setLocationsBulk(ids, locations); } }
然後你就可以在 Trident 拓撲中這樣使用這個操作:
TridentTopology topology = new TridentTopology(); TridentState locations = topology.newStream("locations", locationsSpout) .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
partitionPersist
操作會更新 state 數據源。StateUpdater
接收 State 和一批 tuple 作為輸入,然後更新這個 State。上麵的代碼僅僅從輸入 tuple 中抓取 userid 和 location 信息,然後對 State 執行一個批處理更新操作。
在 Trident 拓撲更新 LocationDB 之後,partitionPersist
會返回一個表示更新後狀態的 TridentState
對象。隨後你就可以在拓撲的其他地方使用 stateQuery
方法對這個 state 執行查詢操作。
你也許注意到了 StateUpdater 中有一個 TridentCollector 參數。發送到這個 collector 的 tuple 會進入一個“新的數值流”中。在這個例子裏向這個新的流發送 tuple 並沒有意義,不過如果你需要處理類似於更新數據庫中的計數值這樣的操作,你可以考慮將更新後的技術結果發送到這個流中。可以通過 TridentState.newValuesStream
方法來獲取新的流的數據。
persistentAggregate
Trident 使用一個稱為 persistentAggregate
的方法來更新 State。你已經在前麵的數據流單詞統計的例子裏見過了這個方法,這裏再寫一遍:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
partitionPersist 是一個接收 Trident 聚合器作為參數並對 state 數據源進行更新的方法,persistentAggregate 就是構建於 partitionPersist 上層的一個編程抽象。在這個例子裏,由於是一個分組數據流(grouped stream),Trident 需要你提供一個實現 MapState
接口的 state。被分組的域就是 state 中的 key,而聚合的結果就是 state 中的 value。MapState
接口是這樣的:
public interface MapState<T> extends State { List<T> multiGet(List<List<Object>> keys); List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters); void multiPut(List<List<Object>> keys, List<T> vals); }
而當你在非分組數據流上執行聚合操作時(全局聚合操作),Trident 需要你提供一個實現了 Snapshottable
接口的對象:
public interface Snapshottable<T> extends State { T get(); T update(ValueUpdater updater); void set(T o); }
MemoryMapState 與 MemcachedState 都實現了上麵兩個接口。
實現 Map State 接口
實現 MapState
接口非常簡單,Trident 幾乎已經為你做好了所有的準備工作。OpaqueMap
、TransactionalMap
、與NonTransactionalMap
類都分別實現了各自的容錯性語義。你隻需要為這些類提供一個用於對不同的 key/value 進行 multiGets 與 multiPuts 處理的 IBackingMap 實現類。IBackingMap
接口是這樣的:
public interface IBackingMap<T> { List<T> multiGet(List<List<Object>> keys); void multiPut(List<List<Object>> keys, List<T> vals); }
OpaqueMap 會使用 OpaqueValue 作為 vals 參數來調用 multiPut 方法,TransactionalMap 會使用 TransactionalValue 作為參數,而 NonTransactionalMap 則直接將拓撲中的對象傳入。
Trident 也提供了一個 CachedMap 用於實現 K-V map 的自動 LRU 緩存功能。
最後,Trident 還提供了一個 SnapshottableMap 類,該類通過將全局聚合結果存入一個固定的 key 中的方法將 MapState 對象轉化為一個 Snapshottable 對象。
可以參考 MemcachedState 的實現來了解如何將這些工具結合到一起來提供一個高性能的 MapState。MemcachedState
支持選擇模煳事務型、事務型或者非事務型語義。
最後更新:2017-05-22 14:33:09