閱讀328 返回首頁    go 技術社區[雲棲]


Flink原理與實現:詳解Flink中的狀態管理

Flink原理與實現係列文章 :

Flink 原理與實現:架構和拓撲概覽
Flink 原理與實現:如何生成 StreamGraph
Flink 原理與實現:如何生成 JobGraph
Flink原理與實現:如何生成ExecutionGraph及物理執行圖
Flink原理與實現:Operator Chain原理

上麵Flink原理與實現的文章中,有引用word count的例子,但是都沒有包含狀態管理。也就是說,如果一個task在處理過程中掛掉了,那麼它在內存中的狀態都會丟失,所有的數據都需要重新計算。從容錯和消息處理的語義上(at least once, exactly once),Flink引入了state和checkpoint。

首先區分一下兩個概念,state一般指一個具體的task/operator的狀態。而checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。

Flink通過定期地做checkpoint來實現容錯和恢複。

State

Keyed State和Operator State

Flink中包含兩種基礎的狀態:Keyed State和Operator State。

Keyed State

顧名思義,就是基於KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。

Operator State

與Keyed State不同,Operator State跟一個特定operator的一個並發實例綁定,整個operator隻對應一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應多個keyed state。

舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。

原始狀態和Flink托管狀態 (Raw and Managed State)

Keyed State和Operator State,可以以兩種形式存在:原始狀態和托管狀態。

托管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。

下麵是Flink整個狀態框架的類圖,還是比較複雜的,可以先掃一眼,看到後麵再回過來看:

image.png

通過框架提供的接口,我們來更新和管理狀態的值。

而raw state即原始狀態,由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。

通常在DataStream上的狀態推薦使用托管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。

下文中所提到的狀態,如果沒有特殊說明,均為托管狀態。

使用Keyed State

首先看一下Keyed State下,我們可以用哪些原子狀態:

  • ValueState:即類型為T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它可以通過update方法更新狀態值,通過value()方法獲取狀態值。
  • ListState:即key上的狀態值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍曆狀態值。
  • ReducingState:這種狀態通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最後合並到一個單一的狀態值。
  • FoldingState:跟ReducingState有點類似,不過它的狀態值類型可以與add方法中傳入的元素類型不同(這種狀態將會在Flink未來版本中被刪除)。
  • MapState:即狀態值為一個map。用戶通過putputAll方法添加元素。

以上所有的狀態類型,都有一個clear方法,可以清除當前key對應的狀態。

需要注意的是,以上所述的State對象,僅僅用於與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有可能是存在內存、磁盤、或者其他分布式存儲係統中。相當於我們隻是持有了這個狀態的句柄(state handle)。

接下來看下,我們如何得到這個狀態句柄。Flink通過StateDescriptor來定義一個狀態。這是一個抽象類,內部定義了狀態名稱、類型、序列化器等基礎信息。與上麵的狀態對應,從StateDescriptor派生了ValueStateDescriptor, ListStateDescriptor等descriptor。

具體如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

接下來我們看一下創建和使用ValueState的例子:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * ValueState狀態句柄. 第一個值為count,第二個值為sum。
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // 獲取當前狀態值
        Tuple2<Long, Long> currentSum = sum.value();

        // 更新
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;

        // 更新狀態值
        sum.update(currentSum);

        // 如果count >=2 清空狀態值,重新計算
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // 狀態名稱
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 狀態類型
                        Tuple2.of(0L, 0L)); // 狀態默認值
        sum = getRuntimeContext().getState(descriptor);
    }
}

// ...
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

由於狀態需要從RuntimeContext中創建和獲取,因此如果要使用狀態,必須使用RichFunction。普通的Function是無狀態的。

KeyedStream上的scala api則提供了一些語法糖,讓創建和使用狀態更加方便:

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Inside Keyed State

上麵以Keyed State為例講了如何使用狀態,接下來我們從代碼層麵分析一下,框架在內部做了什麼事情。

先看下上麵例子中open方法中獲取狀態句柄的代碼:

    sum = getRuntimeContext().getState(descriptor);

它調用了RichFlatMapFunction.getRuntimeContext().getState方法,最終會調用StreamingRuntimeContext.getState方法:

    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
        KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
        return keyedStateStore.getState(stateProperties);
    }

checkPreconditionsAndGetKeyedStateStore方法中:

    KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
    return keyedStateStore;

即返回了AbstractStreamOperator.keyedStateStore變量。這個變量的初始化在AbstractStreamOperator.initState方法中:

    private void initKeyedState() {
        try {
            TypeSerializer<Object> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
            // create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer
            if (null != keySerializer) {
                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());

                long estimatedStateSizeInMB = config.getStateSize();

                this.keyedStateBackend = container.createKeyedStateBackend(
                        keySerializer,
                        // The maximum parallelism == number of key group
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        subTaskKeyGroupRange,
                        estimatedStateSizeInMB);

                this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
            }

        // ...
    }

它先調用StreamTask.createKeyedStateBackend方法創建stateBackend,然後將stateBackend傳入DefaultKeyedStateStore。

StreamTask.createKeyedStateBackend方法通過它內部的stateBackend來創建keyed statebackend:

    backend = stateBackend.createKeyedStateBackend(
            getEnvironment(),
            getEnvironment().getJobID(),
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            estimatedStateSizeInMB,
            getEnvironment().getTaskKvStateRegistry());

看一下statebackend的初始化,在StreamTask.createStateBackend方法中,這個方法會根據配置項state.backend的值創建backend,其中內置的backend有jobmanager, filesystem, rocksdb

jobmanager的state backend會把狀態存儲在job manager的內存中。
filesystem會把狀態存在文件係統中,有可能是本地文件係統,也有可能是HDFS、S3等分布式文件係統。
rocksdb會把狀態存在rocksdb中。

所以可以看到,創建了state backend之後,創建keyed stated backend,實際上就是調用具體的state backend來創建。我們以filesystem為例,實際就是FsStateBackend.createKeyedStateBackend方法,這個方法也很簡單,直接返回了HeapKeyedStateBackend對象。

先不展開說HeapKeyedStateBackend類,我們返回去看創建keyed state,最終返回的是DefaultKeyedStateStore對象,它的getState, getListState, getReducingState等方法,都是對底層keyed state backend的一層封裝,keyedStateBackend.getPartitionedState來返回具體的state handle(DefaultKeyedStateStore.getPartitionedState方法)。

這個方法實際調用了AbstractKeyedStateBackend.getPartitionedState方法,HeapKeyedStateBackendRocksDBKeyedStateBackend都從這個基類派生。

這個類有一個成員變量:

    private final HashMap<String, InternalKvState<?>> keyValueStatesByName;

它保存了的一個映射。map value中的InternalKvState,實際為創建的HeapValueState, HeapListState, RocksDBValueState, RocksDBListStat等實現。

回到上麵AbstractKeyedStateBackend.getPartitionedState,正常的代碼路徑下,它會調用AbstractKeyedStateBackend.getOrCreateKeyedState方法來創建這個InternalKvState,其方法如下:

        S state = stateDescriptor.bind(new StateBackend() {
            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
            }

            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
            }
        // ...

AbstractKeyedStateBackend.createValueStateAbstractKeyedStateBackend.createListState等方法是AbstractKeyedStateBackend的抽象方法,具體還是在HeapKeyedStateBackend、RocksDBKeyedStateBackend等類中實現的,所以這裏創建的state隻是一個代理,它proxy了具體的上層實現。在我們的例子中,最後繞了一個圈,調用的仍然是HeapKeyedStateBackend.createValueState方法,並將state name對應的state handle放入到keyValueStatesByName這個map中,保證在一個task中隻有一個同名的state handle。

回來看HeapKeyedStateBackend,這個類有一個成員變量:

    private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();

它的key為state name, value為StateTable,用來存儲這個state name下的狀態值。它會將所有的狀態值存儲在內存中。

它的createValueState方法:

        StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
        return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);

即先注冊StateTable,然後返回一個HeapValueState。

這裏整理一下從應用層麵創建一個ValueState的state handle的過程:

sum = getRuntimeContext().getState(descriptor) (app code)
  --> RichFlatMapFunction.getRuntimeContext().getState
  --> StreamingRuntimeContext.getState
    --> KeyedStateStore.getState(stateProperties)
    --> AbstractStreamOperator.keyedStateStore.getState
      --> DefaultKeyedStateStore.getState
      --> DefaultKeyedStateStore.getPartitionedState
      --> AbstractKeyedStateBackend.getPartitionedState
      --> AbstractKeyedStateBackend.getOrCreateKeyedState
        --> HeapKeyedStateBackend.createValueState
        --> HeapKeyedStateBackend.tryRegisterStateTable
        --> return new HeapValueState        

而從框架層麵看,整個調用流程如下:

Task.run
  --> StreamTask.invoke
  --> StreamTask.initializeState
  --> StreamTask.initializeOperators
    --> AbstractStreamOperator.initializeState
    --> AbstractStreamOperator.initKeyedState
      --> StreamTask.createKeyedStateBackend
        --> MemoryStateBackend.createKeyedStateBackend
          --> HeapKeyedStateBackend.<init>

整體來看,創建一個state handle還是挺繞的,中間經過了多層封裝和代理。


創建完了state handle,接下來看看如何獲取和更新狀態值。

首先需要講一下HeapState在內存中是如何組織的,還是以最簡單的HeapValueState為例,
具體的數據結構,是在其基類AbstractHeapState中,以StateTable<K, N, SV> stateTable的形式存在的,其中K代表Key的類型,N代表state的namespace(這樣屬於不同namespace的state可以重名),SV代表state value的類型。

StateTable類內部數據結構如下:

    protected final KeyGroupRange keyGroupRange;
    /** Map for holding the actual state objects. */
    private final List<Map<N, Map<K, ST>>> state;
    /** Combined meta information such as name and serializers for this state */
    protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;

最核心的數據結構是state成員變量,它保存了一個list,其值類型為Map<N, Map<K, ST>>,即按namespace和key分組的兩級map。那麼它為什麼是一個list呢,這裏就要提到keyGroupRange成員變量了,它代表了當前state所包含的key的一個範圍,這個範圍根據當前的sub task id以及最大並發進行計算,在AbstractStreamOperator.initKeyedState方法中:

                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
                        container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());

舉例來說,如果當前task的並發是2,最大並發是128,那麼task-1所屬的state backend的keyGroupRange為[0,63],而task-2所屬的state backend的keyGroupRange為[64,127]。

這樣,task-1中的StateTable.state這個list,最大size即為64。獲取特定key的state value時,會先計算key的hash值,然後用hash值 % 最大並發,這樣會得到一個[0,127]之間的keyGroup,到這個list中get到這個下標的Map<N, Map<K,V>>值,然後根據 namespace + key二級獲取到真正的state value。

看到這裏,有人可能會問,對於一個key,如何保證在task-1中,它計算出來的keyGroup一定是在[0,63]之間,在task-2中一定是在[64,127]之間呢?

原因是,在KeyedStream中,使用了KeyGroupStreamPartitioner這種partitioner來向下遊task分發keys,而這個類重載的selectChannels方法如下:

        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
        return returnArray;

這裏關鍵是KeyGroupRangeAssignment.assignKeyToParallelOperator方法,它中間調用了KeyGroupRangeAssignment.assignToKeyGroup方法來確定一個key所屬的keyGroup,這個跟state backend計算keyGroup是同一個方法。然後根據這個keyGroup,它會計算出擁有這個keyGroup的task,並將這個key發送到此task。所以能夠保證,從KeyedStream上emit到下遊task的數據,它的state所屬的keyGroup一定是在當前task的keyGroupRange中的。

上麵已經提到了獲取ValueState的值,這裏貼一下代碼,結合一下就很容易理解了:

        Map<N, Map<K, V>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());
        if (namespaceMap == null) {
            return stateDesc.getDefaultValue();
        }

        Map<K, V> keyedMap = namespaceMap.get(currentNamespace);
        if (keyedMap == null) {
            return stateDesc.getDefaultValue();
        }

        V result = keyedMap.get(backend.getCurrentKey());
        if (result == null) {
            return stateDesc.getDefaultValue();
        }

        return result;

而更新值則通過ValueState.update方法進行更新,這裏就不貼代碼了。

上麵講了最簡單的ValueState,其他類型的state,其實也是基本一樣的,隻不過stateTable中狀態值的類型不同而已。如HeapListState,它的狀態值類型為ArrayList;HeapMapState,它的狀態值類型為HashMap。而值類型的不同,導致了在State上的接口也有所不同,如ListState會有add方法,MapState有putget方法。在這裏就不展開說了。


Checkpoint

到上麵為止,都是簡單的關於狀態的讀寫,而且狀態都還是隻在Task本地,接下來就會涉及到checkpoint。
所謂checkpoint,就是在某一時刻,將所有task的狀態做一個快照(snapshot),然後存儲到memory/file system/rocksdb等。

關於Flink的分布式快照,請參考 分布式Snapshot和Flink Checkpointing簡介 及相關論文,這裏不詳述了。

Flink的checkpoint,是由CheckpointCoordinator來協調的,它位於JobMaster中。但是其實在ExecutionGraph中已經創建了,見ExecutionGraph.enableSnapshotCheckpointing方法。

當Job狀態切換到RUNNING時,CheckpointCoordinatorDeActivator(從JobStatusListener派生)會觸發回調coordinator.startCheckpointScheduler();,根據配置的checkpoint interval來定期觸發checkpoint。

每個checkpoint由checkpoint ID和timestamp來唯一標識,其中checkpoint ID可以是standalone(基於內存)的,也可能是基於ZK的。
已經完成的checkpoint,保存在CompletedCheckpointStore中,可以是StandaloneCompletedCheckpointStore(保存在JobMaster內存中),也可以是ZooKeeperCompletedCheckpointStore(保存在ZK中),甚至是自己實現的store,比如基於HDFS的。

觸發checkpoint的方法在CheckpointCoordinator.ScheduledTrigger中,隻有一行:

    triggerCheckpoint(System.currentTimeMillis(), true);

這個方法比較長,它會先做一係列檢查,如檢查coordinator自身的狀態(是否被shutdown),還會檢查與上次checkpoint的時間間隔、當前的並發checkpoint數是否超過限製,如果都沒問題,再檢查所有task的狀態是否都為RUNNING,都沒問題之後,觸發每個Execution的checkpoint:

    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

看下Execution.triggerCheckpoint方法:

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final SimpleSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

很簡單,通過RPC調用向TaskManager觸發當前JOB的checkpoint,然後一路調用下去:

RpcTaskManagerGateway.triggerCheckpoint
  --> TaskExecutorGateway.triggerCheckpoint
  --> TaskExecutor.triggerCheckpoint
    --> task.triggerCheckpointBarrier
    --> StatefulTask.triggerCheckpoint
    --> StreamTask.triggerCheckpoint
    --> StreamTask.performCheckpoint

具體做checkpoint的時候,會先向下遊廣播checkpoint barrier,然後調用StreamTask.checkpointState方法做具體的checkpoint,實際會調用到StreamTask.executeCheckpointing方法。

checkpoint裏,具體操作為,遍曆每個StreamTask中的所有operator:

  1. 調用operator的snapshotState(FSDataOutputStream out, long checkpointId, long timestamp)方法,存儲operator state,這個結果會返回operator state handle,存儲於nonPartitionedStates中。這裏實際處理的時候,隻有當user function實現了Checkpointed接口,才會做snapshot。需要注意的是,此接口已經deprecated,被CheckpointedFunction代替,而對CheckpointedFunction的snapshot會在下麵的第2步中來做,因此這兩個接口一般來說是2選1的。
  2. 調用operator的snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions)方法,返回OperatorSnapshotResult對象。注意雖然每個snapshot方法返回的都是一個RunnableFuture,不過目前實際上還是同步做的checkpoint(可以比較容易改成異步)。
    1. 這裏會先調用AbstractStreamOperator.snapshotState方法,為rich function做state snapshot
    2. 調用operatorStateBackend.snapshot方法,對operator state做snapshot。
    3. 調用keyedStateBackend.snapshot方法,對keyed state做snapshot。
    4. 調用timerServiceBackend.snapshot方法,對processing time/event time window中注冊的timer回調做snapshot(恢複狀態的時候必須也要恢複timer回調)
  3. 調用StreamTask.runAsyncCheckpointingAndAcknowledge方法確認上麵的snapshot是否都成功,如果成功,則會向CheckpointCoordinator發送ack消息。
  4. CheckpointCoordinator收到ack消息後,會檢查本地是否存在這個pending的checkpoint,並且這個checkpoint是否超時,如果都OK,則判斷是否收到所有task的ack消息,如果是,則表示已經完成checkpoint,會得到一個CompletedCheckpoint並加入到completedCheckpointStore中。

在上麵的checkpoint過程中,如果state backend選擇的是jobmanager,那麼最終返回的state handle為ByteStreamStateHandle,這個state handle中包含了snapshot後的所有狀態數據。而如果是filesystem,則state handle隻會包含數據的文件句柄,數據則在filesystem中,這個下麵會再細說。


Filesystem State Backend

上麵提到的都是比較簡單的基於內存的state backend,在實際生產中是不太可行的。因此一般會使用filesystem或者rocksdb的state backend。我們先講一下基於filesystem的state backend。

基於內存的state backend實現為MemoryStateBackend,基於文件係統的state backend的實現為FsStateBackend。FsStateBackend有一個策略,當狀態的大小小於1MB(可配置,最大1MB)時,會把狀態數據直接存儲在meta data file中,避免出現很小的狀態文件。

FsStateBackend另外一個成員變量就是basePath,即checkpoint的路徑。實際做checkpoint時,生成的路徑為:<base-path>/<job-id>/chk-<checkpoint-id>/

而且filesystem推薦使用分布式文件係統,如HDFS等,這樣在fail over時可以恢複,如果是本地的filesystem,那恢複的時候是會有問題的。

回到StreamTask,在做checkpoint的時候,是通過CheckpointStateOutputStream寫狀態的,FsStateBack會使用FsCheckpointStreamFactory,然後通過FsCheckpointStateOutputStream去寫具體的狀態,這個實現也比較簡單,就是一個帶buffer的寫文件係統操作。最後向上層返回的StreamStateHandle,視狀態的大小,如果狀態特別小,則會直接返回帶狀態數據的ByteStreamStateHandle,否則會返回FileStateHandle,這個state handle包含了狀態文件名和大小。

需要注意的是,雖然checkpoint是寫入到文件係統中,但是基於FsStateBackend創建的keyed state backend,仍然是HeapKeyedStateBackend,也就是說,keyed state的讀寫仍然是會在內存中的,隻有在做checkpoint的時候才會持久化到文件係統中。

RocksDB State Backend

RocksDB跟上麵的都略有不同,它會在本地文件係統中維護狀態,KeyedStateBackend等會直接寫入本地rocksdb中。同時它需要配置一個遠端的filesystem uri(一般是HDFS),在做checkpoint的時候,會把本地的數據直接複製到filesystem中。fail over的時候從filesystem中恢複到本地。

從RocksDBStateBackend創建出來的RocksDBKeyedStateBackend,更新的時候會直接以key + namespace作為key,然後把具體的值更新到rocksdb中。

如果是ReducingState,則在add的時候,會先從rocksdb中讀取已有的值,然後根據用戶的reduce function進行reduce,再把新值寫入rocksdb。

做checkpoint的時候,會首先在本地對rockdb做checkpoint(rocksdb自帶的checkpoint功能),這一步是同步的。然後將checkpoint異步複製到遠程文件係統中。最後返回RocksDBStateHandle

RocksDB克服了HeapKeyedStateBackend受內存限製的缺點,同時又能夠持久化到遠端文件係統中,比較適合在生產中使用。


Queryable State

Queryable State,顧名思義,就是可查詢的狀態,表示這個狀態,在流計算的過程中就可以被查詢,而不像其他流計算框架,需要存儲到外部係統中才能被查詢。目前可查詢的state主要針對partitionable state,如keyed state等。

簡單來說,當用戶在job中定義了queryable state之後,就可以在外部,通過QueryableStateClient,通過job id, state name, key來查詢所對應的狀態的實時的值。

queryable state目前支持兩種方法來定義:

  • 通過KeyedStream.asQueryableState方法,生成一個QueryableStream,需要注意的是,這個stream類似於一個sink,是不能再做transform的。 實現上,生成QueryableStream就是為當前stream加上一個operator:QueryableAppendingStateOperator,它的processElement方法,每來一個元素,就會調用state.add去更新狀態。因此這種方式有一個限製,隻能使用ValueDescriptor, FoldingStateDescriptor或者ReducingStateDescriptor,而不能是ListStateDescriptor,因為它可能會無限增長導致OOM。此外,由於不能在stream後麵再做transform,也是有一些限製。

  • 通過managed keyed state。

    ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
                "average", // the state name
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                Tuple2.of(0L, 0L)); 
    descriptor.setQueryable("query-name"); // queryable state name
    

    這個隻需要將具體的state descriptor標識為queryable即可,這意味著可以將一個pipeline中間的operator的state標識為可查詢的。

首先根據state descriptor的配置,會在具體的TaskManager中創建一個KvStateServer,用於state查詢,它就是一個簡單的netty server,通過KvStateServerHandler來處理請求,查詢state value並返回。

但是一個partitionable state,可能存在於多個TaskManager中,因此需要有一個路由機製,當QueryableStateClient給定一個query name和key時,要能夠知道具體去哪個TaskManager中查詢。

為了做到這點,在Job的ExecutionGraph(JobMaster)上會有一個用於定位KvStateServer的KvStateLocationRegistry,當在TaskManager中注冊了一個queryable KvStateServer時,就會調用JobMaster.notifyKvStateRegistered,通知JobMaster。

具體流程如下圖:

image.png

這個設計看起來很美好,通過向流計算實時查詢狀態數據,免去了傳統的存儲等的開銷。但實際上,除了上麵提到的狀態類型的限製之外,也會受netty server以及state backend本身的性能限製,因此並不適用於高並發的查詢。


參考資料:

  1. Dynamic Scaling: Key Groups
  2. Stateful Stream Processing
  3. Working with State
  4. Scaling to large state
  5. Queryable state design doc

最後更新:2017-10-19 14:33:50

  上一篇:go  Flink原理與實現:如何生成ExecutionGraph及物理執行圖
  下一篇:go  Flink 原理與實現:Operator Chain原理