Flink原理與實現:Window的實現原理
Flink原理與實現係列文章:
Flink 原理與實現:架構和拓撲概覽
Flink 原理與實現:如何生成 StreamGraph
Flink 原理與實現:如何生成 JobGraph
Flink原理與實現:如何生成ExecutionGraph及物理執行圖
Flink原理與實現:Operator Chain原理
Flink原理與實現:詳解Flink中的狀態管理
在閱讀本文之前,請先閱讀Flink 原理與實現:Window機製,這篇文章從用戶的角度,對Window做了比較詳細的分析,而本文主要是從Flink框架的實現層麵,對Window做另一個角度的分析。
首先看一個比較簡單的情況,假設我們在一個KeyedStream上做了一個10秒鍾的tumbling processing time window,也就是說,每隔10秒鍾窗口會觸發一次,即:
dataStream.keyBy(0).timeWindow(Time.seconds(10)).sum(1);
在研究源碼之前,我們可以在腦子裏大概想象一下它應該會怎麼處理:
- 給定一條數據,會給這條數據assign windows,在這個例子中,因為是翻滾窗口,所以隻會assign出一個窗口。
- assign了窗口之後,我們就知道這條消息對應的窗口起始時間,這裏比較重要的是窗口的右邊界,即窗口過期時間。
- 我們可以在窗口的過期時間之上,注冊一個Scheduled Future,到時間之後讓它自動回調窗口的聚合函數,即觸發窗口內的數據計算。
上麵的第3步,針對KeyedStream,需要再擴展一下,針對一條數據,我們應該注冊一個基於Key + Window的Scheduled Future,到時間之後隻觸發對於這個key的數據計算。當然,這裏我們不自覺地會想,當key的數目非常大的時候,可能會創建出大量的Future,這會是一個問題。
腦子中大致有上麵的思路之後,我們來看一下Flink的實現。
首先,KeyedStream.timeWindow
方法會生成一個WindowedStream,sum
則是我們的aggregator,因此在WindowedStream中,實際調用了aggregate(new SumAggregator(...))
,然後一層層調下來,目標就是生成一個WindowOperator。
在Flink中,根據是否配置了evictor,會生成兩種不同的WindowOperator:
- 有evictor:這種情況下意味著用戶需要精確控製如何evict窗口的元素,因此所有的數據都會先緩存起來。此時會生成
EvictingWindowOperator
- 無evictor:這種情況下,通過指定一個reduce function,來一條數據就會進行reduce,當到達窗口邊界之後,直接輸出結果就可以了。此時會生成
WindowOperator
。
不管哪一種operator,都需要指定兩個function:
- window function:控製如何處理窗口內的元素結果
- reduce function:控製如何對窗口內元素做聚合
當一個窗口被fire的時候,就需要通過window function來控製如何處理窗口的結果了。比如PassThroughWindowFunction
啥也不做,對每一條結果都直接調用out.collect
發送到下遊;而ReduceApplyWindowFunction
則在這個時候針對窗口所有元素,調用reduce function進行聚合計算,再將計算的結果發射出去。
在上麵的例子中,由於沒有指定evictor,因此會生成WindowOperator
,它的window function為InternalSingleValueWindowFunction
,它提供了對PassThroughWindowFunction
的代理。而reduce function則用於構造StateDescriptor:
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
reduceFunction,
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
也就是說,對於sum
這種情況,每來一條消息,都會調用reduce function,然後更新reducing state,最後窗口被觸發的時候,直接通過PassThroughWindowFunction
輸出reducing state的結果。
接下來看一下WindowOperator.processElement
方法:
// 給元素分配窗口
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
if (windowAssigner instanceof MergingWindowAssigner) {
// session window的處理邏輯
// ...
} else {
// 遍曆每一個窗口
for (W window: elementWindows) {
// 如果窗口已經過期,直接忽略
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
// 窗口被觸發了
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 輸出窗口結果
emitWindowContents(window, contents);
}
// 如果窗口需要purge,則清理狀態
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
可以看到,大致的邏輯還是非常簡單明了的,關鍵在於這一行:
TriggerResult triggerResult = triggerContext.onElement(element);
這裏針對不同的window,會有不同的trigger。其中ProcessingTime的都是ProcessingTimeTrigger
。看下它的onElement
方法:
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
可以看到,onElement
方法始終返回TriggerResult.CONTINUE這個結果,不會觸發窗口的fire操作。那麼重點就是第一行了,它實際調用了WindowOperator.registerProcessingTimeTimer
方法:
internalTimerService.registerProcessingTimeTimer(window, time);
這是一個InternalTimerService
對象,在WindowOperator.open
方法中被創建:
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
它通過InternalTimeServiceManager.getInternalTimeService
獲取:
HeapInternalTimerService<K, N> timerService = timerServices.get(name);
if (timerService == null) {
timerService = new HeapInternalTimerService<>(totalKeyGroups,
localKeyGroupRange, keyContext, processingTimeService);
timerServices.put(name, timerService);
}
timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
return timerService;
即創建了一個HeapInternalTimerService
實例。
看一下這個類的成員,這裏省掉了序列化和反序列化相關的成員變量:
// 目前使用SystemProcessingTimeService,包含了窗口到期回調線程池
private final ProcessingTimeService processingTimeService;
private final KeyContext keyContext;
/**
* Processing time相關的timer
*/
private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
/**
* Event time相關的timer
*/
private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup;
private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;
/**
* 當前task的key-group range信息
*/
private final KeyGroupsList localKeyGroupRange;
private final int totalKeyGroups;
private final int localKeyGroupRangeStartIdx;
/**
* 當前task的watermark,針對event time
*/
private long currentWatermark = Long.MIN_VALUE;
/**
* 最接近的未被觸發的窗口的Scheduled Future
* */
private ScheduledFuture<?> nextTimer;
// 窗口的回調函數,如果是WindowOperator,則會根據時間類型,回調WindowOperator.onEventTime或onProcessingTime方法
private Triggerable<K, N> triggerTarget;
可以看到,存儲窗口timer的數據結構processingTimeTimersByKeyGroup
或者eventTimeTimersByKeyGroup
,跟存儲state的數據結構很像,也是根據當前task的key group range,創建一個數組。每一個key都會落到數組的一個下標,這個數組元素值是一個Set<InternalTimer<K,N>>
,即Key + Window作為這個集合的key。
此外,還有一個processingTimeTimersQueue
或者eventTimeTimersQueue
,這是一個優先隊列,會存儲所有的 Key + Window的timer,主要作用就是用於快速取出最接近的未被觸發的窗口。
接下來看下這個類的registerProcessingTimeTimer
方法:
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
// 獲取數組下標下的Timer Set
Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
// 判斷是否已經添加過這個timer
if (timerSet.add(timer)) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
processingTimeTimersQueue.add(timer);
// 如果新添加的timer的窗口觸發時間早於nextTimer,則取消nextTimer的觸發,
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
// 注冊新添加的timer回調
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}
processingTimeService.registerTimer(time, this)
注冊回調,會調用SystemProcessingTimeService.registerTimer
,這個方法很簡單,會計算出當前時間跟窗口邊界的delay,然後通過ScheduledExecutorService注冊一個定時的回調,其中target為HeapInternalTimerService本身。
舉例來說,當前時間為 2017-06-15 19:00:01,來了一條消息,那麼它被assign的窗口為[2017-06-15 19:00:00, 2017-06-15 19:00:10),計算出來的delay為9秒,因此在9秒之後,會觸發HeapInternalTimerService.onProcessingTime
方法。
看下這個方法的代碼:
public void onProcessingTime(long time) throws Exception {
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
timerSet.remove(timer);
processingTimeTimersQueue.remove();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
if (timer != null) {
if (nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
}
這個方法也比較簡單,根據當前的窗口邊界,從processingTimeTimersQueue這個隊列中一個個取timer,隻要timer所對應的窗口邊界<=當前窗口邊界時間,就將timer從timer set中刪除,並調用triggerTarget.onProcessingTime(timer)
觸發該窗口。最後設置並注冊nextTimer,即下一個最接近的窗口的回調。
對於KeyedStream下的窗口,實際上的情況是,在同一個窗口到達的多個不同的key,實際上窗口的邊界都是相同的,所以當一個key的窗口被觸發,同時也會觸發該Task上所有key group range的窗口。
看了對Processing Time的處理,接下來看看Event time的情況。
event time跟processing time稍有不同,因為它的窗口觸發時間,會依賴於watermark,並不是確定的(會有一個最遲的觸發時間)。隻要當watermark越過當前窗口的邊界,這個窗口就可以被觸發。因此它並沒有使用nextTimer這個變量來注冊和標識一個最接近的未被觸發的窗口。
注冊event time的timer時也比較簡單,隻是同時往timerSet和eventTimeTimersQueue中添加timer。
另外,Event Time的trigger使用的是EventTimeTrigger
,它的onElement
方法如下:
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
可以看到,每處理一個元素,都會拿當前元素所屬窗口的max timestamp去跟當前的watermark對比,如果小於watermark,說明已經越過窗口的邊界,則fire該窗口。
ctx.getCurrentWatermark()
方法實際調用的是WindowOperator.WindowContext.getCurrentWatermark
方法,返回的是HeapInternalTimerService
的currentWatermark。
那麼,watermark是在哪裏被更新的呢?在HeapInternalTimerService.advanceWatermark
方法中。代碼如下:
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
timerSet.remove(timer);
eventTimeTimersQueue.remove();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
可以看到,這個方法不僅更新了當前watermark,而且會用觸發processing time window很接近的邏輯,來觸發event time window。
這個方法在AbstractStreamOperator.processWatermark
方法中被調用。processWatermark
則在StreamInputProcessor
中的ForwardingValveOutputHandler.handleWatermark
方法中被調用。一個自下往上的反向調用鏈如下:
HeapInternalTimerService.advanceWatermark
<-- AbstractStreamOperator.processWatermark
<-- StreamInputProcessor.StatusWatermarkValve.handleWatermark
<-- StatusWatermarkValve.inputWatermark
<-- StreamInputProcessor.processInput
這樣,當一個processor收到的消息是一個watermark的時候,就會更新time service中的watermark。這裏也可以看到,對於普通的用戶消息,是不會主動更新watermark的。因此在Flink中,如果要使用Event Time,就必須實現一個發射watermark的策略:
- 要麼數據源自己會發送watermark(實際情況中不大可能,除非用戶基於特定數據源自行封裝)
- 要麼實現
TimestampAssigner
接口,定時往下遊發送watermark。這還是有一定的限製的。
還有一個需要考慮的問題是,一個Task可能接受到上遊多個channel的輸入,每個channel都會有watermark,但是每個channel的進度是不一樣的,這個時候該如何選擇和計算?舉例來說,如果有一個消費TT或kafka的Task,它會同時消費多個partition,每個partition的消費進度不一樣,當我們需要獲取到當前task的watermark的時候,應該取哪個值?
邏輯上來說,應該是取所有partition中最小的值。因為按照watermark的定義,這個值表示上遊的數據中,已經沒有比它更小的了。那麼看一下Flink是如何做的,在StatusWatermarkValve.inputWatermark
方法中:
if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
long watermarkMillis = watermark.getTimestamp();
// 如果當前輸入的watermark小於該channel的watermark,直接忽略
if (watermarkMillis > channelStatuses[channelIndex].watermark) {
channelStatuses[channelIndex].watermark = watermarkMillis;
// 標識當前channel的watermark已經檢查過
if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
channelStatuses[channelIndex].isWatermarkAligned = true;
}
// 取所有channel的watermark最小值並調用handleWatermark方法
findAndOutputNewMinWatermarkAcrossAlignedChannels();
}
}
的確也是這麼做的。
最後總結一下,Flink的window,很靈活很強大,不過在有的時候還是會有一些問題:
- 當key的規模很大,而任務的並發不高的時候,會存大大量的timer對象,會消耗掉不少的內存。
- watermark需要用戶來定義實現,實現得不好很容易會得出錯誤的窗口計算結果,這點不太方便。
- watermark的計算策略過於單一,目前隻能取各channel的最小值,用戶無法自定義這一塊的邏輯。
最後更新:2017-10-19 15:03:27