328
技術社區[雲棲]
Flink原理與實現:詳解Flink中的狀態管理
Flink原理與實現係列文章 :
Flink 原理與實現:架構和拓撲概覽
Flink 原理與實現:如何生成 StreamGraph
Flink 原理與實現:如何生成 JobGraph
Flink原理與實現:如何生成ExecutionGraph及物理執行圖
Flink原理與實現:Operator Chain原理
上麵Flink原理與實現的文章中,有引用word count的例子,但是都沒有包含狀態管理。也就是說,如果一個task在處理過程中掛掉了,那麼它在內存中的狀態都會丟失,所有的數據都需要重新計算。從容錯和消息處理的語義上(at least once, exactly once),Flink引入了state和checkpoint。
首先區分一下兩個概念,state一般指一個具體的task/operator的狀態。而checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。
Flink通過定期地做checkpoint來實現容錯和恢複。
State
Keyed State和Operator State
Flink中包含兩種基礎的狀態:Keyed State和Operator State。
Keyed State
顧名思義,就是基於KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。
Operator State
與Keyed State不同,Operator State跟一個特定operator的一個並發實例綁定,整個operator隻對應一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應多個keyed state。
舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。
原始狀態和Flink托管狀態 (Raw and Managed State)
Keyed State和Operator State,可以以兩種形式存在:原始狀態和托管狀態。
托管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。
下麵是Flink整個狀態框架的類圖,還是比較複雜的,可以先掃一眼,看到後麵再回過來看:
通過框架提供的接口,我們來更新和管理狀態的值。
而raw state即原始狀態,由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。
通常在DataStream上的狀態推薦使用托管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。
下文中所提到的狀態,如果沒有特殊說明,均為托管狀態。
使用Keyed State
首先看一下Keyed State下,我們可以用哪些原子狀態:
- ValueState:即類型為T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它可以通過
update
方法更新狀態值,通過value()
方法獲取狀態值。 - ListState:即key上的狀態值為一個列表。可以通過
add
方法往列表中附加值;也可以通過get()
方法返回一個Iterable<T>
來遍曆狀態值。 - ReducingState:這種狀態通過用戶傳入的reduceFunction,每次調用
add
方法添加值的時候,會調用reduceFunction,最後合並到一個單一的狀態值。 - FoldingState:跟ReducingState有點類似,不過它的狀態值類型可以與
add
方法中傳入的元素類型不同(這種狀態將會在Flink未來版本中被刪除)。 - MapState:即狀態值為一個map。用戶通過
put
或putAll
方法添加元素。
以上所有的狀態類型,都有一個clear
方法,可以清除當前key對應的狀態。
需要注意的是,以上所述的State對象,僅僅用於與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲係統中。相當於我們隻是持有了這個狀態的句柄(state handle)。
接下來看下,我們如何得到這個狀態句柄。Flink通過StateDescriptor
來定義一個狀態。這是一個抽象類,內部定義了狀態名稱、類型、序列化器等基礎信息。與上麵的狀態對應,從StateDescriptor
派生了ValueStateDescriptor
, ListStateDescriptor
等descriptor。
具體如下:
- ValueState getState(ValueStateDescriptor)
- ReducingState getReducingState(ReducingStateDescriptor)
- ListState getListState(ListStateDescriptor)
- FoldingState getFoldingState(FoldingStateDescriptor)
- MapState getMapState(MapStateDescriptor)
接下來我們看一下創建和使用ValueState的例子:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* ValueState狀態句柄. 第一個值為count,第二個值為sum。
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// 獲取當前狀態值
Tuple2<Long, Long> currentSum = sum.value();
// 更新
currentSum.f0 += 1;
currentSum.f1 += input.f1;
// 更新狀態值
sum.update(currentSum);
// 如果count >=2 清空狀態值,重新計算
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // 狀態名稱
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 狀態類型
Tuple2.of(0L, 0L)); // 狀態默認值
sum = getRuntimeContext().getState(descriptor);
}
}
// ...
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
由於狀態需要從RuntimeContext
中創建和獲取,因此如果要使用狀態,必須使用RichFunction。普通的Function是無狀態的。
KeyedStream上的scala api則提供了一些語法糖,讓創建和使用狀態更加方便:
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
Inside Keyed State
上麵以Keyed State為例講了如何使用狀態,接下來我們從代碼層麵分析一下,框架在內部做了什麼事情。
先看下上麵例子中open
方法中獲取狀態句柄的代碼:
sum = getRuntimeContext().getState(descriptor);
它調用了RichFlatMapFunction.getRuntimeContext().getState
方法,最終會調用StreamingRuntimeContext.getState
方法:
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
return keyedStateStore.getState(stateProperties);
}
checkPreconditionsAndGetKeyedStateStore
方法中:
KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
return keyedStateStore;
即返回了AbstractStreamOperator.keyedStateStore
變量。這個變量的初始化在AbstractStreamOperator.initState
方法中:
private void initKeyedState() {
try {
TypeSerializer<Object> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
// create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer
if (null != keySerializer) {
KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
long estimatedStateSizeInMB = config.getStateSize();
this.keyedStateBackend = container.createKeyedStateBackend(
keySerializer,
// The maximum parallelism == number of key group
container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
subTaskKeyGroupRange,
estimatedStateSizeInMB);
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
// ...
}
它先調用StreamTask.createKeyedStateBackend
方法創建stateBackend,然後將stateBackend傳入DefaultKeyedStateStore。
StreamTask.createKeyedStateBackend
方法通過它內部的stateBackend來創建keyed statebackend:
backend = stateBackend.createKeyedStateBackend(
getEnvironment(),
getEnvironment().getJobID(),
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
estimatedStateSizeInMB,
getEnvironment().getTaskKvStateRegistry());
看一下statebackend的初始化,在StreamTask.createStateBackend
方法中,這個方法會根據配置項state.backend
的值創建backend,其中內置的backend有jobmanager
, filesystem
, rocksdb
。
jobmanager
的state backend會把狀態存儲在job manager的內存中。filesystem
會把狀態存在文件係統中,有可能是本地文件係統,也有可能是HDFS、S3等分布式文件係統。rocksdb
會把狀態存在rocksdb中。
所以可以看到,創建了state backend之後,創建keyed stated backend,實際上就是調用具體的state backend來創建。我們以filesystem為例,實際就是FsStateBackend.createKeyedStateBackend
方法,這個方法也很簡單,直接返回了HeapKeyedStateBackend
對象。
先不展開說HeapKeyedStateBackend
類,我們返回去看創建keyed state,最終返回的是DefaultKeyedStateStore
對象,它的getState
, getListState
, getReducingState
等方法,都是對底層keyed state backend的一層封裝,keyedStateBackend.getPartitionedState
來返回具體的state handle(DefaultKeyedStateStore.getPartitionedState
方法)。
這個方法實際調用了AbstractKeyedStateBackend.getPartitionedState
方法,HeapKeyedStateBackend
和RocksDBKeyedStateBackend
都從這個基類派生。
這個類有一個成員變量:
private final HashMap<String, InternalKvState<?>> keyValueStatesByName;
它保存了的一個映射。map value中的InternalKvState,實際為創建的HeapValueState
, HeapListState
, RocksDBValueState
, RocksDBListStat
等實現。
回到上麵AbstractKeyedStateBackend.getPartitionedState
,正常的代碼路徑下,它會調用AbstractKeyedStateBackend.getOrCreateKeyedState
方法來創建這個InternalKvState,其方法如下:
S state = stateDescriptor.bind(new StateBackend() {
@Override
public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
}
@Override
public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
}
// ...
AbstractKeyedStateBackend.createValueState
,AbstractKeyedStateBackend.createListState
等方法是AbstractKeyedStateBackend的抽象方法,具體還是在HeapKeyedStateBackend、RocksDBKeyedStateBackend等類中實現的,所以這裏創建的state隻是一個代理,它proxy了具體的上層實現。在我們的例子中,最後繞了一個圈,調用的仍然是HeapKeyedStateBackend.createValueState
方法,並將state name對應的state handle放入到keyValueStatesByName這個map中,保證在一個task中隻有一個同名的state handle。
回來看HeapKeyedStateBackend
,這個類有一個成員變量:
private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
它的key為state name, value為StateTable,用來存儲這個state name下的狀態值。它會將所有的狀態值存儲在內存中。
它的createValueState
方法:
StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
即先注冊StateTable,然後返回一個HeapValueState。
這裏整理一下從應用層麵創建一個ValueState的state handle的過程:
sum = getRuntimeContext().getState(descriptor) (app code)
--> RichFlatMapFunction.getRuntimeContext().getState
--> StreamingRuntimeContext.getState
--> KeyedStateStore.getState(stateProperties)
--> AbstractStreamOperator.keyedStateStore.getState
--> DefaultKeyedStateStore.getState
--> DefaultKeyedStateStore.getPartitionedState
--> AbstractKeyedStateBackend.getPartitionedState
--> AbstractKeyedStateBackend.getOrCreateKeyedState
--> HeapKeyedStateBackend.createValueState
--> HeapKeyedStateBackend.tryRegisterStateTable
--> return new HeapValueState
而從框架層麵看,整個調用流程如下:
Task.run
--> StreamTask.invoke
--> StreamTask.initializeState
--> StreamTask.initializeOperators
--> AbstractStreamOperator.initializeState
--> AbstractStreamOperator.initKeyedState
--> StreamTask.createKeyedStateBackend
--> MemoryStateBackend.createKeyedStateBackend
--> HeapKeyedStateBackend.<init>
整體來看,創建一個state handle還是挺繞的,中間經過了多層封裝和代理。
創建完了state handle,接下來看看如何獲取和更新狀態值。
首先需要講一下HeapState在內存中是如何組織的,還是以最簡單的HeapValueState為例,
具體的數據結構,是在其基類AbstractHeapState
中,以StateTable<K, N, SV> stateTable
的形式存在的,其中K代表Key的類型,N代表state的namespace(這樣屬於不同namespace的state可以重名),SV代表state value的類型。
StateTable
類內部數據結構如下:
protected final KeyGroupRange keyGroupRange;
/** Map for holding the actual state objects. */
private final List<Map<N, Map<K, ST>>> state;
/** Combined meta information such as name and serializers for this state */
protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;
最核心的數據結構是state
成員變量,它保存了一個list,其值類型為Map<N, Map<K, ST>>
,即按namespace和key分組的兩級map。那麼它為什麼是一個list呢,這裏就要提到keyGroupRange
成員變量了,它代表了當前state所包含的key的一個範圍,這個範圍根據當前的sub task id以及最大並發進行計算,在AbstractStreamOperator.initKeyedState
方法中:
KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
舉例來說,如果當前task的並發是2,最大並發是128,那麼task-1所屬的state backend的keyGroupRange為[0,63],而task-2所屬的state backend的keyGroupRange為[64,127]。
這樣,task-1中的StateTable.state這個list,最大size即為64。獲取特定key的state value時,會先計算key的hash值,然後用hash值 % 最大並發,這樣會得到一個[0,127]之間的keyGroup,到這個list中get到這個下標的Map<N, Map<K,V>>
值,然後根據 namespace + key二級獲取到真正的state value。
看到這裏,有人可能會問,對於一個key,如何保證在task-1中,它計算出來的keyGroup一定是在[0,63]之間,在task-2中一定是在[64,127]之間呢?
原因是,在KeyedStream中,使用了KeyGroupStreamPartitioner
這種partitioner來向下遊task分發keys,而這個類重載的selectChannels
方法如下:
K key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
return returnArray;
這裏關鍵是KeyGroupRangeAssignment.assignKeyToParallelOperator
方法,它中間調用了KeyGroupRangeAssignment.assignToKeyGroup
方法來確定一個key所屬的keyGroup,這個跟state backend計算keyGroup是同一個方法。然後根據這個keyGroup,它會計算出擁有這個keyGroup的task,並將這個key發送到此task。所以能夠保證,從KeyedStream上emit到下遊task的數據,它的state所屬的keyGroup一定是在當前task的keyGroupRange中的。
上麵已經提到了獲取ValueState的值,這裏貼一下代碼,結合一下就很容易理解了:
Map<N, Map<K, V>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
return stateDesc.getDefaultValue();
}
Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return stateDesc.getDefaultValue();
}
V result = keyedMap.get(backend.getCurrentKey());
if (result == null) {
return stateDesc.getDefaultValue();
}
return result;
而更新值則通過ValueState.update
方法進行更新,這裏就不貼代碼了。
上麵講了最簡單的ValueState,其他類型的state,其實也是基本一樣的,隻不過stateTable中狀態值的類型不同而已。如HeapListState,它的狀態值類型為ArrayList;HeapMapState,它的狀態值類型為HashMap。而值類型的不同,導致了在State上的接口也有所不同,如ListState會有add
方法,MapState有put
和get
方法。在這裏就不展開說了。
Checkpoint
到上麵為止,都是簡單的關於狀態的讀寫,而且狀態都還是隻在Task本地,接下來就會涉及到checkpoint。
所謂checkpoint,就是在某一時刻,將所有task的狀態做一個快照(snapshot),然後存儲到memory/file system/rocksdb等。
關於Flink的分布式快照,請參考 分布式Snapshot和Flink Checkpointing簡介 及相關論文,這裏不詳述了。
Flink的checkpoint,是由CheckpointCoordinator
來協調的,它位於JobMaster中。但是其實在ExecutionGraph中已經創建了,見ExecutionGraph.enableSnapshotCheckpointing
方法。
當Job狀態切換到RUNNING時,CheckpointCoordinatorDeActivator
(從JobStatusListener派生)會觸發回調coordinator.startCheckpointScheduler();
,根據配置的checkpoint interval來定期觸發checkpoint。
每個checkpoint由checkpoint ID和timestamp來唯一標識,其中checkpoint ID可以是standalone(基於內存)的,也可能是基於ZK的。
已經完成的checkpoint,保存在CompletedCheckpointStore中,可以是StandaloneCompletedCheckpointStore(保存在JobMaster內存中),也可以是ZooKeeperCompletedCheckpointStore(保存在ZK中),甚至是自己實現的store,比如基於HDFS的。
觸發checkpoint的方法在CheckpointCoordinator.ScheduledTrigger中,隻有一行:
triggerCheckpoint(System.currentTimeMillis(), true);
這個方法比較長,它會先做一係列檢查,如檢查coordinator自身的狀態(是否被shutdown),還會檢查與上次checkpoint的時間間隔、當前的並發checkpoint數是否超過限製,如果都沒問題,再檢查所有task的狀態是否都為RUNNING,都沒問題之後,觸發每個Execution的checkpoint:
for (Execution execution: executions) {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
看下Execution.triggerCheckpoint
方法:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final SimpleSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
}
}
很簡單,通過RPC調用向TaskManager觸發當前JOB的checkpoint,然後一路調用下去:
RpcTaskManagerGateway.triggerCheckpoint
--> TaskExecutorGateway.triggerCheckpoint
--> TaskExecutor.triggerCheckpoint
--> task.triggerCheckpointBarrier
--> StatefulTask.triggerCheckpoint
--> StreamTask.triggerCheckpoint
--> StreamTask.performCheckpoint
具體做checkpoint的時候,會先向下遊廣播checkpoint barrier,然後調用StreamTask.checkpointState
方法做具體的checkpoint,實際會調用到StreamTask.executeCheckpointing
方法。
checkpoint裏,具體操作為,遍曆每個StreamTask中的所有operator:
- 調用operator的
snapshotState(FSDataOutputStream out, long checkpointId, long timestamp)
方法,存儲operator state,這個結果會返回operator state handle,存儲於nonPartitionedStates
中。這裏實際處理的時候,隻有當user function實現了Checkpointed
接口,才會做snapshot。需要注意的是,此接口已經deprecated,被CheckpointedFunction
代替,而對CheckpointedFunction
的snapshot會在下麵的第2步中來做,因此這兩個接口一般來說是2選1的。 - 調用operator的
snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions)
方法,返回OperatorSnapshotResult
對象。注意雖然每個snapshot方法返回的都是一個RunnableFuture,不過目前實際上還是同步做的checkpoint(可以比較容易改成異步)。- 這裏會先調用
AbstractStreamOperator.snapshotState
方法,為rich function做state snapshot - 調用
operatorStateBackend.snapshot
方法,對operator state做snapshot。 - 調用
keyedStateBackend.snapshot
方法,對keyed state做snapshot。 - 調用
timerServiceBackend.snapshot
方法,對processing time/event time window中注冊的timer回調做snapshot(恢複狀態的時候必須也要恢複timer回調)
- 這裏會先調用
- 調用
StreamTask.runAsyncCheckpointingAndAcknowledge
方法確認上麵的snapshot是否都成功,如果成功,則會向CheckpointCoordinator發送ack消息。 - CheckpointCoordinator收到ack消息後,會檢查本地是否存在這個pending的checkpoint,並且這個checkpoint是否超時,如果都OK,則判斷是否收到所有task的ack消息,如果是,則表示已經完成checkpoint,會得到一個CompletedCheckpoint並加入到completedCheckpointStore中。
在上麵的checkpoint過程中,如果state backend選擇的是jobmanager,那麼最終返回的state handle為ByteStreamStateHandle,這個state handle中包含了snapshot後的所有狀態數據。而如果是filesystem,則state handle隻會包含數據的文件句柄,數據則在filesystem中,這個下麵會再細說。
Filesystem State Backend
上麵提到的都是比較簡單的基於內存的state backend,在實際生產中是不太可行的。因此一般會使用filesystem或者rocksdb的state backend。我們先講一下基於filesystem的state backend。
基於內存的state backend實現為MemoryStateBackend
,基於文件係統的state backend的實現為FsStateBackend
。FsStateBackend有一個策略,當狀態的大小小於1MB(可配置,最大1MB)時,會把狀態數據直接存儲在meta data file中,避免出現很小的狀態文件。
FsStateBackend另外一個成員變量就是basePath
,即checkpoint的路徑。實際做checkpoint時,生成的路徑為:<base-path>/<job-id>/chk-<checkpoint-id>/
。
而且filesystem推薦使用分布式文件係統,如HDFS等,這樣在fail over時可以恢複,如果是本地的filesystem,那恢複的時候是會有問題的。
回到StreamTask,在做checkpoint的時候,是通過CheckpointStateOutputStream
寫狀態的,FsStateBack會使用FsCheckpointStreamFactory
,然後通過FsCheckpointStateOutputStream
去寫具體的狀態,這個實現也比較簡單,就是一個帶buffer的寫文件係統操作。最後向上層返回的StreamStateHandle,視狀態的大小,如果狀態特別小,則會直接返回帶狀態數據的ByteStreamStateHandle
,否則會返回FileStateHandle
,這個state handle包含了狀態文件名和大小。
需要注意的是,雖然checkpoint是寫入到文件係統中,但是基於FsStateBackend創建的keyed state backend,仍然是HeapKeyedStateBackend
,也就是說,keyed state的讀寫仍然是會在內存中的,隻有在做checkpoint的時候才會持久化到文件係統中。
RocksDB State Backend
RocksDB跟上麵的都略有不同,它會在本地文件係統中維護狀態,KeyedStateBackend等會直接寫入本地rocksdb中。同時它需要配置一個遠端的filesystem uri(一般是HDFS),在做checkpoint的時候,會把本地的數據直接複製到filesystem中。fail over的時候從filesystem中恢複到本地。
從RocksDBStateBackend創建出來的RocksDBKeyedStateBackend,更新的時候會直接以key + namespace作為key,然後把具體的值更新到rocksdb中。
如果是ReducingState,則在add
的時候,會先從rocksdb中讀取已有的值,然後根據用戶的reduce function進行reduce,再把新值寫入rocksdb。
做checkpoint的時候,會首先在本地對rockdb做checkpoint(rocksdb自帶的checkpoint功能),這一步是同步的。然後將checkpoint異步複製到遠程文件係統中。最後返回RocksDBStateHandle
。
RocksDB克服了HeapKeyedStateBackend受內存限製的缺點,同時又能夠持久化到遠端文件係統中,比較適合在生產中使用。
Queryable State
Queryable State,顧名思義,就是可查詢的狀態,表示這個狀態,在流計算的過程中就可以被查詢,而不像其他流計算框架,需要存儲到外部係統中才能被查詢。目前可查詢的state主要針對partitionable state,如keyed state等。
簡單來說,當用戶在job中定義了queryable state之後,就可以在外部,通過QueryableStateClient
,通過job id, state name, key來查詢所對應的狀態的實時的值。
queryable state目前支持兩種方法來定義:
通過
KeyedStream.asQueryableState
方法,生成一個QueryableStream,需要注意的是,這個stream類似於一個sink,是不能再做transform的。 實現上,生成QueryableStream就是為當前stream加上一個operator:QueryableAppendingStateOperator
,它的processElement
方法,每來一個元素,就會調用state.add
去更新狀態。因此這種方式有一個限製,隻能使用ValueDescriptor, FoldingStateDescriptor或者ReducingStateDescriptor,而不能是ListStateDescriptor,因為它可能會無限增長導致OOM。此外,由於不能在stream後麵再做transform,也是有一些限製。-
通過managed keyed state。
ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), Tuple2.of(0L, 0L)); descriptor.setQueryable("query-name"); // queryable state name
這個隻需要將具體的state descriptor標識為queryable即可,這意味著可以將一個pipeline中間的operator的state標識為可查詢的。
首先根據state descriptor的配置,會在具體的TaskManager中創建一個KvStateServer,用於state查詢,它就是一個簡單的netty server,通過KvStateServerHandler
來處理請求,查詢state value並返回。
但是一個partitionable state,可能存在於多個TaskManager中,因此需要有一個路由機製,當QueryableStateClient給定一個query name和key時,要能夠知道具體去哪個TaskManager中查詢。
為了做到這點,在Job的ExecutionGraph(JobMaster)上會有一個用於定位KvStateServer的KvStateLocationRegistry,當在TaskManager中注冊了一個queryable KvStateServer時,就會調用JobMaster.notifyKvStateRegistered
,通知JobMaster。
具體流程如下圖:
這個設計看起來很美好,通過向流計算實時查詢狀態數據,免去了傳統的存儲等的開銷。但實際上,除了上麵提到的狀態類型的限製之外,也會受netty server以及state backend本身的性能限製,因此並不適用於高並發的查詢。
參考資料:
- Dynamic Scaling: Key Groups
- Stateful Stream Processing
- Working with State
- Scaling to large state
- Queryable state design doc
最後更新:2017-10-19 14:33:50