Flink - Checkpoint
Flink's most distinctive feature for stream processing is its global (distributed) snapshot mechanism.
CheckpointCoordinator
The core component that drives snapshots is the CheckpointCoordinator:
/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 *
 * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
 * CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The default standalone
 * implementations don't support any recovery.
 */
public class CheckpointCoordinator {

    /** Tasks who need to be sent a message when a checkpoint is started */
    // the tasks on which a checkpoint has to be triggered
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
     * accessing this don't block the job manager actor and run asynchronously. */
    // records the checkpoints that have completed
    private final CompletedCheckpointStore completedCheckpointStore;

    /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
     * need to be ascending across job managers. */
    // produces ascending checkpoint IDs; they must stay ascending even if the JobManager crashes
    protected final CheckpointIDCounter checkpointIdCounter;

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    // the interval between checkpoint triggers
    private final long baseInterval;

    /** The max time (in ms) that a checkpoint may take */
    // a checkpoint that takes longer than this is considered timed out and failed
    private final long checkpointTimeout;

    /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    // the minimum pause between checkpoints
    private final long minPauseBetweenCheckpoints;

    /** The maximum number of checkpoints that may be in progress at the same time */
    // how many checkpoints may be in flight at once
    private final int maxConcurrentCheckpointAttempts;

    /** Actor that receives status updates from the execution graph this coordinator works for */
    private ActorGateway jobStatusListener;

    /** The number of consecutive failed trigger attempts */
    private int numUnsuccessfulCheckpointsTriggers;

    private ScheduledTrigger currentPeriodicTrigger;

    /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
     * Non-volatile, because only accessed in synchronized scope */
    private boolean periodicScheduling;

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more) */
    // note: volatile, to guarantee visibility across threads
    private volatile boolean shutdown;

    /** Shutdown hook thread to clean up state handles. */
    private final Thread shutdownHook;

    /** Helper for tracking checkpoint statistics */
    private final CheckpointStatsTracker statsTracker;

    public CheckpointCoordinator(
            JobID job,
            long baseInterval,
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpointAttempts,
            ExecutionVertex[] tasksToTrigger,
            ExecutionVertex[] tasksToWaitFor,
            ExecutionVertex[] tasksToCommitTo,
            ClassLoader userClassLoader,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore completedCheckpointStore,
            RecoveryMode recoveryMode,
            CheckpointStatsTracker statsTracker) throws Exception {

        // start the CheckpointIDCounter
        checkpointIDCounter.start();

        this.timer = new Timer("Checkpoint Timer", true);
        this.statsTracker = checkNotNull(statsTracker);

        // in standalone mode, a shutdown hook is needed to clean up the state
        if (recoveryMode == RecoveryMode.STANDALONE) {
            // Add shutdown hook to clean up state handles when no checkpoint recovery is
            // possible. In case of another configured recovery mode, the checkpoints need to be
            // available for the standby job managers.
            this.shutdownHook = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // explicitly call shutdown
                        CheckpointCoordinator.this.shutdown();
                    } catch (Throwable t) {
                        LOG.error("Error during shutdown of checkpoint coordinator via " +
                                "JVM shutdown hook: " + t.getMessage(), t);
                    }
                }
            });

            try {
                // Add JVM shutdown hook to call shutdown of service
                Runtime.getRuntime().addShutdownHook(shutdownHook);
            } catch (IllegalStateException ignored) {
                // JVM is already shutting down. No need to do anything.
            } catch (Throwable t) {
                LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
            }
        } else {
            this.shutdownHook = null;
        }
    }
CheckpointIDCounter
There are two implementations:
StandaloneCheckpointIDCounter
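In this case the counter is simply backed by an AtomicLong. If the JobManager dies, the value is lost, so ascending IDs cannot be guaranteed after a restart.
As the Javadoc points out, though, no recovery is performed in standalone mode, so this is acceptable.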
/**
 * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
 *
 * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
 * recoverable in this recovery mode.
 */
public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {

    private final AtomicLong checkpointIdCounter = new AtomicLong(1);

    @Override
    public void start() throws Exception {
    }

    @Override
    public void stop() throws Exception {
    }

    @Override
    public long getAndIncrement() throws Exception {
        return checkpointIdCounter.getAndIncrement();
    }

    @Override
    public void setCount(long newCount) {
        checkpointIdCounter.set(newCount);
    }
}
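The effect of losing the counter on a restart can be shown with a minimal sketch. `InMemoryIdCounter` and `IdCounterDemo` are hypothetical names used only for illustration, not Flink classes; the sketch just assumes, as above, that the counter is an AtomicLong starting at 1.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for an in-memory checkpoint ID counter (not the Flink class). */
class InMemoryIdCounter {
    private final AtomicLong next = new AtomicLong(1);

    long getAndIncrement() {
        return next.getAndIncrement();
    }

    void setCount(long newCount) {
        next.set(newCount);
    }
}

class IdCounterDemo {
    public static void main(String[] args) {
        InMemoryIdCounter beforeCrash = new InMemoryIdCounter();
        beforeCrash.getAndIncrement();
        beforeCrash.getAndIncrement();
        long lastIssued = beforeCrash.getAndIncrement();

        // Simulated JobManager restart: the old counter object is gone,
        // so a fresh counter starts from its initial value again.
        InMemoryIdCounter afterRestart = new InMemoryIdCounter();
        long firstAfterRestart = afterRestart.getAndIncrement();

        System.out.println("last ID before crash: " + lastIssued
                + ", first ID after restart: " + firstAfterRestart);
    }
}
```

Because the IDs start over, they are only ascending within one JobManager lifetime, which is fine as long as nothing is recovered across restarts.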
ZooKeeperCheckpointIDCounter
This counter stores the current count in a persistent ZooKeeper node, so the IDs keep ascending across JobManager instances.
/**
 * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
 *
 * <p>Each counter creates a ZNode:
 * <pre>
 * +----O /flink/checkpoint-counter/<job-id> 1 [persistent]
 * .
 * .
 * .
 * +----O /flink/checkpoint-counter/<job-id> N [persistent]
 * </pre>
 *
 * <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case
 * of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
 */
public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter
CompletedCheckpointStore
An interface that records which checkpoints have completed.
/**
 * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
 */
public interface CompletedCheckpointStore {

    /**
     * Recover available {@link CompletedCheckpoint} instances.
     *
     * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
     * available checkpoint.
     */
    void recover() throws Exception;

    /**
     * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
     *
     * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
     * retained checkpoints, the oldest one will be discarded via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     */
    void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;

    /**
     * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
     * added.
     */
    CompletedCheckpoint getLatestCheckpoint() throws Exception;

    /**
     * Discards all added {@link CompletedCheckpoint} instances via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     */
    void discardAllCheckpoints() throws Exception;

    /**
     * Returns all {@link CompletedCheckpoint} instances.
     *
     * <p>Returns an empty list if no checkpoint has been added yet.
     */
    List<CompletedCheckpoint> getAllCheckpoints() throws Exception;

    /**
     * Returns the current number of retained checkpoints.
     */
    int getNumberOfRetainedCheckpoints();
}
Take StandaloneCompletedCheckpointStore: it is essentially an ArrayDeque that holds the CompletedCheckpoint instances.
class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {

    /** The completed checkpoints. */
    private final ArrayDeque<CompletedCheckpoint> checkpoints;
}
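The bounded-retention behaviour described in the interface Javadoc (keep at most N checkpoints, discard the oldest) might look roughly like the sketch below. `BoundedCheckpointStore` is a hypothetical class that stores only checkpoint IDs; it is an assumption-laden illustration, not the Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bounded store that tracks checkpoint IDs only (not the Flink class). */
class BoundedCheckpointStore {
    private final int maxRetained;
    private final ArrayDeque<Long> checkpoints = new ArrayDeque<>();
    private final List<Long> discarded = new ArrayList<>();

    BoundedCheckpointStore(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        this.maxRetained = maxRetained;
    }

    /** Adds the newest checkpoint and discards the oldest ones beyond the bound. */
    void addCheckpoint(long checkpointId) {
        checkpoints.addLast(checkpointId);
        while (checkpoints.size() > maxRetained) {
            // A real store would also release the state held by the discarded checkpoint.
            discarded.add(checkpoints.removeFirst());
        }
    }

    /** Returns the most recently added checkpoint ID, or null if none was added. */
    Long getLatestCheckpoint() {
        return checkpoints.peekLast();
    }

    List<Long> getAllCheckpoints() {
        return new ArrayList<>(checkpoints);
    }

    List<Long> getDiscarded() {
        return new ArrayList<>(discarded);
    }
}
```

Adding IDs 1, 2 and 3 to a store created with `maxRetained = 2` leaves 2 and 3 retained and moves 1 to the discarded list.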
ZooKeeperCompletedCheckpointStore records them in ZooKeeper instead:
/**
 * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
 *
 * <p>Checkpoints are added under a ZNode per job:
 * <pre>
 * +----O /flink/checkpoints/<job-id>  [persistent]
 * .    |
 * .    +----O /flink/checkpoints/<job-id>/1 [persistent]
 * .    .                                  .
 * .    .                                  .
 * .    .                                  .
 * .    +----O /flink/checkpoints/<job-id>/N [persistent]
 * </pre>
 *
 * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
 * only the latest one is used and older ones are discarded (even if the maximum number
 * of retained checkpoints is greater than one).
 *
 * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
 * same program, it is OK to take any valid successful checkpoint as long as the "history" of
 * checkpoints is consistent. Currently, after recovery we start out with only a single
 * checkpoint to circumvent those situations.
 */
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
The snapshot flow
StreamingJobGraphGenerator
Configuring checkpointing:
private void configureCheckpointing() {
    // fetch the checkpoint configuration
    CheckpointConfig cfg = streamGraph.getCheckpointConfig();

    if (cfg.isCheckpointingEnabled()) {
        // the checkpoint interval
        long interval = cfg.getCheckpointInterval();

        // collect the vertices that receive "trigger checkpoint" messages.
        // currently, these are all the sources
        List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();

        // collect the vertices that need to acknowledge the checkpoint
        // currently, these are all vertices
        List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());

        // collect the vertices that receive "commit checkpoint" messages
        // currently, these are all vertices
        List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();

        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) {
                // only source vertices go into triggerVertices,
                // because a checkpoint only has to be triggered at the sources
                triggerVertices.add(vertex.getID());
            }
            // TODO: add check whether the user function implements the checkpointing interface
            // currently every vertex is added to both commitVertices and ackVertices
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        // build the JobSnapshottingSettings
        JobSnapshottingSettings settings = new JobSnapshottingSettings(
                triggerVertices, ackVertices, commitVertices, interval,
                cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
                cfg.getMaxConcurrentCheckpoints());
        // attach them to the JobGraph
        jobGraph.setSnapshotSettings(settings);

        // if the user enabled checkpointing, the default number of exec retries is infinite.
        int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
        if (executionRetries == -1) {
            streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
        }
    }
}
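The role assignment in the loop above can be reduced to a small standalone sketch. `VertexRoles` and its `classify` method are hypothetical helpers invented for illustration; vertices are represented by plain names and a flag that says whether the vertex is a source.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for the three vertex lists (not a Flink class). */
class VertexRoles {
    final List<String> trigger = new ArrayList<>();
    final List<String> ack = new ArrayList<>();
    final List<String> commit = new ArrayList<>();

    /**
     * names[i] is a vertex name and isSource[i] tells whether that vertex has no inputs.
     * Sources receive the trigger message; every vertex must ack and is later notified.
     */
    static VertexRoles classify(String[] names, boolean[] isSource) {
        if (names.length != isSource.length) {
            throw new IllegalArgumentException("names and isSource must have the same length");
        }
        VertexRoles roles = new VertexRoles();
        for (int i = 0; i < names.length; i++) {
            if (isSource[i]) {
                roles.trigger.add(names[i]);
            }
            roles.ack.add(names[i]);
            roles.commit.add(names[i]);
        }
        return roles;
    }
}
```

For a three-vertex pipeline source, map, sink, only the source ends up in `trigger`, while all three vertices appear in `ack` and `commit`.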
JobManager
On submitJob, the settings stored in the JobGraph are transferred to the ExecutionGraph:
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
  // configure the state checkpointing
  val snapshotSettings = jobGraph.getSnapshotSettings
  if (snapshotSettings != null) {
    val jobId = jobGraph.getJobID()

    val idToVertex: JobVertexID => ExecutionJobVertex = id => {
      val vertex = executionGraph.getJobVertex(id)
      if (vertex == null) {
        throw new JobSubmissionException(jobId,
          "The snapshot checkpointing settings refer to non-existent vertex " + id)
      }
      vertex
    }

    val triggerVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava

    val ackVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava

    val confirmVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava

    val completedCheckpoints = checkpointRecoveryFactory
      .createCompletedCheckpoints(jobId, userCodeLoader)

    val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)

    executionGraph.enableSnapshotCheckpointing(
      snapshotSettings.getCheckpointInterval,
      snapshotSettings.getCheckpointTimeout,
      snapshotSettings.getMinPauseBetweenCheckpoints,
      snapshotSettings.getMaxConcurrentCheckpoints,
      triggerVertices,
      ackVertices,
      confirmVertices,
      context.system,
      leaderSessionID.orNull,
      checkpointIdCounter,
      completedCheckpoints,
      recoveryMode,
      savepointStore)
  }
}
ExecutionGraph
enableSnapshotCheckpointing creates the CheckpointCoordinator object:
public void enableSnapshotCheckpointing(
        long interval,
        long checkpointTimeout,
        long minPauseBetweenCheckpoints,
        int maxConcurrentCheckpoints,
        List<ExecutionJobVertex> verticesToTrigger,
        List<ExecutionJobVertex> verticesToWaitFor,
        List<ExecutionJobVertex> verticesToCommitTo,
        ActorSystem actorSystem,
        UUID leaderSessionID,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore completedCheckpointStore,
        RecoveryMode recoveryMode,
        StateStore<Savepoint> savepointStore) throws Exception {

    ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
    ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
    ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

    // disable to make sure existing checkpoint coordinators are cleared
    disableSnaphotCheckpointing();

    if (isStatsDisabled) {
        checkpointStatsTracker = new DisabledCheckpointStatsTracker();
    } else {
        int historySize = jobConfiguration.getInteger(
                ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
                ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);

        checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor);
    }

    // create the coordinator that triggers and commits checkpoints and holds the state
    checkpointCoordinator = new CheckpointCoordinator(
            jobID,
            interval,
            checkpointTimeout,
            minPauseBetweenCheckpoints,
            maxConcurrentCheckpoints,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            userClassLoader,
            checkpointIDCounter,
            completedCheckpointStore,
            recoveryMode,
            checkpointStatsTracker);

    // the periodic checkpoint scheduler is activated and deactivated as a result of
    // job status changes (running -> on, all other states -> off)
    // (this registers the coordinator's actor in jobStatusListenerActors, so the
    // coordinator is notified whenever the job status changes)
    registerJobStatusListener(
            checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
As shown here, the checkpointCoordinator is a member of the ExecutionGraph.
Next, the ExecutionGraph is submitted asynchronously:
// execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
// because it is a blocking operation
future {
  try {
    if (isRecovery) {
      // restore the checkpointed state
      executionGraph.restoreLatestCheckpointedState()
    } else {
      //......
      // put the jobGraph into submittedJobGraphs so that it is tracked
      submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
    }

    // tell the client that the job was submitted successfully
    jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))

    if (leaderElectionService.hasLeadership) {
      // this is where the executionGraph is actually scheduled
      executionGraph.scheduleForExecution(scheduler)
    } else {
      //......
    }
  } catch {
    //.......
  }
}(context.dispatcher)
CheckpointCoordinatorDeActivator
/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {

    private final CheckpointCoordinator coordinator;
    private final UUID leaderSessionID;

    @Override
    public void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();

            if (status == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }

        // we ignore all other messages
    }

    @Override
    public UUID getLeaderSessionID() {
        return leaderSessionID;
    }
}
Whenever the job status changes, the checkpoint scheduler is started (job is RUNNING) or stopped (any other status).
CheckpointCoordinator
startCheckpointScheduler starts the periodic trigger:
public void startCheckpointScheduler() {
    synchronized (lock) {
        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = new ScheduledTrigger();
        timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
    }
}

private class ScheduledTrigger extends TimerTask {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis());
        } catch (Exception e) {
            LOG.error("Exception while triggering checkpoint", e);
        }
    }
}
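The scheduling mechanism itself is plain `java.util.Timer`. The sketch below reproduces only that part: a daemon timer firing a task at a fixed rate. `PeriodicTriggerDemo` and `firesRepeatedly` are hypothetical names, and the task merely counts down a latch where the coordinator would trigger a checkpoint.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a fixed-rate trigger built on java.util.Timer. */
class PeriodicTriggerDemo {

    /**
     * Schedules a task every intervalMs milliseconds and returns true if it
     * fired at least `times` times before timeoutMs elapsed.
     */
    static boolean firesRepeatedly(long intervalMs, int times, long timeoutMs) {
        final CountDownLatch fired = new CountDownLatch(times);
        // Daemon timer, so it never keeps the JVM alive on its own.
        Timer timer = new Timer("Checkpoint Timer", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // The real trigger would start a checkpoint here.
                fired.countDown();
            }
        }, intervalMs, intervalMs);

        try {
            return fired.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Mirrors stopping the scheduler: no further triggers are fired.
            timer.cancel();
        }
    }
}
```

The first firing happens one interval after scheduling, since the initial delay and the period are both set to the interval, as in `startCheckpointScheduler`.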
triggerCheckpoint triggers a single checkpoint:
/**
 * Triggers a new checkpoint and uses the given timestamp as the checkpoint
 * timestamp.
 *
 * @param timestamp The timestamp for the checkpoint.
 * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if
 *                         the checkpoint ID counter should be queried.
 */
public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception {

    // ... (the preliminary checks and the code that fills triggerIDs and ackTasks
    // are not shown in this excerpt)

    // we will actually trigger this checkpoint!
    final long checkpointID;
    if (nextCheckpointId < 0) {
        try {
            // this must happen outside the locked scope, because it communicates
            // with external services (in HA mode) and may block for a while.
            checkpointID = checkpointIdCounter.getAndIncrement();
        } catch (Throwable t) {
            // ... (error handling not shown in this excerpt)
        }
    } else {
        checkpointID = nextCheckpointId;
    }

    // A checkpoint that has been triggered but not yet fully acknowledged is a PendingCheckpoint.
    // It is handed all tasks that must ack the checkpoint (ackTasks). The tasks are ticked off
    // one by one as their acks arrive; once all of them have acked, the PendingCheckpoint
    // becomes a CompletedCheckpoint.
    final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

    // schedule the timer that will clean up the expired checkpoints
    // (a one-shot task that runs once checkpointTimeout has elapsed)
    TimerTask canceller = new TimerTask() {
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                        checkpoint.discard(userClassLoader);
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        onCancelCheckpoint(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            } catch (Throwable t) {
                LOG.error("Exception while handling checkpoint timeout", t);
            }
        }
    };

    try {
        // re-acquire the lock
        synchronized (lock) {
            // track the PendingCheckpoint in the map
            pendingCheckpoints.put(checkpointID, checkpoint);
            // and arm the canceller
            timer.schedule(canceller, checkpointTimeout);
        }
        // end of lock scope

        // send the messages to the tasks that trigger their checkpoint
        // (tasksToTrigger contains only the source tasks)
        for (int i = 0; i < tasksToTrigger.length; i++) {
            ExecutionAttemptID id = triggerIDs[i];
            TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
            tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
        }

        numUnsuccessfulCheckpointsTriggers = 0;
        return true;
    } catch (Throwable t) {
        // ... (error handling not shown in this excerpt)
    }
}
--------- The code above sends the checkpoint message only to the sources, so only source tasks go through the flow below -----------
TaskManager
The message sent by sendMessageToCurrentExecution is eventually received by the TaskManager:
/**
 * Handler for messages related to checkpoints.
 *
 * @param actorMessage The checkpoint message.
 */
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

  actorMessage match {
    // a TriggerCheckpoint message
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      // look up the task that is actually executing in runningTasks
      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        // this ends up calling the task's triggerCheckpointBarrier
        task.triggerCheckpointBarrier(checkpointId, timestamp)
      }

    case message: NotifyCheckpointComplete =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        // call the task's notifyCheckpointComplete
        task.notifyCheckpointComplete(checkpointId)
      } else {
        log.debug(
          s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
      }

    // unknown checkpoint message
    case _ => unhandled(actorMessage)
  }
}
Task
public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
    AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        if (invokable instanceof StatefulTask) {

            // build a local closure
            final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
            final String taskName = taskNameWithSubtask;

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // The key step: call triggerCheckpoint on the statefulTask. The task itself
                        // is running at this point, so the checkpoint is triggered concurrently
                        // from a separate thread.
                        statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
                    } catch (Throwable t) {
                        failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
                    }
                }
            };
            executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
        }
    }
}
StreamTask
StreamTask implements StatefulTask,
so the call finally lands in
StreamTask.triggerCheckpoint, which does the actual checkpoint work
by calling performCheckpoint(checkpointId, timestamp):
protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
    // take the lock: record processing in this task is paused while the snapshot is drawn
    synchronized (lock) {
        if (isRunning) {

            // Since both state checkpointing and downstream barrier emission occurs in this
            // lock scope, they are an atomic operation regardless of the order in which they occur.
            // Given this, we immediately emit the checkpoint barriers, so the downstream operators
            // can start their checkpoint work as soon as possible
            operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);

            // now draw the state snapshot
            final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
            final StreamTaskState[] states = new StreamTaskState[allOperators.length];

            boolean hasAsyncStates = false;

            // build the StreamTaskState snapshot of every operator and check,
            // based on the type of each state, whether asynchronous handling is needed
            for (int i = 0; i < states.length; i++) {
                StreamOperator<?> operator = allOperators[i];
                if (operator != null) {
                    // operator.snapshotOperatorState produces the StreamTaskState
                    StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
                    if (state.getOperatorState() instanceof AsynchronousStateHandle) {
                        hasAsyncStates = true;
                    }
                    if (state.getFunctionState() instanceof AsynchronousStateHandle) {
                        hasAsyncStates = true;
                    }
                    if (state.getKvStates() != null) {
                        for (KvStateSnapshot<?, ?, ?, ?, ?> kvSnapshot: state.getKvStates().values()) {
                            if (kvSnapshot instanceof AsynchronousKvStateSnapshot) {
                                hasAsyncStates = true;
                            }
                        }
                    }

                    states[i] = state.isEmpty() ? null : state;
                }
            }

            StreamTaskStateList allStates = new StreamTaskStateList(states);

            // acknowledge the checkpoint, synchronously or asynchronously
            if (allStates.isEmpty()) {
                getEnvironment().acknowledgeCheckpoint(checkpointId);
            } else if (!hasAsyncStates) {
                // synchronous path
                this.lastCheckpointSize = allStates.getStateSize();
                getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
            } else {
                // asynchronous path
                // start a Thread that does the asynchronous materialization and
                // then sends the checkpoint acknowledge
                String threadName = "Materialize checkpoint state " + checkpointId + " - " + getName();
                AsyncCheckpointThread checkpointThread = new AsyncCheckpointThread(
                        threadName, this, cancelables, states, checkpointId);

                synchronized (cancelables) {
                    cancelables.add(checkpointThread);
                }
                checkpointThread.start();
            }
            return true;
        } else {
            return false;
        }
    }
}
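This is the call path for sources. Intermediate operators and sinks are not triggered by the JobManager; they trigger the checkpoint from onEvent, depending on the barriers they receive from upstream.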
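How a non-source task might decide when to start its own checkpoint can be sketched as a barrier counter: the checkpoint starts once a barrier with the same ID has arrived on every input channel. `BarrierCounter` is a hypothetical, heavily simplified class; it ignores the buffering of records from channels that have already delivered their barrier, and it is not the Flink barrier handling code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical barrier bookkeeping for a task with several input channels. */
class BarrierCounter {
    private final int numChannels;
    // checkpoint ID -> channels that have already delivered the barrier for that ID
    private final Map<Long, Set<Integer>> seen = new HashMap<>();

    BarrierCounter(int numChannels) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        this.numChannels = numChannels;
    }

    /**
     * Records a barrier and returns true when the last missing channel delivers
     * its barrier, which is the moment the task would start its checkpoint.
     */
    boolean onBarrier(int channel, long checkpointId) {
        Set<Integer> channels = seen.computeIfAbsent(checkpointId, id -> new HashSet<>());
        if (!channels.add(channel)) {
            return false; // duplicate barrier from the same channel
        }
        if (channels.size() == numChannels) {
            seen.remove(checkpointId);
            return true;
        }
        return false;
    }
}
```

With two input channels, the first barrier for checkpoint 1 returns false and the barrier from the second channel returns true.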
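The key step in StreamTask.triggerCheckpoint is taking a state snapshot of every operator in the task.
The result is StreamTaskStateList allStates, the list that holds all of the state.
Finally, either synchronously or asynchronously, it calls
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
which sends the state snapshot to the JobManager. How the JobManager handles it is covered further down.
The synchronous path is simple, but in general snapshots have to be taken asynchronously,
so take a look at the asynchronous AsyncCheckpointThread: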
AsyncCheckpointThread
@Override
public void run() {
    try {
        for (StreamTaskState state : states) {
            if (state != null) {
                if (state.getFunctionState() instanceof AsynchronousStateHandle) {
                    AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
                    state.setFunctionState(asyncState.materialize());
                }
                if (state.getOperatorState() instanceof AsynchronousStateHandle) {
                    AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
                    state.setOperatorState(asyncState.materialize());
                }
                if (state.getKvStates() != null) {
                    Set<String> keys = state.getKvStates().keySet();
                    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
                    for (String key: keys) {
                        if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
                            AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
                            // the actual persisting of the state is deferred to materialize() here
                            kvStates.put(key, asyncHandle.materialize());
                        }
                    }
                }
            }
        }
        StreamTaskStateList allStates = new StreamTaskStateList(states);
        owner.lastCheckpointSize = allStates.getStateSize();
        owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);

        LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
    }
    // ... (the catch/finally part is not shown in this excerpt)
}
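The split between a cheap synchronous phase and a deferred materialize() call can be illustrated with a small sketch. `AsyncSnapshotSketch` and its methods are hypothetical; a `Supplier<String>` stands in for an asynchronous state handle, and building a string stands in for writing the state to durable storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical two-phase snapshot: copy now, materialize later on another thread. */
class AsyncSnapshotSketch {

    /** Synchronous phase: take a cheap copy of the live state and return the deferred work. */
    static Supplier<String> snapshot(List<Integer> liveState) {
        final List<Integer> copy = new ArrayList<>(liveState);
        // The expensive part (here only building a string) runs when get() is called.
        return () -> "materialized" + copy;
    }

    /** Asynchronous phase: materialize on a separate thread and return what would be acked. */
    static String materializeInBackground(Supplier<String> handle) {
        final AtomicReference<String> acked = new AtomicReference<>();
        Thread worker = new Thread(() -> acked.set(handle.get()), "Materialize checkpoint state");
        worker.start();
        try {
            // Joining is only for the demo; a real task would keep processing records.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acked.get();
    }
}
```

Changes made to the live state after `snapshot` returns do not show up in the materialized result, which is what allows processing to continue while the background thread is still writing.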
RuntimeEnvironment
package org.apache.flink.runtime.taskmanager;

/**
 * In implementation of the {@link Environment}.
 */
public class RuntimeEnvironment implements Environment {

    @Override
    public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
        // try and create a serialized version of the state handle
        SerializedValue<StateHandle<?>> serializedState;
        long stateSize;

        if (state == null) {
            serializedState = null;
            stateSize = 0;
        } else {
            try {
                serializedState = new SerializedValue<StateHandle<?>>(state);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e);
            }

            try {
                stateSize = state.getStateSize();
            } catch (Exception e) {
                throw new RuntimeException("Failed to fetch state handle size", e);
            }
        }

        AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
                jobId,
                executionId,
                checkpointId,
                serializedState,
                stateSize);

        jobManager.tell(message);
    }
}
So the ack, together with the serialized state handle, is sent to the JobManager.
JobManager
handleCheckpointMessage
/**
 * Dedicated handler for checkpoint messages.
 *
 * @param actorMessage The checkpoint actor message.
 */
private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
  actorMessage match {
    case ackMessage: AcknowledgeCheckpoint =>
      val jid = ackMessage.getJob()
      currentJobs.get(jid) match {
        case Some((graph, _)) =>
          val checkpointCoordinator = graph.getCheckpointCoordinator()
          val savepointCoordinator = graph.getSavepointCoordinator()

          if (checkpointCoordinator != null && savepointCoordinator != null) {
            // handle the ack inside a future so that the JobManager actor is not blocked
            future {
              try {
                // the JobManager passes the checkpoint ack message to the coordinator
                if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
                  // OK, this is the common case
                } else {
                  // Try the savepoint coordinator if the message was not addressed
                  // to the periodic checkpoint coordinator.
                  if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
                    log.info("Received message for non-existing checkpoint " +
                      ackMessage.getCheckpointId)
                  }
                }
              } catch {
                case t: Throwable =>
                  log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
              }
            }(context.dispatcher)
          }
        // ... (the remaining cases are not shown in this excerpt)
      }
  }
}
CheckpointCoordinator
receiveAcknowledgeMessage
/**
 * Receives an AcknowledgeCheckpoint message and returns whether the
 * message was associated with a pending checkpoint.
 *
 * @param message Checkpoint ack from the task manager
 *
 * @return Flag indicating whether the ack'd checkpoint was associated
 * with a pending checkpoint.
 *
 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
 */
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {

    final long checkpointId = message.getCheckpointId();

    CompletedCheckpoint completed = null;
    PendingCheckpoint checkpoint;

    // Flag indicating whether the ack message was for a known pending
    // checkpoint.
    boolean isPendingCheckpoint;

    synchronized (lock) {
        // look up the matching pending checkpoint
        checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDiscarded()) {
            isPendingCheckpoint = true;

            // record this task's ack on the pending checkpoint
            if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
                // all tasks that have to ack have done so
                if (checkpoint.isFullyAcknowledged()) {
                    // turn the pending checkpoint into a completed one
                    completed = checkpoint.toCompletedCheckpoint();

                    // track it in the completedCheckpointStore: one full checkpoint is done
                    completedCheckpointStore.addCheckpoint(completed);

                    // remove it from the pending checkpoints
                    pendingCheckpoints.remove(checkpointId);
                    rememberRecentCheckpointId(checkpointId);

                    dropSubsumedCheckpoints(completed.getTimestamp());

                    onFullyAcknowledgedCheckpoint(completed);

                    triggerQueuedRequests();
                }
            }
        }
    }

    // send the confirmation messages to the necessary targets. we do this here
    // to be outside the lock scope
    if (completed != null) {
        final long timestamp = completed.getTimestamp();

        for (ExecutionVertex ev : tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ExecutionAttemptID attemptId = ee.getAttemptId();
                NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                // notify every ExecutionVertex that the checkpoint has completed
                ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
            }
        }

        statsTracker.onCompletedCheckpoint(completed);
    }

    return isPendingCheckpoint;
}
PendingCheckpoint
acknowledgeTask
merely caches the reported state in collectedStates:
public boolean acknowledgeTask(
        ExecutionAttemptID attemptID,
        SerializedValue<StateHandle<?>> state,
        long stateSize) {

    synchronized (lock) {
        if (discarded) {
            return false;
        }

        ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
        if (vertex != null) {
            if (state != null) {
                collectedStates.add(new StateForTask(
                        state,
                        stateSize,
                        vertex.getJobvertexId(),
                        vertex.getParallelSubtaskIndex(),
                        System.currentTimeMillis() - checkpointTimestamp));
            }
            numAcknowledgedTasks++;
            return true;
        } else {
            return false;
        }
    }
}
Once the acks of all tasks have been received, toCompletedCheckpoint is called:
public CompletedCheckpoint toCompletedCheckpoint() {
    synchronized (lock) {
        if (discarded) {
            throw new IllegalStateException("pending checkpoint is discarded");
        }
        if (notYetAcknowledgedTasks.isEmpty()) {
            CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,
                    checkpointTimestamp, System.currentTimeMillis(), new ArrayList<StateForTask>(collectedStates));
            dispose(null, false);

            return completed;
        } else {
            throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
        }
    }
}
It wraps collectedStates in a CompletedCheckpoint and returns it.
Finally, completedCheckpointStore.addCheckpoint stores the checkpoint; for one implementation, see
ZooKeeperCompletedCheckpointStore
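The ack bookkeeping of a pending checkpoint, as walked through in acknowledgeTask and toCompletedCheckpoint, can be condensed into a small sketch. `PendingAcks` is a hypothetical class that uses strings for task IDs and state handles; locking and the discarded flag are left out on purpose.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, single-threaded model of a pending checkpoint's ack tracking. */
class PendingAcks {
    private final Set<String> notYetAcknowledged;
    private final List<String> collectedStates = new ArrayList<>();

    PendingAcks(Set<String> tasksToWaitFor) {
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Returns false for unknown tasks and for tasks that have already acked. */
    boolean acknowledgeTask(String taskId, String state) {
        if (!notYetAcknowledged.remove(taskId)) {
            return false;
        }
        if (state != null) {
            // Stateless tasks ack with null and contribute nothing.
            collectedStates.add(state);
        }
        return true;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    /** Returns the collected states once every task has acked. */
    List<String> toCompleted() {
        if (!isFullyAcknowledged()) {
            throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
        }
        return new ArrayList<>(collectedStates);
    }
}
```

A duplicate or late ack from a task that was already ticked off is rejected, and a task without state still counts towards completion.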
NotifyCheckpointComplete
The NotifyCheckpointComplete message likewise travels to the TaskManager and then to the Task, and finally reaches StreamTask.notifyCheckpointComplete:
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    synchronized (lock) {
        if (isRunning) {
            LOG.debug("Notification of complete checkpoint for task {}", getName());

            // We first notify the state backend if necessary
            if (stateBackend instanceof CheckpointNotifier) {
                ((CheckpointNotifier) stateBackend).notifyCheckpointComplete(checkpointId);
            }

            for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                if (operator != null) {
                    operator.notifyOfCompletedCheckpoint(checkpointId);
                }
            }
        } else {
            LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
        }
    }
}
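That is the complete checkpoint process.
Now for the restore process.
Restore process
As shown earlier, when a job is submitted with isRecovery set, the following is called: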
executionGraph.restoreLatestCheckpointedState()
/**
 * Restores the latest checkpointed state.
 *
 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 * block the job manager actor and run asynchronously.
 *
 */
public void restoreLatestCheckpointedState() throws Exception {
    synchronized (progressLock) {
        if (checkpoin
Last updated: 2017-04-07 21:23:50