364
技術社區[雲棲]
Peeking into Apache Flink's Engine Room
Join Processing in Apache Flink
In this blog post, we cut through Apache Flink’s layered architecture and take a look at its internals with a focus on how it handles joins. Specifically, I will
- show how easy it is to join data sets using Flink’s fluent APIs,
- discuss basic distributed join strategies, Flink’s join implementations, and its memory management,
- talk about Flink’s optimizer that automatically chooses join strategies,
- show some performance numbers for joining data sets of different sizes, and finally
- briefly discuss joining of co-located and pre-sorted data sets
這篇blog會從flink內部詳細看看是如何處理join的,尤其,
如何用Flink API,簡單的實現join
討論基本的join策略,flink join的實現和內存管理
討論flink的優化器,如何自動選擇join策略
顯示不同數據size上的性能數據,
最後簡單討論一下co-located和預排序的數據集的join問題
How do I join with Flink?
Flink provides fluent APIs in Java and Scala to write data flow programs.
// define your data types case class PageVisit(url: String, ip: String, userId: Long) case class User(id: Long, name: String, email: String, country: String) // get your data from somewhere val visits: DataSet[PageVisit] = ... val users: DataSet[User] = ... // filter the users data set val germanUsers = users.filter((u) => u.country.equals("de")) // join data sets val germanVisits: DataSet[(PageVisit, User)] = // equi-join condition (PageVisit.userId = User.id) visits.join(germanUsers).where("userId").equalTo("id")
可以看到用flink api實現join還是比較簡單的
How does Flink join my data?
Flink如何做join,分兩個階段,
Ship Strategy
Local Strategy
Ship Strategy
對於分布式數據,數據是散落在各個partition中的,要做join,首先要把相同join key的數據放到一起,這個過程稱為Ship Strategy
Flink的Ship Strategy有兩種,
Repartition-Repartition strategy (RR)
The Repartition-Repartition strategy partitions both inputs, R and S, on their join key attributes using the same partitioning function.
比如,R,S做join,我們就用一個相同partition函數,按join key,把R,S的所有分區,shuffle到3個local join上
這樣做需要把full shuffle of both data sets over the network
Broadcast-Forward strategy (BF)
The Broadcast-Forward strategy sends one complete data set (R) to each parallel instance that holds a partition of the other data set (S)
S不動,把R廣播到所有的join實例上;顯然如果R足夠小,這樣做是很有效的
Flink’s Memory Management
對join的場景,很容易想到,在ship階段,需要shuffle大量的數據,內存是否會OutOfMemoryException
,或是否會發生full gc
Flink handles this challenge by actively managing its memory.
When a worker node (TaskManager) is started, it allocates a fixed portion (70% by default) of the JVM’s heap memory that is available after initialization as 32KB byte arrays.
This design has several nice properties.
First, the number of data objects on the JVM heap is much lower resulting in less garbage collection pressure.
Second, objects on the heap have a certain space overhead and the binary representation is more compact. Especially data sets of many small elements benefit from that.
Third, an algorithm knows exactly when the input data exceeds its working memory and can react by writing some of its filled byte arrays to the worker’s local filesystem. After the content of a byte array is written to disk, it can be reused to process more data. Reading data back into memory is as simple as reading the binary data from the local filesystem.
The following figure illustrates Flink’s memory management.
Flink是主動管理JVM的heap內存的,會申請一組32KB的memory segments,給各種算法用
好處就是,這樣減少GC的壓力;而且數據是序列化成binary後存儲,overhead很小;自己管理內存,內存不夠的時候,可以知道並寫磁盤文件
Local Strategies
After the data has been distributed across all parallel join instances using either a Repartition-Repartition or Broadcast-Forward ship strategy, each instance runs a local join algorithm to join the elements of its local partition.
Flink’s runtime features two common join strategies to perform these local joins:
- the Sort-Merge-Join strategy (SM) and
- the Hybrid-Hash-Join strategy (HH).
在ship strategy,我們已經把相同join key的數據,放到同一個local partition上,現在要做的隻是run一個local join算法
Flink有兩種local join算法,
The Sort-Merge-Join works by first sorting both input data sets on their join key attributes (Sort Phase) and merging the sorted data sets as a second step (Merge Phase).
The sort is done in-memory if the local partition of a data set is small enough.
Otherwise, an external merge-sort is done by collecting data until the working memory is filled, sorting it, writing the sorted data to the local filesystem, and starting over by filling the working memory again with more incoming data. After all input data has been received, sorted, and written as sorted runs to the local file system, a fully sorted stream can be obtained. This is done by reading the partially sorted runs from the local filesystem and sort-merging the records on the fly. Once the sorted streams of both inputs are available, both streams are sequentially read and merge-joined in a zig-zag fashion by comparing the sorted join key attributes, building join element pairs for matching keys, and advancing the sorted stream with the lower join key. The figure below shows how the Sort-Merge-Join strategy works.
這個算法,如其名,兩個過程,sort,merge
首先是,sort,如果數據足夠小,在內存中直接sort,這個比較簡單
如果數據比較大,需要用到外排算法,比如內存可以排序1G數據,而我有3G數據,那就每次隻讀1G,排序,存入一個文件,這樣最終會產生3個已局部排序的文件,如圖
讀的時候,同時打開3個文件,邊讀邊merge就可以產生一個全局有序的stream
然後是,merge,對於兩個已排序的inputs,做join很簡單
The Hybrid-Hash-Join distinguishes its inputs as build-side and probe-side input and works in two phases, a build phase followed by a probe phase.
In the build phase, the algorithm reads the build-side input and inserts all data elements into an in-memory hash table indexed by their join key attributes. If the hash table outgrows the algorithm’s working memory, parts of the hash table (ranges of hash indexes) are written to the local filesystem. The build phase ends after the build-side input has been fully consumed.
In the probe phase, the algorithm reads the probe-side input and probes the hash table for each element using its join key attribute. If the element falls into a hash index range that was spilled to disk, the element is also written to disk. Otherwise, the element is immediately joined with all matching elements from the hash table. If the hash table completely fits into the working memory, the join is finished after the probe-side input has been fully consumed. Otherwise, the current hash table is dropped and a new hash table is built using spilled parts of the build-side input.
This hash table is probed by the corresponding parts of the spilled probe-side input. Eventually, all data is joined. Hybrid-Hash-Joins perform best if the hash table completely fits into the working memory because an arbitrarily large the probe-side input can be processed on-the-fly without materializing it. However even if build-side input does not fit into memory, the the Hybrid-Hash-Join has very nice properties. In this case, in-memory processing is partially preserved and only a fraction of the build-side and probe-side data needs to be written to and read from the local filesystem. The next figure illustrates how the Hybrid-Hash-Join works.
這個算法會把兩個input,分成bulid input和probe input
其中build input會用於以join key來build一個hash table,如果這個hash table足夠小,那麼很簡單
當build完hash table後,我們隻需要遍曆probe input,如果落在hash table中,就做join
這個方法,如果build input足夠小,會非常高效,因為我們不需要在內存中存probe input,隻需要讀一條處理一條即可
但如果build input比較大,內存放不下整個hash table怎麼辦?
也很簡單,內存不夠的時候,把部分hash table,以hash index range,存入磁盤
這樣當遍曆probe input的時候,如果對應的hash table在磁盤,那麼暫時把這部分probe input也存入磁盤
最後,當遍曆完probe input後,內存中的hash table已經完成join,刪掉,載入磁盤中的hash table完成最後的join
How does Flink choose join strategies?
Ship and local strategies do not depend on each other and can be independently chosen.
Therefore, Flink can execute a join of two data sets R and S in nine different ways by combining any of the three ship strategies (RR, BF with R being broadcasted, BF with S being broadcasted) with any of the three local strategies (SM, HH with R being build-side, HH with S being build-side).
Flink features a cost-based optimizer which automatically chooses the execution strategies for all operators including joins.
Without going into the details of cost-based optimization, this is done by computing cost estimates for execution plans with different strategies and picking the plan with the least estimated costs.
Thereby, the optimizer estimates the amount of data which is shipped over the the network and written to disk.
If no reliable size estimates for the input data can be obtained, the optimizer falls back to robust default choices.
A key feature of the optimizer is to reason about existing data properties.
For example, if the data of one input is already partitioned in a suitable way, the generated candidate plans will not repartition this input. Hence, the choice of a RR ship strategy becomes more likely. The same applies for previously sorted data and the Sort-Merge-Join strategy. Flink programs can help the optimizer to reason about existing data properties by providing semantic information about user-defined functions [4].
While the optimizer is a killer feature of Flink, it can happen that a user knows better than the optimizer how to execute a specific join. Similar to relational database systems, Flink offers optimizer hints to tell the optimizer which join strategies to pick [5].
總的來說,Ship和local的策略可以分開選擇;
Flink的optimizer會自動選擇策略,根據就是,optimizer會對每個策略進行cost estimates,選擇cost相對較小的策略
最後更新:2017-04-07 21:05:50