

Flink – window operator

References:

https://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

https://wuchong.me/blog/2016/06/06/flink-internals-session-window/ 

 

WindowOperator

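The window operator implements its logic with a WindowAssigner and a Trigger.

When an element arrives, a KeySelector first assigns it a key, and the WindowAssigner then assigns it to zero or more windows. The element is put into one pane per assigned window.

A pane holds all elements that have the same key and the same window.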
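
To make the pane idea concrete, here is a minimal, self-contained Java sketch (not Flink code). PaneSketch, Window and PaneId are hypothetical names; elements are plain int values and keys are strings. An element assigned to two overlapping windows ends up in two panes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of panes: one buffer per (key, window) pair.
public class PaneSketch {

    // A window is modelled as a half-open interval [start, end).
    public record Window(long start, long end) {}

    // A pane is identified by the key AND the window.
    public record PaneId(String key, Window window) {}

    private final Map<PaneId, List<Integer>> panes = new HashMap<>();

    // Put one element into every pane it belongs to (one per assigned window).
    public void add(String key, int value, Collection<Window> assignedWindows) {
        for (Window w : assignedWindows) {
            panes.computeIfAbsent(new PaneId(key, w), id -> new ArrayList<>()).add(value);
        }
    }

    public List<Integer> contents(String key, Window w) {
        return panes.getOrDefault(new PaneId(key, w), List.of());
    }

    public int paneCount() {
        return panes.size();
    }

    public static void main(String[] args) {
        PaneSketch s = new PaneSketch();
        Window w1 = new Window(0, 10);
        Window w2 = new Window(5, 15);
        s.add("a", 1, List.of(w1, w2)); // overlapping windows: the element lands in two panes
        s.add("b", 2, List.of(w1));
        System.out.println(s.paneCount());       // 3
        System.out.println(s.contents("a", w2)); // [1]
    }
}
```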

/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>
 * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>
 * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

    // ------------------------------------------------------------------------
    // Configuration values and user functions
    // ------------------------------------------------------------------------

    protected final WindowAssigner<? super IN, W> windowAssigner;

    protected final KeySelector<IN, K> keySelector;

    protected final Trigger<? super IN, ? super W> trigger;

    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

    /**
     * The allowed lateness for elements. This is used for:
     * <ul>
     *     <li>Deciding if an element should be dropped from a window due to lateness.
     *     <li>Clearing the state of a window if the system time passes the
     *         {@code window.maxTimestamp + allowedLateness} landmark.
     * </ul>
     */
    protected final long allowedLateness; // how late an element may still be accepted, i.e. how long after the watermark has passed the end of the window


    /**
     * To keep track of the current watermark so that we can immediately fire if a trigger
     * registers an event time callback for a timestamp that lies in the past.
     */
    protected transient long currentWatermark = Long.MIN_VALUE;

    protected transient Context context = new Context(null, null); // the Trigger context

    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; // only used to obtain getCurrentProcessingTime()

    // ------------------------------------------------------------------------
    // State that needs to be checkpointed
    // ------------------------------------------------------------------------

    /**
     * Processing time timers that are currently in-flight.
     */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; // a Timer holds timestamp, key and window; the queue is ordered by timestamp

    /**
     * Current waiting watermark callbacks.
     */
    protected transient Set<Timer<K, W>> watermarkTimers;
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue; // event-time timers ordered by timestamp; watermarkTimers above tracks the same timers as a Set

    protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; // per key, maps each merged window to the window under which its state is stored
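
The watermarkTimers Set and the watermarkTimersQueue PriorityQueue above suggest a simple scheme: the Set rejects duplicate timers, while the queue keeps them ordered by timestamp so the earliest one can be peeked cheaply. The sketch below assumes that reading; TimerRegistrySketch and its record Timer are hypothetical stand-ins for the operator's Timer<K, W>, with key and window reduced to strings.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical sketch of "Set + PriorityQueue" timer bookkeeping; not Flink's code.
public class TimerRegistrySketch {

    // Like the operator's Timer: a timestamp plus the key and window it belongs to.
    // Records provide equals/hashCode, so identical timers collapse in the Set.
    public record Timer(long timestamp, String key, String window) {}

    private final Set<Timer> timers = new HashSet<>();
    private final PriorityQueue<Timer> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp(), b.timestamp()));

    // Register a timer; a duplicate (same timestamp, key and window) is ignored.
    public void register(long timestamp, String key, String window) {
        Timer t = new Timer(timestamp, key, window);
        if (timers.add(t)) { // Set.add returns false for duplicates
            queue.add(t);    // the queue keeps timers ordered by timestamp
        }
    }

    public int size() {
        return queue.size();
    }

    // Earliest pending timer, or null if there is none.
    public Timer peek() {
        return queue.peek();
    }

    public static void main(String[] args) {
        TimerRegistrySketch r = new TimerRegistrySketch();
        r.register(20, "a", "w1");
        r.register(10, "a", "w1");
        r.register(20, "a", "w1"); // duplicate, ignored
        System.out.println(r.size());             // 2
        System.out.println(r.peek().timestamp()); // 10
    }
}
```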

 

For the window operator, the two key pieces are the WindowAssigner and the Trigger.

 

WindowAssigner

The WindowAssigner specifies which windows a tuple should be assigned to.

The following figure (borrowed from another source) shows the different kinds of WindowAssigner:

[Figure: overview of the WindowAssigner types]

The most important method of WindowAssigner is assignWindows, which assigns an element to a set of windows, returned as a Collection<W>.

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
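    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Returns {@code true} if elements are assigned to windows based on event time,
     * {@code false} otherwise.
     */
    public abstract boolean isEventTime();

    // ... (WindowAssignerContext and other members omitted)
}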

Now let's look at some concrete WindowAssigner implementations.

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size; // window length in milliseconds

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size)); // simple: exactly one TimeWindow is assigned
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create(); // as the name suggests, the default trigger is ProcessingTimeTrigger
    }

    // ... (constructor and other methods omitted)
}
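
The tumbling assignment is plain modular arithmetic. The hypothetical helper below reproduces it outside Flink, assuming non-negative timestamps in milliseconds: the window start is the current time rounded down to a multiple of size.

```java
// Hypothetical helper mirroring the arithmetic in TumblingProcessingTimeWindows.assignWindows.
public class TumblingSketch {

    // Returns {start, end} of the tumbling window of length `size` that contains `now`.
    // Assumes now >= 0: for negative values Java's % yields a negative remainder.
    public static long[] windowFor(long now, long size) {
        long start = now - (now % size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long[] w = windowFor(12_345, 1_000);
        System.out.println(w[0] + " " + w[1]); // 12000 13000
    }
}
```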
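public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;
    private final long slide;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = timestamp - timestamp % slide;
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
                windows.add(new TimeWindow(start, start + size)); // several TimeWindows are assigned here, because sliding windows overlap
            }
            return windows;
        } else {
            // Long.MIN_VALUE means the record carries no timestamp
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    // ... (constructor and other methods omitted)
}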
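
The sliding loop can likewise be checked in isolation. This hypothetical helper returns only the window starts and assumes a non-negative timestamp and positive size and slide. With size = 10 and slide = 5, an element at timestamp 7 falls into [5, 15) and [0, 10).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the loop in SlidingEventTimeWindows.assignWindows.
public class SlidingSketch {

    // Returns the start of every sliding window [start, start + size) that contains timestamp.
    // Assumes timestamp >= 0, size > 0 and slide > 0.
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - timestamp % slide; // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 10, slide 5: timestamp 7 lies in [5, 15) and [0, 10)
        System.out.println(windowStarts(7, 10, 5)); // [5, 0]
    }
}
```

When size is a multiple of slide, each element lands in size / slide windows, which is also the initial capacity the original code passes to ArrayList.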

 

Trigger, Evictor

For details, see the companion post Flink – Trigger, Evictor.

 

Next, let's look at the three main entry points, which invoke onElement, onEventTime and onProcessingTime respectively.

processElement

Handles an arriving element and invokes the trigger's onElement.

public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows(  // use the WindowAssigner to assign the element to a set of windows
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    final K key = (K) getStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
        //.......
    } else { // ordinary (non-merging) windows
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isLate(window)) { // late data is dropped by default
                continue;
            }

            AppendingState<IN, ACC> windowState = getPartitionedState( // fetch this window's state from the backend, i.e. its buffered elements
                window, windowSerializer, windowStateDescriptor);
            windowState.add(element.getValue()); // add the current element to the buffered state

            context.key = key;
            context.window = window; // the Context design is rather tricky and obscure: one shared instance is re-pointed at each key/window

            TriggerResult triggerResult = context.onElement(element); // invoke onElement and get the TriggerResult

            if (triggerResult.isFire()) { // act on the TriggerResult
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                fire(window, contents); // on FIRE, actually evaluate the window function over the elements
            }

            if (triggerResult.isPurge()) {
                cleanup(window, windowState, null); // on PURGE, clean up the buffered elements
            } else {
                registerCleanupTimer(window);
            }
        }
    }
}
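
The way processElement reacts to the trigger can be condensed into a small simulation. This is a sketch under assumptions: the enum Result mirrors the four combinations of isFire()/isPurge() that the code above checks, a single list stands in for one pane's state, and summing the buffer stands in for the window function. None of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how processElement reacts to a trigger decision; not Flink's classes.
public class TriggerHandlingSketch {

    // Mirrors the four possible outcomes of a trigger: fire and/or purge.
    public enum Result {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        public boolean isFire() { return fire; }
        public boolean isPurge() { return purge; }
    }

    private final List<Integer> buffer = new ArrayList<>();  // one pane's buffered elements
    private final List<Integer> emitted = new ArrayList<>(); // results of the window function

    // Add the element, then act on the trigger's decision, in the same order as processElement.
    public void onElement(int value, Result decision) {
        buffer.add(value);
        if (decision.isFire()) {
            // stand-in window function: sum of the buffered elements
            emitted.add(buffer.stream().mapToInt(Integer::intValue).sum());
        }
        if (decision.isPurge()) {
            buffer.clear(); // the real operator calls cleanup(...); otherwise it registers a cleanup timer
        }
    }

    public List<Integer> emitted() { return emitted; }
    public int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        TriggerHandlingSketch s = new TriggerHandlingSketch();
        s.onElement(1, Result.CONTINUE);
        s.onElement(2, Result.FIRE);           // emits 3, keeps the buffer
        s.onElement(3, Result.FIRE_AND_PURGE); // emits 6, then clears the buffer
        System.out.println(s.emitted() + " " + s.buffered()); // [3, 6] 0
    }
}
```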

 

The logic that decides whether data is late:

protected boolean isLate(W window) {
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}
private long cleanupTime(W window) {
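    long cleanupTime = window.maxTimestamp() + allowedLateness; // the window's state is kept until maxTimestamp + allowedLateness
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; // if the addition overflowed, fall back to Long.MAX_VALUE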
}
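
A worked example of the lateness arithmetic, re-stated as plain static methods in a hypothetical LatenessSketch class. It assumes, as with Flink's TimeWindow, that the max timestamp of [start, end) is end - 1. For the window [0, 10000) with allowedLateness = 5000, the cleanup time is 9999 + 5000 = 14999, so the window counts as late once the watermark reaches 14999.

```java
// Hypothetical re-statement of cleanupTime/isLate for a time window [start, end).
public class LatenessSketch {

    // Like TimeWindow.maxTimestamp(): the last timestamp that still belongs to [start, end).
    public static long maxTimestamp(long end) {
        return end - 1;
    }

    // maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE if the addition overflows.
    public static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long cleanupTime = maxTimestamp + allowedLateness;
        return cleanupTime >= maxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    // Event-time case only: a window is late once the watermark has reached its cleanup time.
    public static boolean isLate(long maxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(maxTimestamp, allowedLateness) <= currentWatermark;
    }

    public static void main(String[] args) {
        long max = maxTimestamp(10_000);                // window [0, 10000) -> 9999
        System.out.println(cleanupTime(max, 5_000));    // 14999
        System.out.println(isLate(max, 5_000, 14_998)); // false
        System.out.println(isLate(max, 5_000, 14_999)); // true
        System.out.println(cleanupTime(Long.MAX_VALUE - 1, 5_000) == Long.MAX_VALUE); // true (overflow guarded)
    }
}
```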

 

The fire logic:

private void fire(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); // results carry the window's max timestamp
    userFunction.apply(context.key, context.window, contents, timestampedCollector); // run the window function on the buffered contents
}

 

processWatermark

Handles an incoming watermark; this is what invokes the trigger's onEventTime.

@Override
public void processWatermark(Watermark mark) throws Exception {
    boolean fire;
    do {
        Timer<K, W> timer = watermarkTimersQueue.peek(); // the name watermarkTimersQueue is somewhat ambiguous; eventTimeTimersQueue would be clearer
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            fire = true;

            watermarkTimers.remove(timer);
            watermarkTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);  // i.e. stateBackend.setCurrentKey(key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else { // ordinary windows
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); // fetch the window's state
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onEventTime(timer.timestamp); // invoke the trigger's onEventTime
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire); // if a timer fired, check whether the next timer in the queue is due as well

    output.emitWatermark(mark); // forward the watermark downstream

    this.currentWatermark = mark.getTimestamp(); // update currentWatermark
}
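
The draining loop of processWatermark, reduced to bare timestamps. The hypothetical WatermarkDrainSketch keeps event-time timers in a min-heap and, for each watermark, pops every timer whose timestamp is <= the watermark, mirroring the do/while above; per-key state, triggers and the merging-window branch are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the do/while loop in processWatermark: pop every timer that is due.
public class WatermarkDrainSketch {

    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>(); // ascending timestamps
    private long currentWatermark = Long.MIN_VALUE;

    public void register(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    // Fires (returns) all timers with timestamp <= watermark, in timestamp order,
    // then records the new watermark (the real operator also forwards it downstream).
    public List<Long> processWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        Long next;
        while ((next = eventTimeTimers.peek()) != null && next <= watermark) {
            fired.add(eventTimeTimers.poll());
        }
        currentWatermark = watermark;
        return fired;
    }

    public long currentWatermark() { return currentWatermark; }

    public static void main(String[] args) {
        WatermarkDrainSketch s = new WatermarkDrainSketch();
        s.register(30);
        s.register(10);
        s.register(20);
        System.out.println(s.processWatermark(25)); // [10, 20]
        System.out.println(s.processWatermark(25)); // []
        System.out.println(s.processWatermark(40)); // [30]
    }
}
```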

 

trigger

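First, the name of this method is questionable: why doesn't it follow the process… naming of the two methods above? (The name comes from the Triggerable interface that WindowOperator implements.)

This method invokes onProcessingTime, and it is itself called by a timer driven by system (processing) time. Its logic is essentially the same as processWatermark; only the firing condition differs, and the cleanup check applies to processing-time windows (note the !windowAssigner.isEventTime()).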

@Override
public void trigger(long time) throws Exception {
    boolean fire;

    //Remove information about the triggering task
    processingTimeTimerFutures.remove(time);
    processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

    do {
        Timer<K, W> timer = processingTimeTimersQueue.peek();
        if (timer != null && timer.timestamp <= time) {
            fire = true;

            processingTimeTimers.remove(timer);
            processingTimeTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else {
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire);
}
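
trigger(long time) is driven by wall-clock timers. In Flink the operator registers such timers with the runtime; the sketch below only illustrates the idea with java.util.concurrent.ScheduledExecutorService, and ProcessingTimerSketch and registerTimer are hypothetical names. As with trigger, the callback receives the target time rather than the actual wake-up time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical sketch: a wall-clock timer that calls back with its target time,
// the way a processing-time service calls the operator's trigger(long time).
// Uses plain java.util.concurrent, not Flink's own timer facilities.
public class ProcessingTimerSketch implements AutoCloseable {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Schedule callback.accept(targetTime) for when System.currentTimeMillis() reaches targetTime.
    public void registerTimer(long targetTime, LongConsumer callback) {
        long delay = Math.max(0, targetTime - System.currentTimeMillis());
        executor.schedule(() -> callback.accept(targetTime), delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch fired = new CountDownLatch(1);
        try (ProcessingTimerSketch timers = new ProcessingTimerSketch()) {
            long target = System.currentTimeMillis() + 50;
            timers.registerTimer(target, time -> {
                System.out.println("trigger(" + time + ")");
                fired.countDown();
            });
            System.out.println(fired.await(2, TimeUnit.SECONDS)); // true
        }
    }
}
```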

 

EvictingWindowOperator

Compared with WindowOperator, EvictingWindowOperator simply adds an Evictor.

private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); // run the evictor: it returns how many elements to drop from the front

    FluentIterable<IN> projectedContents = FluentIterable
        .from(contents)
        .skip(toEvict)
        .transform(new Function<StreamRecord<IN>, IN>() {
            @Override
            public IN apply(StreamRecord<IN> input) {
                return input.getValue();
            }
        });
    userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}

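The key logic is in fire: before the window function is applied, the evictor decides how many elements to evict (toEvict), and that many elements are skipped from the front of the window contents.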
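
The skip-then-apply step can be sketched without Guava or Flink. The hypothetical EvictSketch assumes a count-based policy (keep at most maxCount elements, evicting the oldest from the front), similar in spirit to a count evictor, and uses a sum as the window function.

```java
import java.util.List;

// Hypothetical sketch of EvictingWindowOperator.fire: drop the first `toEvict` elements,
// then hand the rest to the window function. The count-based policy is an assumed example,
// not Flink's Evictor interface.
public class EvictSketch {

    // Assumed evictor policy: keep at most maxCount elements, evicting from the front (oldest).
    public static int toEvict(int size, int maxCount) {
        return Math.max(0, size - maxCount);
    }

    // Like FluentIterable.from(contents).skip(toEvict) followed by the user function (here: sum).
    public static int fireWithEviction(List<Integer> contents, int maxCount) {
        int evict = toEvict(contents.size(), maxCount);
        return contents.stream().skip(evict).mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        // keep the last 3 of 5 elements: 3 + 4 + 5
        System.out.println(fireWithEviction(List.of(1, 2, 3, 4, 5), 3)); // 12
    }
}
```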

Last updated: 2017-04-07 21:23:50
