閱讀507 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Flink 原理與實現:Operator Chain原理

Flink原理與實現係列文章 :

Flink 原理與實現:架構和拓撲概覽
Flink 原理與實現:如何生成 StreamGraph
Flink 原理與實現:如何生成 JobGraph
Flink原理與實現:如何生成ExecutionGraph及物理執行圖

Flink的邏輯/執行計劃優化,有一個很大的特點就是,會將多個operator,串在一起作為一個operator chain來執行。關於operator chain,在 Flink 原理與實現:理解 Flink 中的計算資源 中已經有了初步的介紹,在閱讀本文之前,建議先閱讀上文。
本文將從源碼上進一步分析,探究operator chain內部是如何實現的。

OperatorChain是在StreamTask的invoke方法中被創建的:

     // ...
        operatorChain = new OperatorChain<>(this);
        headOperator = operatorChain.getHeadOperator();
        // ...

Flink原理與實現:如何生成ExecutionGraph及物理執行圖中提到,StreamTask是真正的執行task中的invokable operator(的基類),因此所有的task都會創建OperatorChain這個對象。隻是在執行的時候,如果一個operator無法被chain起來,那它就隻有headOperator,chain裏就沒有其他operator了。

OperatorChain構造函數:

            List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
            this.chainEntryPoint = createOutputCollector(containingTask, configuration,
                    chainedConfigs, userCodeClassloader, streamOutputMap, allOps);

            if (headOperator != null) {
                headOperator.setup(containingTask, configuration, getChainEntryPoint());
            }

            // add head operator to end of chain
            allOps.add(headOperator);

這裏headerOperator.setup方法第三個參數為Output,相當於把chainEntryPoint作為output傳入head operator。setup方法一路調用,直到基類AbstractStreamOperator,可以看到:

    this.output = new CountingOutput(output,((OperatorMetricGroup)this.metrics).getIOMetricGroup().getNumRecordsOutCounter());

即對output封裝成了AbstractStreamOperator.CountingOutput,主要是為了統計metrics信息。

而output自身在operator chain中,是一個CopyingChainingOutput,或者ChainingOutput(根據是否配置了reuse objects)。

這裏的headOperator即為operator chain中第一個operator,在這裏即為StreamGroupedReduce。
它在執行processElement的時候,如果有調用output.collect,則會調用CountingOutput。它的collect方法很簡單:

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

即更新metrics和調用ChainingOutput.collect方法,看看這個方法:

        @Override
        public void collect(StreamRecord<T> record) {
            try {
                numRecordsIn.inc();
                StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
                operator.setKeyContextElement1(copy);
                operator.processElement(copy);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not forward element to next operator", e);
            }
        }

這裏的operator是chainedOperator,即除了headOperator之外,剩餘的operators的chain。
調用這個operator.processElement,就會像上麵一樣,循環調用operator chain裏的所有operator,一直到chain end。

以word count為例,應用代碼如下:

    // ...
    DataStream<String> text = env.fromElements(WordCountData.WORDS);
    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(0).sum(1).filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 > 1;
                }
            });
     env.execute("Streaming WordCount");

它實際上形成了以下的調用鏈:

StreamGroupedReduce.processElement
--> CountingOutput.collect
--> CopyChainingOutput.collect
    --> StreamFilter.processElement
    --> CountingOutput.collect
    --> CopyChainingOutput.collect
        --> StreamSink.processElement
        --> CountingOutput.collect
        --> BroadcastingOutputCollector.collect

下麵會解析如何生成這個調用鏈。我們返回到OperatorChain的構造函數中,看一下這行代碼:

            this.chainEntryPoint = createOutputCollector(containingTask, configuration,
                    chainedConfigs, userCodeClassloader, streamOutputMap, allOps);

到底做了什麼。

這個方法的重要代碼如下:

        // 遍曆當前operatorConfig的輸出邊
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
           // 下遊operator id
            int outputId = outputEdge.getTargetId();
            // 得到下遊operator的stream config
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

        // 根據下遊operator的stream config,創建chained operator
            Output<StreamRecord<T>> output = createChainedOperator(
                    containingTask,
                    chainedOpConfig,
                    chainedConfigs,
                    userCodeClassloader,
                    streamOutputs,
                    allOperators);
            allOutputs.add(new Tuple2<>(output, outputEdge));
        }

再看下createChainedOperator方法:

     // 第一行就遞歸調用了createOutputCollector方法,創建當前operator下遊operator的collector
        Output<StreamRecord<OUT>> output = createOutputCollector(
                containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);

     // setup當前operator,其實是把下遊operator的collector作為當前operator的output
     // 這樣當前operator調用collect的時候,就會調用下遊operator的方法。
        OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
        chainedOperator.setup(containingTask, operatorConfig, output);

        allOperators.add(chainedOperator);

      // 根據是否reuse object,創建ChainingOutput或者CopyingChainingOutput
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new ChainingOutput<>(chainedOperator);
        }
        else {
            TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            return new CopyingChainingOutput<>(chainedOperator, inSerializer);
        }

由於這個過程是遞歸的,所以chained operators實際上是從下遊往上遊去反向一個個創建和setup的。以word count為例,chained operators為:StreamGroupedReduce - StreamFilter - StreamSink,而實際初始化順序則相反:StreamSink - StreamFilter - StreamGroupedReduce。

在OperatorChain類中,headOperator為StreamGroupedReduce。createOutputCollector的調用過程如下:

createOutputCollector(operatorConfig=<StreamGroupedReduce config>, ...)
 --> chainedOpConfig = <StreamFilter config>
 --> createChainedOperator(chainedOpConfig=<StreamFilter config>)
    --> createOutputCollector(<StreamFilter config>)
    --> chainedOpConfig = <StreamSink config>
        --> createChainedOperator(<StreamSink config>)
            --> createOutputCollector(<StreamSink config>)
            --> chainedOpConfig = null, 返回BroadcastingOutputCollector
            --> StreamSink.setup(<output=BroadcastingOutputCollector>)
            --> return CopyingChainingOutput
    --> output = CopyingChainingOutput
    --> StreamFilter.setup(<output=CopyingChainingOutput>)
    --> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> headOperator.setup(<output=CopyingChainingOutput>)            

最後我們來看一下,如果operator chain中隻有一個operator的情況,它生成了什麼。
在word count的例子中,在StreamSource之後的flatMap,就是這種情況,它不能跟後麵的操作chain在一起。

首先OperatorChain構造函數中的chainedConfigs會為空,因為下遊沒有跟它chain在一起的operator。接下來看下它的chainEntryPoint

createOutputCollector方法中,由於沒有chained outputs,因此會直接返回RecordWriterOutput,即headOperator的output就直接交給record writer輸出了。

最後更新:2017-10-19 14:33:46

  上一篇:go  Flink原理與實現:詳解Flink中的狀態管理
  下一篇:go  Work Like Alibaba第三期:數據驅動研發效能提升實踐