Flink - Juggling with Bits and Bytes
https://www.36dsj.com/archives/33650
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
https://www.bigsynapse.com/addressing-big-data-performance ,addressing-big-data-performance
第一篇描述,當前JVM存在的問題,
1. Java對象開銷
Java對象的存儲密度相對偏低,對於“abcd”這樣簡單的字符串在UTF-8編碼中需要4個字節存儲,但Java采用UTF-16編碼存儲字符串,需要8個字節存儲
同時Java對象還有header等其他額外信息,一個4字節字符串對象,在Java中需要48字節的空間來存儲
2. 對象存儲結構引發的cache miss
當CPU訪問的數據如果是在內存中連續存儲的話,訪問的效率會非常高。如果CPU要訪問的數據不在當前緩存所有的cache line中,則需要從內存中加載對應的數據,這被稱為一次cache miss。
當cache miss非常高的時候,CPU大部分的時間都在等待數據加載,而不是真正的處理數據
Java對象並不是連續的存儲在內存上,同時很多的Java數據結構的數據聚集性也不好,在Spark的性能調優中,經常能夠觀測到大量的cache miss
3. 大數據的垃圾回收
秒級甚至是分鍾級的gc,尤其是full gc,極大的影響了Java應用的性能和可用性
4. OOM問題
OutOfMemoryError是分布式計算框架經常會遇到的問題,當JVM中所有對象大小超過分配給JVM的內存大小時,就會fOutOfMemoryError錯誤,JVM崩潰,分布式框架的健壯性和性能都會受到影響
解決方法,
定製的序列化工具
前麵說了java對象的開銷很大,如果使用Java Serialization和Kryo,會把所有的信息完整的進行序列化,會很占空間
一般如果是放內存是不需要序列化的,隻有網絡傳輸或存儲的時候,需要做序列化,這樣如果以java對象的開銷,大大增加了網絡和磁盤的開銷
當前由於memory資源很緊張,就算在內存中,我們也不能存放java對象本身,而是通過內存管理,存儲經過序列化的Java對象
這樣帶來的問題,如果序列化完後,如果數據沒有明顯變小就沒有意義
序列化帶來的最大的問題是,如果每次使用的時候都需要做反序列化,這個性能也好不到哪去
所以傳統的序列化的問題,
- 占用較多內存。
- 反序列化時,必須反序列化整個Java對象。
- 無法直接操作序列化後的二進製數據。
對於大數據方案,序列化的思路,基本是,對於一個數據集,schema是固定的,那麼我們沒有必要在每個對象裏麵都記錄元數據,隻要記錄一份就好,而在序列化的時候隻需要記錄真正的數據就好
比如spark的方案,Project Tungsten
圖中,顯示出對於上麵3個數據,如果高效的進行序列化的,
對於123,整形,直接以4byte存入
對於‘data’,可變長的數據,隻存入真正數據所在的地址32L,在32L先存儲數據length,接著是真正的數據
這樣大大降低了存儲的空間,並且更關鍵的是,在讀取某個數據時,不需要反序列化整行,隻需要根據offset偏移找到相應的數據,隻序列化你需要的字段
當然對於這段數據的元數據,你是要單獨存儲的,但是對於比如10000行,隻需要保存一份元數據,就顯得很高效
第二篇文章會詳細的描述Flink的方案
顯式的內存管理
不依賴JVM的GC進行內存管理,顯示的自己進行管理
有兩種方式,
on-heap,仍然是用JVM的內存,但是使用Memory Manager pool的方式,
Memory Manager pool由多個MemorySegment組成,每個MemorySegment代表一塊連續的內存,底層存儲是byte[],默認32KB大小
你使用的時候,申請一個MemorySegment,然後把你的數據序列化後放到這個byte[]中,用完就把MemorySegment釋放回Memory Manager pool
這樣,你雖然不停的產生和銷毀對象,但是在JVM看來,存在是隻是pool中的那幾個MemorySegment,這樣gc的工作量和頻度都會小很多
off-heap,幹脆不使用JVM的內存,使用java unsafe接口直接使用係統的內存,就完全和c一樣
這樣做的好處,
on-heap方式,啟動時分配大內存(例如100G)的JVM仍然很耗時間,垃圾回收也很慢
更有效率的IO操作。在off-heap下,將MemorySegment寫到磁盤或是網絡,可以支持zeor-copy技術,而on-heap的話,則至少需要一次內存拷貝(從內存中byte[]對象copy到設備的cache)
off-heap可用於錯誤恢複,比如JVM崩潰,在on-heap時,數據也隨之丟失,但在off-heap下,off-heap的數據可能還在。
off-heap上的數據是進程間共享的,不同的JVM可以通過off-heap共享
緩存友好的計算
由於CPU處理速度和內存訪問速度的差距,提升CPU的處理效率的關鍵在於最大化的利用L1/L2/L3/Memory,減少任何不必要的Cache miss。
定製的序列化工具給Spark和Flink提供了可能,通過定製的序列化工具,Spark和Flink訪問的二進製數據本身,因為占用內存較小,存儲密度比較大,而且還可以在設計數據結構和算法時,盡量連續存儲,減少內存碎片化對Cache命中率的影響,甚至更進一步,Spark與Flink可以將需要操作的部分數據(如排序時的Key)連續存儲,而將其他部分的數據存儲在其他地方,從而最大可能的提升Cache命中的概率。
Juggling with Bits and Bytes
Data Objects? Let’s put them on the heap!
The most straight-forward approach to process lots of data in a JVM is to put it as objects on the heap and operate on these objects.
Caching a data set as objects would be as simple as maintaining a list containing an object for each record.
An in-memory sort would simply sort the list of objects.
However, this approach has a few notable drawbacks.
First of all it is not trivial to watch and control heap memory usage when a lot of objects are created and invalidated constantly. Memory overallocation instantly kills the JVM with an OutOfMemoryError
.
Another aspect is garbage collection on multi-GB JVMs which are flooded with new objects. The overhead of garbage collection in such environments can easily reach 50% and more.
Finally, Java objects come with a certain space overhead depending on the JVM and platform. For data sets with many small objects this can significantly reduce the effectively usable amount of memory.
Given proficient system design and careful, use-case specific system parameter tuning, heap memory usage can be more or less controlled andOutOfMemoryErrors
avoided. However, such setups are rather fragile especially if data characteristics or the execution environment change
直接把對象放在heap裏麵用好不好?當然好,不然了
但這樣對於海量數據,是有性能問題的,比如比較容易OutOfMemoryError
, gc的效率,java對象的overhead
What is Flink doing about that?
Apache Flink has its roots at a research project which aimed to combine the best technologies of MapReduce-based systems and parallel database systems.
Coming from this background, Flink has always had its own way of processing data in-memory.
Instead of putting lots of objects on the heap, Flink serializes objects into a fixed number of pre-allocated memory segments.
Flink致力於研究將DAG和MPP中最好的技術combine起來,Flink不是直接把對象放入heap,而是把對象進行序列化放入事先分配好的memeory segments中
Its DBMS-style sort and join algorithms operate as much as possible on this binary data to keep the de/serialization overhead at a minimum.
If more data needs to be processed than can be kept in memory, Flink’s operators partially spill data to disk.
In fact, a lot of Flink’s internal implementations look more like C/C++ rather than common Java.
Flink采用類似DBMS的sort和join算法,會直接操作二進製數據,而使de/serialization overhead 達到最小;
所以從Flink的內部實現看,更像C/C++而非java
The following figure gives a high-level overview of how Flink stores data serialized in memory segments and spills to disk if necessary.
Flink’s style of active memory management and operating on binary data has several benefits:
-
Memory-safe execution & efficient out-of-core algorithms. Due to the fixed amount of allocated memory segments, it is trivial to monitor remaining memory resources. In case of memory shortage, processing operators can efficiently write larger batches of memory segments to disk and later them read back. Consequently,
OutOfMemoryErrors
are effectively prevented. - Reduced garbage collection pressure. Because all long-lived data is in binary representation in Flink’s managed memory, all data objects are short-lived or even mutable and can be reused. Short-lived objects can be more efficiently garbage-collected, which significantly reduces garbage collection pressure. Right now, the pre-allocated memory segments are long-lived objects on the JVM heap, but the Flink community is actively working on allocating off-heap memory for this purpose. This effort will result in much smaller JVM heaps and facilitate even faster garbage collection cycles.
- Space efficient data representation. Java objects have a storage overhead which can be avoided if the data is stored in a binary representation.
- Efficient binary operations & cache sensitivity. Binary data can be efficiently compared and operated on given a suitable binary representation. Furthermore, the binary representations can put related values, as well as hash codes, keys, and pointers, adjacently into memory. This gives data structures with usually more cache efficient access patterns.
這樣做的好處有:
不會出現OutOfMemoryErrors
,當內存不夠的時候,會批量把memory segments寫入磁盤,後續再read back
降低gc壓力,這個顯而易見,因為所有all long-lived data 都已經以二進製的方式存在flink的memory segments,並且這些segments都是可mutable的,可重用的;
節省空間,這個也顯而易見,因為不需要存儲java對象的overhead
有效的二進製操作和cache,二進製數據可以有效的被compared和操作
These properties of active memory management are very desirable in a data processing systems for large-scale data analytics but have a significant price tag attached. Active memory management and operating on binary data is not trivial to implement, i.e., using java.util.HashMap
is much easier than implementing a spillable hash-table backed by byte arrays and a custom serialization stack.
Of course Apache Flink is not the only JVM-based data processing system that operates on serialized binary data. Projects such as Apache Drill, Apache Ignite (incubating) or Apache Geode (incubating) apply similar techniques and it was recently announced that also Apache Spark will evolve into this direction with Project Tungsten.
當然這種使用java的方式,個人看來,從技術上將,不是一種進步,而是一種倒退
這是JVM內存管理技術的發展無法跟上大數據時代的腳步導致的。。。。。。
所以問題是,你要用原始的方式,就會很麻煩
後麵列出,其他的開源項目也在做類似的努力
How does Flink allocate memory?
A Flink worker, called TaskManager, is composed of several internal components such as an actor system for coordination with the Flink master, an IOManager that takes care of spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage. In the context of this blog post, the MemoryManager is of most interest.
The MemoryManager takes care of allocating, accounting, and distributing MemorySegments to data processing operators such as sort and join operators.
A MemorySegment is Flink’s distribution unit of memory and is backed by a regular Java byte array (size is 32 KB by default).
A MemorySegment provides very efficient write and read access to its backed byte array using Java’s unsafe methods. You can think of a MemorySegment as a custom-tailored version of Java’s NIO ByteBuffer.
In order to operate on multiple MemorySegments like on a larger chunk of consecutive memory, Flink uses logical views that implement Java’s java.io.DataOutput
and java.io.DataInput
interfaces.
MemoryManager 負責管理所有的MemorySegments,一個MemorySegments 是一個32KB大小的byte數組
MemorySegment 用java的unsafe method提供有效的基於byte數組的讀寫,你可以認為MemorySegment 就是一個用戶裁剪版的NIO的ByteBuffer
如果要操作多塊MemorySegment就像操作一塊大的連續內存,Flink會使用邏輯view來實現java的dataOutput或dataInput接口
MemorySegments are allocated once at TaskManager start-up time and are destroyed when the TaskManager is shut down.
Hence, they are reused and not garbage-collected over the whole lifetime of a TaskManager.
After all internal data structures of a TaskManager have been initialized and all core services have been started, the MemoryManager starts creating MemorySegments.
By default 70% of the JVM heap that is available after service initialization is allocated by the MemoryManager. It is also possible to configure an absolute amount of managed memory.
The remaining JVM heap is used for objects that are instantiated during task processing, including objects created by user-defined functions.
MemorySegments 在TaskManager啟動的時候被分配,在TaskManager關閉的時候被銷毀
他們在TaskManager 整個生命周期中可以被重用,而不會被gc掉
Default設置,會有70%的JVM heap會被MemoryManager申請分配成MemorySegments,
剩下的JVM heap用於在task執行中的臨時對象實例化,比如udf中創建的對象
The following figure shows the memory distribution in the TaskManager JVM after startup.
How does Flink serialize objects?
The Java ecosystem offers several libraries to convert objects into a binary representation and back.
Common alternatives are standard Java serialization, Kryo, Apache Avro, Apache Thrift, or Google’s Protobuf.
Flink includes its own custom serialization framework in order to control the binary representation of data.
This is important because operating on binary data such as comparing or even manipulating binary data requires exact knowledge of the serialization layout.
Further, configuring the serialization layout with respect to operations that are performed on binary data can yield a significant performance boost.
Flink’s serialization stack also leverages the fact, that the type of the objects which are going through de/serialization are exactly known before a program is executed.
在java ecosystem中,除了java serialization,還有一堆序列化方案,Kryo, Apache Avro, Apache Thrift, or Google’s Protobuf.
但是Flink還是使用一套自己的serialization framework,以便於控製數據的二進製表示方式
因為對於二進製數據的操作,比如comparing甚至是直接操作,需要確切的知道二進製數據的layout
Flink programs can process data represented as arbitrary Java or Scala objects. Before a program is optimized, the data types at each processing step of the program’s data flow need to be identified.
For Java programs, Flink features a reflection-based type extraction component to analyze the return types of user-defined functions. Scala programs are analyzed with help of the Scala compiler.
Flink需要明確的知道處理過程每步的,數據流的數據類型,比如通過Java的reflection-based type extraction或Scala compiler
然後Flink會使用自己的類型係統來表示數據類型,TypeInformation
Flink represents each data type with a TypeInformation. Flink hasTypeInformations for several kinds of data types, including:
- BasicTypeInfo: Any (boxed) Java primitive type or java.lang.String.
- BasicArrayTypeInfo: Any array of a (boxed) Java primitive type or java.lang.String.
- WritableTypeInfo: Any implementation of Hadoop’s Writable interface.
- TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples are Java representations for fixed-length tuples with typed fields.
- CaseClassTypeInfo: Any Scala CaseClass (including Scala tuples).
- PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either being public or accessible through getters and setter that follow the common naming conventions.
- GenericTypeInfo: Any data type that cannot be identified as another type.
Each TypeInformation provides a serializer for the data type it represents.
For example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields() methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a serializer that delegates serialization to Kryo.
Object serialization to a DataOutput which is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations.
For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete data type - also efficiently compare binary representations and extract fixed-length binary key prefixes.
在TypeInformation中,主要包含serializer,類型會自動通過serializer進行序列化,然後用java unsafe接口寫入MemorySegments
當然對於可以用作key的數據類型,還包含TypeComparators,用於compared和hashed
Tuple, Pojo, and CaseClass types are composite types, i.e., containers for one or more possibly nested data types. As such, their serializers and comparators are also composite and delegate the serialization and comparison of their member data types to the respective serializers and comparators. The following figure illustrates the serialization of a (nested)Tuple3<Integer, Double, Person>
object where Person
is a POJO and defined as follows:
public class Person {
public int id;
public String name;
}
對於Pojo對象,我們具體看看是如何被存入MemorySegments的,
可以看出,存儲是相當緊湊的,
int占4個字節,double占8個字節
pojo,也隻多了1個字節的header
serializer也是經過優化的,Pojo Serializer隻負責序列化Pojo head
How does Flink operate on binary data?
Similar to many other data processing APIs (including SQL), Flink’s APIs provide transformations to group, sort, and join data sets. These transformations operate on potentially very large data sets.
Relational database systems feature very efficient algorithms for these purposes since several decades including external merge-sort, merge-join, and hybrid hash-join.
Flink builds on this technology, but generalizes it to handle arbitrary objects using its custom serialization and comparison stack.
In the following, we show how Flink operates with binary data by the example of Flink’s in-memory sort algorithm.
Flink提供如group,sort,join等操作,這些操作可能需要access海量的數據集,關係型數據庫在過去幾十年中積累的大量有效的算法來解決這類問題;
Flink基於這些技術,並使其通用化,用於處理flink中的非關係型數據
下麵就看看Flink是如何進行in-memory sort
Flink assigns a memory budget to its data processing operators.
Upon initialization, a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding set of MemorySegments.
The set of MemorySegments becomes the memory pool of a so-called sort buffer which collects the data that is be sorted.
首先,Flink會從MemorySegmentPool裏麵來申請MemorySegments來進行存放排序的結果,
The sort buffer is internally organized into two memory regions.
The first region holds the full binary data of all objects. The second region contains pointers to the full binary object data and - depending on the key data type - fixed-length sort keys.
When an object is added to the sort buffer, its binary data is appended to the first region, and a pointer (and possibly a key) is appended to the second region.
The separation of actual data and pointers plus fixed-length keys is done for two purposes.
It enables efficient swapping of fix-length entries (key+pointer) and also reduces the data that needs to be moved when sorting.
If the sort key is a variable length data type such as a String, the fixed-length sort key must be a prefix key such as the first n characters of a String. Note, not all data types provide a fixed-length (prefix) sort key. When serializing objects into the sort buffer, both memory regions are extended with MemorySegments from the memory pool.
我們排序的時候,在內存中把數據的key和真實的數據object,分成兩部分存放;
這樣做有兩個好處,第一是排序的時候,需要做swap,這樣隻需要swap key而不需要swap真實的數據;
第二,這樣是cache友好的,key都是連續的存儲在內存中,大大減少cache miss
Once the memory pool is empty and no more objects can be added, the sort buffer is completely filled and can be sorted.
Flink’s sort buffer provides methods to compare and swap elements. This makes the actual sort algorithm pluggable. By default, Flink uses a Quicksort implementation which can fall back to HeapSort. The following figure shows how two objects are compared.
The sort buffer compares two elements by comparing their binary fix-length sort keys.
The comparison is successful if either done on a full key (not a prefix key) or if the binary prefix keys are not equal.
If the prefix keys are equal (or the sort key data type does not provide a binary prefix key), the sort buffer follows the pointers to the actual object data, deserializes both objects and compares the objects. Depending on the result of the comparison, the sort algorithm decides whether to swap the compared elements or not.
The sort buffer swaps two elements by moving their fix-length keys and pointers. The actual data is not moved.
Once the sort algorithm finishes, the pointers in the sort buffer are correctly ordered. The following figure shows how the sorted data is returned from the sort buffer.
上麵的圖,給出排序的過程,排序首先是要比大小
這裏可以先用key比大小,這樣就可以直接用二進製的key而不需要做反序列化
如果,通過二進製的key無法比出大小,或者根本就沒有二進製的key,那就必須要把object數據,反序列化出來,然後再比較
然後,隻需要swap key,就可以達到排序的效果
The sorted data is returned by sequentially reading the pointer region of the sort buffer, skipping the sort keys and following the sorted pointers to the actual data.
This data is either deserialized and returned as objects or the binary representation is copied and written to disk in case of an external merge-sort (see this blog post on joins in Flink).
然後最終,安裝排好序key,通過point找到相應的data,相應的存入文件中
最後更新:2017-04-07 21:05:50