

Flink: Principles and Implementation — How the ExecutionGraph and the Physical Execution Graph Are Generated

Before reading this article, please read the earlier posts in the Flink: Principles and Implementation series:

Flink: Principles and Implementation — Architecture and Topology Overview
Flink: Principles and Implementation — How the StreamGraph Is Generated
Flink: Principles and Implementation — How the JobGraph Is Generated

Generating the ExecutionGraph

The StreamGraph and the JobGraph are both built on the client. This article describes how the ExecutionGraph and the physical execution graph are generated, and how a job is scheduled and executed after it has been submitted.

After the client has built the JobGraph, it submits it to the JobMaster via submitJob.
The JobMaster's constructor then builds the ExecutionGraph:

    this.executionGraph = ExecutionGraphBuilder.buildGraph(...)

Let's look at this method. It is fairly long, so some minor code fragments are omitted:


     // for streaming jobs, the schedule mode is always EAGER
        executionGraph.setScheduleMode(jobGraph.getScheduleMode());
        executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

     // set the json plan
     // ...

     // check the executable class (i.e. the operator class) and set the max parallelism
     // ...

        // get all JobVertex instances in topological order
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();

        // build the execution graph from the JobVertex list
        executionGraph.attachJobGraph(sortedTopology);

        // checkpoint checks

As you can see, the execution graph is essentially built in the last line, i.e. in ExecutionGraph.attachJobGraph:

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException, IOException {
        // iterate over the job vertices
        for (JobVertex jobVertex : topologiallySorted) {
            // create the corresponding ExecutionJobVertex for each JobVertex
            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
            // connect the new ExecutionJobVertex to the IntermediateResults of its predecessors
            ejv.connectToPredecessors(this.intermediateResults);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

            // sanity check
            // ...

            this.verticesInCreationOrder.add(ejv);
        }
    }

As you can see, the interesting work of creating an ExecutionJobVertex happens in its constructor:

     // parallelism-related setup above

     // the serialized TaskInformation; this matters because it is
     // shipped to the concrete Task when the job is deployed later.
        this.serializedTaskInformation = new SerializedValue<>(new TaskInformation(
            jobVertex.getID(),
            jobVertex.getName(),
            parallelism,
            maxParallelism,
            // the class name of the operator this Task will execute
            jobVertex.getInvokableClassName(),
            jobVertex.getConfiguration()));

     // the ExecutionVertex array, sized by the JobVertex's parallelism
        this.taskVertices = new ExecutionVertex[numTaskVertices];

        this.inputs = new ArrayList<>(jobVertex.getInputs().size());

        // slot sharing and co-location code
        // ...

        // create the intermediate results; their number equals the operator's
        // out-degree, e.g. 1 if the operator emits to a single downstream operator.
        // Note that one IntermediateResult contains multiple IntermediateResultPartitions.
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

        for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
            final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

            this.producedDataSets[i] = new IntermediateResult(
                    result.getId(),
                    this,
                    numTaskVertices,
                    result.getResultType());
        }

        // create one ExecutionVertex per parallel subtask of the job vertex.
        // That is, a JobVertex/ExecutionJobVertex represents an operator,
        // while each concrete ExecutionVertex represents one Task.
        for (int i = 0; i < numTaskVertices; i++) {
            ExecutionVertex vertex = new ExecutionVertex(
                    this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);

            this.taskVertices[i] = vertex;
        }

        // sanity check
        // ...

        // set up the input splits, if the vertex has any
        // (batch-only code)
        // ...

        finishedSubtasks = new boolean[parallelism];

The ExecutionJobVertex and ExecutionVertex instances are now created, but the ExecutionEdges are not yet. Let's return to this line in the attachJobGraph method:

    ejv.connectToPredecessors(this.intermediateResults);

The method looks like this:

     // get the list of input JobEdges
        List<JobEdge> inputs = jobVertex.getInputs();

        // iterate over each JobEdge
        for (int num = 0; num < inputs.size(); num++) {
            JobEdge edge = inputs.get(num);

            // look up the IntermediateResult that this JobEdge's input corresponds to
            IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
            if (ires == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                        + edge.getSourceId());
            }

            // add the IntermediateResult to this ExecutionJobVertex's inputs
            this.inputs.add(ires);

            // register a consumer with the IntermediateResult;
            // the consumerIndex corresponds to the IntermediateResult's out-degree
            int consumerIndex = ires.registerConsumer();

            for (int i = 0; i < parallelism; i++) {
                ExecutionVertex ev = taskVertices[i];
                // connect the ExecutionVertex to the IntermediateResult
                ev.connectSource(num, ires, edge, consumerIndex);
            }
        }

Here is the ExecutionVertex.connectSource method:

    public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

     // the pattern is POINTWISE only for forward-style connections; everything else is ALL_TO_ALL
        final DistributionPattern pattern = edge.getDistributionPattern();
        final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

        ExecutionEdge[] edges;

        switch (pattern) {
            case POINTWISE:
                edges = connectPointwise(sourcePartitions, inputNumber);
                break;

            case ALL_TO_ALL:
                edges = connectAllToAll(sourcePartitions, inputNumber);
                break;

            default:
                throw new RuntimeException("Unrecognized distribution pattern.");

        }

        this.inputEdges[inputNumber] = edges;

        // a consumer was already registered with the IntermediateResult; here each
        // IntermediateResultPartition gets its consumer, i.e. it is tied to an ExecutionEdge
        for (ExecutionEdge ee : edges) {
            ee.getSource().addConsumer(ee, consumerNumber);
        }
    }

The connectAllToAll method:

        ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];

        for (int i = 0; i < sourcePartitions.length; i++) {
            IntermediateResultPartition irp = sourcePartitions[i];
            edges[i] = new ExecutionEdge(irp, this, inputNumber);
        }

        return edges;

To understand this method, you need to know that the ExecutionVertex field inputEdges is a two-dimensional array: for each input of the ExecutionVertex, it holds that input's list of ExecutionEdges.

For example, suppose the ExecutionVertex has two distinct inputs A and B, where input A has partition=1 and input B has partition=8. Then the two-dimensional array inputEdges looks like this (abbreviating IntermediateResultPartition as irp):

[ ExecutionEdge[ A.irp[0] ] ]
[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ] ]

With that in mind, the code above is easy to follow.
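
The POINTWISE branch, connectPointwise, is not shown above. Roughly speaking, it maps each consumer subtask to one source partition when the producer has no more partitions than the consumer has subtasks, and to a contiguous range of partitions otherwise. The following is a simplified, self-contained sketch of that index math, not Flink's actual code (the real method also special-cases evenly divisible parallelisms and builds ExecutionEdge[] instead of int[]):

    import java.util.Arrays;

    // Standalone sketch of the POINTWISE source-partition mapping.
    public class PointwiseMappingDemo {

        // which source partitions does consumer subtask `subtask` connect to?
        static int[] sourcesFor(int subtask, int parallelism, int numSources) {
            if (numSources == parallelism) {
                // 1:1 forward: subtask i reads partition i
                return new int[] { subtask };
            } else if (numSources < parallelism) {
                // several consumer subtasks share one source partition
                float factor = (float) parallelism / numSources;
                return new int[] { (int) (subtask / factor) };
            } else {
                // one consumer subtask reads a contiguous range of source partitions
                float factor = (float) numSources / parallelism;
                int start = (int) (subtask * factor);
                int end = (int) ((subtask + 1) * factor);
                int[] range = new int[end - start];
                for (int i = 0; i < range.length; i++) {
                    range[i] = start + i;
                }
                return range;
            }
        }

        public static void main(String[] args) {
            // 4 consumer subtasks reading 8 source partitions
            for (int s = 0; s < 4; s++) {
                System.out.println("subtask " + s + " <- partitions "
                        + Arrays.toString(sourcesFor(s, 4, 8)));
            }
        }
    }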

At this point, the ExecutionGraph has been fully built. Next, let's look at how it is turned into Tasks and starts executing.


Task Scheduling and Execution

Next, let's walk through how a Task is scheduled and executed, using the simplest setup, a mini cluster, as the example.

We will skip quickly over the client-side job submission, the StreamGraph-to-JobGraph translation, and the ExecutionGraph construction described above.

After submission, a job flows roughly like this:

env.execute("<job name>")
  --> MiniCluster.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.startJobRunners
    --> JobManagerRunner.start
    --> JobMaster.<init> (build ExecutionGraph)

Once the JobMaster has been created, it takes part in leader election. After it is granted leadership, the grantLeadership callback fires, which calls jobManager.start(leaderSessionID) and starts running the job.

JobMaster.start 
    --> JobMaster.startJobExecution (nothing is actually executing yet..)
    --> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

The key part comes next: once the resource manager leader is obtained, ResourceManagerLeaderListener.notifyLeaderAddress is called back, and the whole call chain is:

ResourceManagerLeaderListener.notifyLeaderAddress
    --> JobMaster.notifyOfNewResourceManagerLeader
    --> ResourceManagerConnection.start
    --> ResourceManagerConnection.onRegistrationSuccess (callback, sent and invoked by the Flink RPC framework)
    --> JobMaster.onResourceManagerRegistrationSuccess

Then we finally arrive at the core scheduling code, in JobMaster.onResourceManagerRegistrationSuccess:

    executionContext.execute(new Runnable() {
        @Override
        public void run() {
            try {
                executionGraph.restoreExternalCheckpointedStore();
                executionGraph.setQueuedSchedulingAllowed(true);
                executionGraph.scheduleForExecution(slotPool.getSlotProvider());
            }
            catch (Throwable t) {
                executionGraph.fail(t);
            }
        }
    });

ExecutionGraph.scheduleForExecution --> ExecutionGraph.scheduleEager

This method counts the total number of ExecutionVertex instances and assigns each one a SimpleSlot (ignoring slot sharing for now), then wraps each pair into an ExecutionAndSlot; as the name suggests, that is an ExecutionVertex plus a Slot (or, more precisely, an execution attempt plus a Slot).
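
In outline, the EAGER pass acquires a slot for every subtask first and only then deploys all of them. The following is a toy, self-contained sketch with illustrative names (Slot, ExecutionAttempt, ExecutionAndSlot are stand-ins, not Flink's real API; in Flink the slots come from the SlotProvider/SlotPool):

    import java.util.ArrayList;
    import java.util.List;

    // Toy sketch of EAGER scheduling: allocate all slots up front,
    // pair each execution attempt with its slot, then deploy everything.
    public class EagerSchedulingSketch {

        record Slot(int id) {}

        record ExecutionAttempt(String vertex, int subtask) {
            void deployToSlot(Slot slot) {
                System.out.println(vertex + "[" + subtask + "] -> slot " + slot.id());
            }
        }

        record ExecutionAndSlot(ExecutionAttempt attempt, Slot slot) {}

        public static void main(String[] args) {
            int parallelism = 2;
            int nextSlot = 0;
            List<ExecutionAndSlot> assignments = new ArrayList<>();

            // phase 1: one slot per subtask of every vertex
            for (String vertex : new String[] { "Source", "FlatMap", "Sink" }) {
                for (int i = 0; i < parallelism; i++) {
                    assignments.add(new ExecutionAndSlot(
                            new ExecutionAttempt(vertex, i), new Slot(nextSlot++)));
                }
            }

            // phase 2: only after all slots are acquired does EAGER mode deploy
            for (ExecutionAndSlot eas : assignments) {
                eas.attempt().deployToSlot(eas.slot());
            }
        }
    }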

It then calls execAndSlot.executionAttempt.deployToSlot(slot) to deploy, i.e. Execution.deployToSlot.

That method first performs a series of state transitions and checks, and then does the actual deployment. The core code is:

        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskState,
            attemptNumber);

        // register this execution at the execution graph, to receive call backs
        vertex.getExecutionGraph().registerExecution(this);

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

The ExecutionVertex.createDeploymentDescriptor method is where the ExecutionGraph is translated into the actual physical execution graph: IntermediateResultPartitions become ResultPartitions, and ExecutionEdges become InputChannelDeploymentDescriptors (which are in turn converted into InputGates at execution time).
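
As a rough picture of what the resulting per-subtask descriptor carries to the TaskManager, here is a hypothetical, simplified model (the record names and fields below are illustrative, not Flink's real TaskDeploymentDescriptor, which carries more, such as job/execution IDs, slot number, and task state):

    import java.util.List;

    // Hypothetical, simplified model of a task deployment payload.
    public class TddSketch {

        // from an IntermediateResultPartition: becomes a ResultPartition at runtime
        record ResultPartitionDesc(String resultId, int numSubpartitions) {}

        // from an ExecutionEdge: tells the consumer where a producer partition lives
        record InputChannelDesc(String producerPartitionId, String producerLocation) {}

        // one gate per consumed IntermediateResult: becomes an InputGate at runtime
        record InputGateDesc(String consumedResultId, List<InputChannelDesc> channels) {}

        record TaskDeployment(
                byte[] serializedTaskInformation,             // includes invokableClassName
                List<ResultPartitionDesc> producedPartitions,
                List<InputGateDesc> inputGates) {}

        public static void main(String[] args) {
            TaskDeployment deployment = new TaskDeployment(
                    new byte[0],
                    List.of(new ResultPartitionDesc("IR-1", 4)),
                    List.of(new InputGateDesc("IR-0",
                            List.of(new InputChannelDesc("IR-0#p0", "taskmanager-1")))));
            System.out.println(deployment.producedPartitions() + " / "
                    + deployment.inputGates());
        }
    }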

Finally, the task is submitted via an RPC call, which lands in TaskExecutor.submitTask.
That method creates the actual Task and calls task.startTaskThread() to start executing it.

In the Task constructor, the InputGate, ResultPartition, ResultPartitionWriter, and so on are created from the passed-in parameters.

The startTaskThread method then executes executingThread.start, which invokes Task.run.
Its core code is:

     // ...

        // now load the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

      // ...
      invokable.setEnvironment(env);

      // ...
      this.invokable = invokable;
      invokable.invoke();

      // task finishes or fails, do cleanup
      // ...

Here, invokable is the instance that runs the operator, created via reflection; concretely, it is a OneInputStreamTask, SourceStreamTask, or similar. So where does nameOfInvokableClass come from? It was in fact already fixed back when the StreamGraph was generated; see StreamGraph.addOperator:

        if (operatorObject instanceof StoppableStreamSource) {
            addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
        } else if (operatorObject instanceof StreamSource) {
            addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
        } else {
            addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
        }

The OneInputStreamTask.class here becomes the vertexClass of the generated StreamNode. This value is carried all the way along: when the StreamGraph is translated into the JobGraph it becomes the JobVertex's invokableClass, and when the JobGraph is translated into the ExecutionGraph it is stored in ExecutionJobVertex's TaskInformation.invokableClassName, finally reaching the Task.
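
To make the reflection step concrete, here is a minimal standalone sketch of what loadAndInstantiateInvokable does with that class name. The Invokable interface and HelloTask class below are stand-ins of our own (Flink checks against its AbstractInvokable base class instead):

    // Standalone sketch: resolve the invokable class name that travelled
    // StreamGraph -> JobGraph -> TaskInformation, then instantiate it.
    public class InvokableLoadingDemo {

        interface Invokable {
            void invoke() throws Exception;
        }

        public static class HelloTask implements Invokable {
            @Override
            public void invoke() {
                System.out.println("invoked");
            }
        }

        static Invokable loadAndInstantiate(String className, ClassLoader cl) throws Exception {
            // asSubclass enforces that the loaded class implements Invokable,
            // much like Flink's check for AbstractInvokable
            Class<? extends Invokable> clazz =
                    Class.forName(className, true, cl).asSubclass(Invokable.class);
            return clazz.getDeclaredConstructor().newInstance();
        }

        public static void main(String[] args) throws Exception {
            Invokable task = loadAndInstantiate(
                    "InvokableLoadingDemo$HelloTask",
                    InvokableLoadingDemo.class.getClassLoader());
            task.invoke(); // prints "invoked"
        }
    }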

So where does the logic the user actually wrote live? For example, where did the Tokenizer from word count go?
OneInputStreamTask's base class, StreamTask, holds the headOperator and the operatorChain. When we call dataStream.flatMap(new Tokenizer()), a StreamFlatMap operator is created; this operator is an AbstractUdfStreamOperator, and the user code, new Tokenizer(), is its userFunction.
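
For reference, here is the classic word-count Tokenizer, as in the Flink examples; this is the object that ends up as the StreamFlatMap operator's userFunction:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // The user code: dataStream.flatMap(new Tokenizer()) wraps this function
    // in a StreamFlatMap (an AbstractUdfStreamOperator) as its userFunction.
    public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }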

Tying it all back together, with OneInputStreamTask as the example: the Task's core execution path is the OneInputStreamTask.invoke method, which calls StreamTask.run. That is an abstract method, so ultimately the run method of the derived class, i.e. OneInputStreamTask, SourceStreamTask, etc., is executed.

OneInputStreamTask's run method looks like this:

    final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
    final Object lock = getCheckpointLock();

    while (running && inputProcessor.processInput(operator, lock)) {
        // all the work happens in the "processInput" method
    }

It simply keeps calling inputProcessor.processInput(operator, lock) in a loop, i.e. StreamInputProcessor.processInput:

    public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
     // ...

        while (true) {
            if (currentRecordDeserializer != null) {
           // ...

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

              // watermarks are handled by the framework
                    if (recordOrMark.isWatermark()) {
                       // watermark handling logic
                       // ...
                        continue;
                    } else if(recordOrMark.isLatencyMarker()) {
                        // latency markers are likewise handled by the framework
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // ***** this is where the real user logic runs *****
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

        // other handling logic
        // ...
        }
    }

In the code above, streamOperator.processElement(record) is what actually executes the user logic. Taking StreamFlatMap as the example, that is its processElement method:

    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }

And with that, the entire scheduling and execution path is connected end to end.
