閱讀136 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Flink原理與實現:Window的實現原理

Flink原理與實現係列文章:

Flink 原理與實現:架構和拓撲概覽
Flink 原理與實現:如何生成 StreamGraph
Flink 原理與實現:如何生成 JobGraph
Flink原理與實現:如何生成ExecutionGraph及物理執行圖
Flink原理與實現:Operator Chain原理
Flink原理與實現:詳解Flink中的狀態管理

在閱讀本文之前,請先閱讀Flink 原理與實現:Window機製,這篇文章從用戶的角度,對Window做了比較詳細的分析,而本文主要是從Flink框架的實現層麵,對Window做另一個角度的分析。

首先看一個比較簡單的情況,假設我們在一個KeyedStream上做了一個10秒鍾的tumbling processing time window,也就是說,每隔10秒鍾窗口會觸發一次,即:

  dataStream.keyBy(0).timeWindow(Time.seconds(10)).sum(1);

在研究源碼之前,我們可以在腦子裏大概想象一下它應該會怎麼處理:

  1. 給定一條數據,會給這條數據assign windows,在這個例子中,因為是翻滾窗口,所以隻會assign出一個窗口。
  2. assign了窗口之後,我們就知道這條消息對應的窗口起始時間,這裏比較重要的是窗口的右邊界,即窗口過期時間。
  3. 我們可以在窗口的過期時間之上,注冊一個Scheduled Future,到時間之後讓它自動回調窗口的聚合函數,即觸發窗口內的數據計算。

上麵的第3步,針對KeyedStream,需要再擴展一下,針對一條數據,我們應該注冊一個基於Key + Window的Scheduled Future,到時間之後隻觸發對於這個key的數據計算。當然,這裏我們不自覺地會想,當key的數目非常大的時候,可能會創建出大量的Future,這會是一個問題。

腦子中大致有上麵的思路之後,我們來看一下Flink的實現。

首先,KeyedStream.timeWindow方法會生成一個WindowedStream,sum則是我們的aggregator,因此在WindowedStream中,實際調用了aggregate(new SumAggregator(...)),然後一層層調下來,目標就是生成一個WindowOperator。

在Flink中,根據是否配置了evictor,會生成兩種不同的WindowOperator:

  • 有evictor:這種情況下意味著用戶需要精確控製如何evict窗口的元素,因此所有的數據都會先緩存起來。此時會生成EvictingWindowOperator
  • 無evictor:這種情況下,通過指定一個reduce function,來一條數據就會進行reduce,當到達窗口邊界之後,直接輸出結果就可以了。此時會生成WindowOperator

不管哪一種operator,都需要指定兩個function:

  • window function:控製如何處理窗口內的元素結果
  • reduce function:控製如何對窗口內元素做聚合

當一個窗口被fire的時候,就需要通過window function來控製如何處理窗口的結果了。比如PassThroughWindowFunction啥也不做,對每一條結果都直接調用out.collect發送到下遊;而ReduceApplyWindowFunction則在這個時候針對窗口所有元素,調用reduce function進行聚合計算,再將計算的結果發射出去。

在上麵的例子中,由於沒有指定evictor,因此會生成WindowOperator,它的window function為InternalSingleValueWindowFunction,它提供了對PassThroughWindowFunction的代理。而reduce function則用於構造StateDescriptor:

            ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
                reduceFunction,
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

也就是說,對於sum這種情況,每來一條消息,都會調用reduce function,然後更新reducing state,最後窗口被觸發的時候,直接通過PassThroughWindowFunction輸出reducing state的結果。

接下來看一下WindowOperator.processElement方法:

     // 給元素分配窗口
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        if (windowAssigner instanceof MergingWindowAssigner) {
            // session window的處理邏輯
            // ...
      } else {
        // 遍曆每一個窗口
            for (W window: elementWindows) {
                // 如果窗口已經過期,直接忽略
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

           // 窗口被觸發了
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    // 輸出窗口結果
                    emitWindowContents(window, contents);
                }

           // 如果窗口需要purge,則清理狀態
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }

可以看到,大致的邏輯還是非常簡單明了的,關鍵在於這一行:

    TriggerResult triggerResult = triggerContext.onElement(element);

這裏針對不同的window,會有不同的trigger。其中ProcessingTime的都是ProcessingTimeTrigger。看下它的onElement方法:

        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;

可以看到,onElement方法始終返回TriggerResult.CONTINUE這個結果,不會觸發窗口的fire操作。那麼重點就是第一行了,它實際調用了WindowOperator.registerProcessingTimeTimer方法:

    internalTimerService.registerProcessingTimeTimer(window, time);

這是一個InternalTimerService對象,在WindowOperator.open方法中被創建:

    internalTimerService =
                getInternalTimerService("window-timers", windowSerializer, this);

它通過InternalTimeServiceManager.getInternalTimeService獲取:

        HeapInternalTimerService<K, N> timerService = timerServices.get(name);
        if (timerService == null) {
            timerService = new HeapInternalTimerService<>(totalKeyGroups,
                localKeyGroupRange, keyContext, processingTimeService);
            timerServices.put(name, timerService);
        }
        timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
        return timerService;

即創建了一個HeapInternalTimerService實例。

看一下這個類的成員,這裏省掉了序列化和反序列化相關的成員變量:

  // 目前使用SystemProcessingTimeService,包含了窗口到期回調線程池
    private final ProcessingTimeService processingTimeService;

    private final KeyContext keyContext;

    /**
     * Processing time相關的timer
     */
    private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;

    /**
     * Event time相關的timer
     */
    private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup;
    private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;

    /**
     * 當前task的key-group range信息
     */
    private final KeyGroupsList localKeyGroupRange;
    private final int totalKeyGroups;
    private final int localKeyGroupRangeStartIdx;

    /**
     * 當前task的watermark,針對event time
     */
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * 最接近的未被觸發的窗口的Scheduled Future
     * */
    private ScheduledFuture<?> nextTimer;

  // 窗口的回調函數,如果是WindowOperator,則會根據時間類型,回調WindowOperator.onEventTime或onProcessingTime方法
    private Triggerable<K, N> triggerTarget;

可以看到,存儲窗口timer的數據結構processingTimeTimersByKeyGroup或者eventTimeTimersByKeyGroup,跟存儲state的數據結構很像,也是根據當前task的key group range,創建一個數組。每一個key都會落到數組的一個下標,這個數組元素值是一個Set<InternalTimer<K,N>>,即Key + Window作為這個集合的key。

此外,還有一個processingTimeTimersQueue或者eventTimeTimersQueue,這是一個優先隊列,會存儲所有的 Key + Window的timer,主要作用就是用於快速取出最接近的未被觸發的窗口。

接下來看下這個類的registerProcessingTimeTimer方法:

    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);

        // 獲取數組下標下的Timer Set
        Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
        // 判斷是否已經添加過這個timer
        if (timerSet.add(timer)) {
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;

            processingTimeTimersQueue.add(timer);

            // 如果新添加的timer的窗口觸發時間早於nextTimer,則取消nextTimer的觸發,
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                // 注冊新添加的timer回調
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

processingTimeService.registerTimer(time, this)注冊回調,會調用SystemProcessingTimeService.registerTimer,這個方法很簡單,會計算出當前時間跟窗口邊界的delay,然後通過ScheduledExecutorService注冊一個定時的回調,其中target為HeapInternalTimerService本身。

舉例來說,當前時間為 2017-06-15 19:00:01,來了一條消息,那麼它被assign的窗口為[2017-06-15 19:00:00, 2017-06-15 19:00:10),計算出來的delay為9秒,因此在9秒之後,會觸發HeapInternalTimerService.onProcessingTime方法。

看下這個方法的代碼:

    public void onProcessingTime(long time) throws Exception {
        nextTimer = null;
        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

            Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);

            timerSet.remove(timer);
            processingTimeTimersQueue.remove();

            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null) {
            if (nextTimer == null) {
                nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
            }
        }
    }

這個方法也比較簡單,根據當前的窗口邊界,從processingTimeTimersQueue這個隊列中一個個取timer,隻要timer所對應的窗口邊界<=當前窗口邊界時間,就將timer從timer set中刪除,並調用triggerTarget.onProcessingTime(timer)觸發該窗口。最後設置並注冊nextTimer,即下一個最接近的窗口的回調。

對於KeyedStream下的窗口,實際上的情況是,在同一個窗口到達的多個不同的key,實際上窗口的邊界都是相同的,所以當一個key的窗口被觸發,同時也會觸發該Task上所有key group range的窗口。


看了對Processing Time的處理,接下來看看Event time的情況。

event time跟processing time稍有不同,因為它的窗口觸發時間,會依賴於watermark,並不是確定的(會有一個最遲的觸發時間)。隻要當watermark越過當前窗口的邊界,這個窗口就可以被觸發。因此它並沒有使用nextTimer這個變量來注冊和標識一個最接近的未被觸發的窗口。

注冊event time的timer時也比較簡單,隻是同時往timerSet和eventTimeTimersQueue中添加timer。

另外,Event Time的trigger使用的是EventTimeTrigger,它的onElement方法如下:

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

可以看到,每處理一個元素,都會拿當前元素所屬窗口的max timestamp去跟當前的watermark對比,如果小於watermark,說明已經越過窗口的邊界,則fire該窗口。

ctx.getCurrentWatermark()方法實際調用的是WindowOperator.WindowContext.getCurrentWatermark方法,返回的是HeapInternalTimerService的currentWatermark。

那麼,watermark是在哪裏被更新的呢?在HeapInternalTimerService.advanceWatermark方法中。代碼如下:

        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

            Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            eventTimeTimersQueue.remove();

            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }

可以看到,這個方法不僅更新了當前watermark,而且會用觸發processing time window很接近的邏輯,來觸發event time window。

這個方法在AbstractStreamOperator.processWatermark方法中被調用。processWatermark則在StreamInputProcessor中的ForwardingValveOutputHandler.handleWatermark方法中被調用。一個自下往上的反向調用鏈如下:

HeapInternalTimerService.advanceWatermark
  <-- AbstractStreamOperator.processWatermark
  <-- StreamInputProcessor.StatusWatermarkValve.handleWatermark
  <-- StatusWatermarkValve.inputWatermark
  <-- StreamInputProcessor.processInput

這樣,當一個processor收到的消息是一個watermark的時候,就會更新time service中的watermark。這裏也可以看到,對於普通的用戶消息,是不會主動更新watermark的。因此在Flink中,如果要使用Event Time,就必須實現一個發射watermark的策略:

  • 要麼數據源自己會發送watermark(實際情況中不大可能,除非用戶基於特定數據源自行封裝)
  • 要麼實現TimestampAssigner接口,定時往下遊發送watermark。這還是有一定的限製的。

還有一個需要考慮的問題是,一個Task可能接受到上遊多個channel的輸入,每個channel都會有watermark,但是每個channel的進度是不一樣的,這個時候該如何選擇和計算?舉例來說,如果有一個消費TT或kafka的Task,它會同時消費多個partition,每個partition的消費進度不一樣,當我們需要獲取到當前task的watermark的時候,應該取哪個值?

邏輯上來說,應該是取所有partition中最小的值。因為按照watermark的定義,這個值表示上遊的數據中,已經沒有比它更小的了。那麼看一下Flink是如何做的,在StatusWatermarkValve.inputWatermark方法中:

        if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
            long watermarkMillis = watermark.getTimestamp();

            // 如果當前輸入的watermark小於該channel的watermark,直接忽略
            if (watermarkMillis > channelStatuses[channelIndex].watermark) {
                channelStatuses[channelIndex].watermark = watermarkMillis;

                // 標識當前channel的watermark已經檢查過
                if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                    channelStatuses[channelIndex].isWatermarkAligned = true;
                }

                // 取所有channel的watermark最小值並調用handleWatermark方法
                findAndOutputNewMinWatermarkAcrossAlignedChannels();
            }
        }

的確也是這麼做的。

最後總結一下,Flink的window,很靈活很強大,不過在有的時候還是會有一些問題:

  1. 當key的規模很大,而任務的並發不高的時候,會存大大量的timer對象,會消耗掉不少的內存。
  2. watermark需要用戶來定義實現,實現得不好很容易會得出錯誤的窗口計算結果,這點不太方便。
  3. watermark的計算策略過於單一,目前隻能取各channel的最小值,用戶無法自定義這一塊的邏輯。

最後更新:2017-10-19 15:03:27

  上一篇:go  黑客攻擊和廣告騷擾,你更擔心哪個?
  下一篇:go  Flink原理與實現:如何生成ExecutionGraph及物理執行圖