閱讀218 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Flink - FLIP

https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

 

FLIP-1 : Fine Grained Recovery from Task Failures

 

When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks.

如果一個task失敗,當前需要完全停掉整個job恢複,這個明顯太重了;

proposal

簡單的方案,如果一個task失敗,就把和它相連的整條pipeline都重啟,但如果所有node都和該task相連,那還是要重啟整個job

image 、

 

但這個方案太naive了,是否可以盡量的減少重啟的範圍?

如果要隻重啟fail的task,以及後續的tasks,而不想重啟源,隻有cache 
每個node,把要發出去的Intermediate Result緩存下來,當一個node的task掛了後, 隻需要從上一層node把Intermediate Result從發出來,就可以避免從source重啟 
至於如何cache Intermediate Result,在memory還是disk,還是其他,隻是方案不同

Caching Intermediate Result

This type of data stream caches all elements since the latest checkpoint, possibly spilling them to disk, if the data exceeds the memory capacity.

When a downstream operator restarts from that checkpoint, it can simply re-read that data stream without requiring the producing operator to restart. Applicable to both batch (bounded) and streaming (unbounded) operations. When no checkpoints are used (batch), it needs to cache all data.

Memory-only caching Intermediate Result

Similar to the caching intermediate result, but discards sent data once the memory buffering capacity is exceeded. Acts as a “best effort” helper for recovery, which will bound recovery when checkpoints are frequent enough to hold data in between checkpoints in memory. On the other hand, it comes absolutely for free, it simply used memory that would otherwise not be used anyways.

Blocking Intermediate Result

This is applicable only to bounded intermediate results (batch jobs). It means that the consuming operator starts only after the entire bounded result has been produced. This bounds the cancellations/restarts downstream in batch jobs.

image

 

FLIP-2 Extending Window Function Metadata

Right now, in Flink a WindowFunction does not get a lot of information when a window fires. 

The signature of WindowFunction is this:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out);

}

 

i.e , the user code only has access to the key for which the window fired, the window for which we fired and the data of the window itself. In the future, we might like to extend the information available to the user function. We initially propose this as additional information:

  • Why/when did the window fire. Did it fire on time, i.e. when the watermark passed the end of the window. Did it fire early because of a speculative early trigger or did it fire on late-arriving data.

  • How many times did we fire before for the current window. This would probably be an increasing index, such that each firing for a window can be uniquely identified.

當前在window functions中暴露出來的信息不夠,需要給出更多的信息,比如why,when fire等

FLIP-3 - Organization of Documentation

 

FLIP-4 : Enhance Window Evictor

Right now, the ability of Window Evictor is limited

  • The Evictor is called only before the WindowFunction. (There can be use cases where the elements have to be evicted after the WindowFunction is applied)
  • Elements are evicted only from the beginning of the Window. (There can be cases where we need to allow eviction of elements from anywhere within in the Window as per the eviction logic that user wish to implement)

當前Evictor隻是在WindowFunction 之前被執行,是否可以在WindowFunction 之後被執行?

當前的接口隻是從beginning of the Window開始,是否可以從任意位置開始evict

 

FLIP-5: Only send data to each taskmanager once for broadcasts

Problem:

We experience some unexpected increase of data sent over the network for broadcasts with increasing number of slots per task manager.

降低在廣播時,發送的冗餘數據

當前狀況是,

image

要達到的效果是,

image

每個taskmanager隻發送一次

 

FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.

核心想法是,把jobmanager的工作分離出來

增加兩個新的模塊, ResourceManagerdispatcher

The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:

  • YARN

  • Mesos

  • Standalone-multi-job (Standalone mode)

  • Self-contained-single-job (Docker/Kubernetes)

顯然對於不同的資源管理平台,隻需要實現不同的ResourceManager

image

於是JobManager, TaskManager和ResourceManager之間的關係就變成這樣

TaskManager,向ResourceManager進行注冊,並定期匯報solts的情況

JobManager會向ResourceManager請求slot,然後ResourceManager會選擇TaskManager,告訴它向某JobManager提供slots

然後該TaskManager會直接聯係JobManager去提供slots

同時JobManager會有slot pool,來保持申請到的slots

The SlotPool is a modification of what is currently the InstanceManager.

這樣就算ResourceManager掛掉了,JobManager仍然可以繼續使用已經申請的slots

 

The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.

image

In the future run, the dispatcher will also help with the following aspects:

  • The dispatcher is a cross-job service that can run a long-lived web dashboard

  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters

  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can manage user authentications

把Dispatcher從JobManager中分離出來的好處,

首先dispatcher是可以跨cluster的,是個long-lived web dashboard,比如後麵如果一個cluster或jobmanager掛了,我可以簡單的spawn到另外一個 
第二,client到dispatcher是基於http的很容易穿過防火牆

第三,dispatcher可以當作類似proxy的作用,比如authentications

所以對於不同的cluster manager的具體架構如下,

Yarn,

Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:

  • The client directly starts the Job in YARN, rather than bootstrapping a cluster and after that submitting the job to that cluster. The client can hence disconnect immediately after the job was submitted

  • All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader

  • Containers are requested as needed and will be released when not used any more

  • The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators

image

 

這個架構把整個Flink都托管在Yarn內部

好處,

你不需要先拉起flink集群,然後再提交job,隻需要直接提交job;Yarn的ResourcManager會先拉起Application Master,其中包含Resource Manager和Job Manager;然後當Flink resource manager需要資源時,會先和YARN ResourceManager請求,它會去創建container,其中包含TaskManager;

 

Mesos,

這個架構和Yarn類似,

image

Mesos-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside a regular Mesos container. The Dispatcher is responsible for monitoring and restarting those containers in case they fail. The Dispatcher itself must be made highly available by a Mesos service like Marathon

 

Standalone,

 

The Standalone Setup is should keep compatibility with current Standalone Setups.

The role of the long running JobManager is now  a “local dispatcher” process that spawns JobManagers with Jobs internally. The ResourceManager lives across jobs and handles TaskManager registration.

For highly-available setups, there are multiple dispatcher processes, competing for being leader, similar as the currently the JobManagers do.

image

 

Component Design and Details

更具體的步驟,

image

 

FLIP-7: Expose metrics to WebInterface

With the introduction of the metric system it is now time to make it easily accessible to users. As the WebInterface is the first stop for users for any details about Flink, it seems appropriate to expose the gathered metrics there as well.

The changes can be roughly broken down into 4 steps:

  1.     Create a data-structure on the Job-/TaskManager containing a metrics snapshot
  2.     Transfer this snapshot to the WebInterface back-end
  3.     Store the snapshot in the WebRuntimeMonitor in an easily accessible way
  4.     Expose the stored metrics to the WebInterface via REST API

 

FLIP-8: Rescalable Non-Partitioned State

要解決的問題是,當dynamic scaling的時候,如何解決狀態的問題

如果沒有狀態,動態的scaling,需要做的隻是把流量分到新的operator的並發上

但是對於狀態,當增加並發的時候,需要把狀態切分,而減少並發的時候,需要把狀態合並

這個就比較麻煩了

同時在Flink裏麵,狀態分為3部分,operator state, the function state and key-value states

其中對於key-value states的方案相對簡單一些,https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#

這裏基本的思想,就是更細粒度的checkpoint; 
原先是以task級別為粒度,這樣加載的時候,隻能加載一個task,如果一個task擴成2個,task級別的checkpoint也需要切分 
而采用更細粒度的checkpoint獨立存儲,而不依賴task,這樣就可以獨立於task進行調度

比如對於key-value,創建一個叫key groups的概念,以key group作為一個checkpoint的單元

In order to efficiently distribute key-value states across the cluster, they should be grouped into key groups. Each key group represents a subset of the key space and is checkpointed as an independent unit. The key groups can then be re-assigned to different tasks if the DOP changes.

這樣當發生增減operator的並發度的時候,隻需要以key group為單位調度到新的operator上,同時在該operator上恢複相應的checkpoint即可,如圖

image

然後,對於non-partitioned operator and function state,這個問題怎麼解

比如對於kafkasource,4個partitions,2個source的並發

image

scaling down後,就會出現下圖,左邊的情況,因為隻有s1 task了,他隻會load他自己的checkpoint,而之前s2的checkpoint就沒人管了 
而理論上,我們是要達到右邊的情況的

image

scaling up後,也會出現下圖左邊的case,因為S1,S2加載了原來的checkpoint,但是當前其實partition3,partition4已經不再分配到s2了

image

 

思路還是一樣,把checkpoint的粒度變細,而不依賴於task,

image

 

FLIP-9: Trigger DSL

當前支持的trigger方式不夠靈活,而且對late element隻能drop,需要設計更為靈活和合理的DSL,用於描述Trigger policy

 

FLIP-10: Unify Checkpoints and Savepoints

Currently checkpoints and savepoints are handled in slightly different ways with respect to storing and restoring them. The main differences are that savepoints 1) are manually triggered, 2) persist checkpoint meta data, and 3) are not automatically discarded.

With this FLIP, I propose to allow to unify checkpoints and savepoints by allowing savepoints to be triggered automatically.

 

FLIP-11: Table API Stream Aggregations

The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:

  • Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.

  • Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.

Since time-windowed aggregates will be the first operation that require the definition of time, this FLIP does also discuss how the Table API handles time characteristics, timestamps, and watermarks.

 

FLIP-12: Asynchronous I/O Design and Implementation

I/O access, for the most case, is a time-consuming process, making the TPS for single operator much lower than in-memory computing, particularly for streaming job, when low latency is a big concern for users. Starting multiple threads may be an option to handle this problem, but the drawbacks are obvious: The programming model for end users may become more complicated as they have to implement thread model in the operator. Furthermore, they have to pay attention to coordinate with checkpointing.

流最終會碰到外部存儲,那就會有IO瓶頸,比如寫數據庫,那麼會阻塞整個流

這個問題怎麼解?可以用多線程,這個會讓編程模型比較複雜,說白了,不夠優雅

所以解決方法其實就是用reactor模型,典型的解決I/O等待的方法

AsyncFunction: Async I/O will be triggered in AsyncFunction.

AsyncWaitOperator: An StreamOperator which will invoke AsyncFunction.

AsyncCollector: For each input streaming record, an AsyncCollector will be created and passed into user's callback to get the async i/o result.

AsyncCollectorBuffer: A buffer to keep all AsyncCollectors.

Emitter Thread: A working thread in AsyncCollectorBuffer, being signalled while some of AsyncCollectors have finished async i/o and emitting results to the following opeartors.

對於普通的operator,調用function,然後把數據用collector發送出去

但對於AsyncWaitOperator,無法直接得到結果,所以把AsyncCollector傳入callback,當function觸發callback的時候,再emit數據

但這樣有個問題,emit的順序是取決於,執行速度,如果對emit順序沒有要求應該可以

但如果模擬同步的行為,理論上,emit的順序應該等同於收到的順序

這樣就需要一個buffer,去cache所有的AsyncCollector,即AsyncCollectorBuffer

當callback被執行時,某個AsyncCollector會被填充上數據,這個時候被mark成可發送

但是否發送,需要依賴一個外部的emitter

他會check,並決定是否真正的emit這個AsyncCollector,比如check是否它之前的所有的都已經emit,否則需要等待

這裏還需要考慮的是, watermark,它必須要等到前麵的數據都已經被成功emit後,才能被emit;這樣才能保證一致性

最後更新:2017-04-07 21:05:50

  上一篇:go Flink - DataStream
  下一篇:go Flink - FlinkKafkaConsumer08