

Flink - TypeInformation

Flink builds its own, independent type system.

Reference: https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/types_serialization.html

Why build a separate system instead of letting the programming language or a serialization framework handle types natively, as other platforms do?

Flink tries to know as much information about what types enter and leave user functions as possible. This stands in contrast to the approach to just assuming nothing and letting the programming language and serialization framework handle all types dynamically.

  • To allow using POJOs and grouping/joining them by referring to field names, Flink needs the type information to make checks (for typos and type compatibility) before the job is executed.

  • The more we know, the better serialization and data layout schemes the compiler/optimizer can develop. That is quite important for the memory usage paradigm in Flink (work on serialized data inside/outside the heap and make serialization very cheap).

  • For the upcoming logical programs (see roadmap draft) we need this to know the “schema” of functions.

  • Finally, it also spares users having to worry about serialization frameworks and having to register types at those frameworks.

Note: what is a POJO? A Plain Old Java Object, i.e. a fancy name for a simple, lightweight Java object.

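The main reasons are:

First, type checking. Flink supports flexible field-based join and group operations, so it has to check in advance whether a given field can be used as a key, i.e. whether it can be joined or grouped on.

Second, performance: knowing the types enables better serialization and a better data layout.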

Flink mainly distinguishes the following kinds of types:

Internally, Flink makes the following distinctions between types:

  • Basic types: All Java primitives and their boxed form, plus void, String, and Date.

  • Primitive arrays and Object arrays

  • Composite types

    • Flink Java Tuples (part of the Flink Java API)

    • Scala case classes (including Scala tuples)

    • POJOs: classes that follow a certain bean-like pattern

  • Scala auxiliary types (Option, Either, Lists, Maps, …)

  • Generic types: These will not be serialized by Flink itself, but by Kryo.

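In short:

Basic types

Arrays (both primitive arrays and object arrays)

Composite types: Flink Tuples, Scala case classes, and POJOs

Auxiliary types added for Scala

Generic types, which Flink does not serialize itself but hands over to Kryo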
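To make the list concrete, here is a minimal sketch that sorts a Java class into these buckets using plain reflection. It is an illustration only, not Flink's TypeExtractor. The class `TypeCategories`, the `Category` enum and `classify` are hypothetical. Flink Tuples and the Scala types are left out, and the POJO test is reduced to the class-level rules (public, standalone, public no-argument constructor) without the field rules.

```java
import java.lang.reflect.Modifier;
import java.util.Date;
import java.util.Set;

// Hypothetical classifier for the categories listed above; not Flink's TypeExtractor.
class TypeCategories {

    enum Category { BASIC, PRIMITIVE_ARRAY, OBJECT_ARRAY, POJO, GENERIC }

    // Java primitives, their boxed forms, void, String and Date.
    private static final Set<Class<?>> BASIC_TYPES = Set.of(
            boolean.class, byte.class, short.class, int.class, long.class,
            float.class, double.class, char.class, void.class,
            Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
            Float.class, Double.class, Character.class, Void.class,
            String.class, Date.class);

    // Example class that passes the simplified POJO test below.
    public static class Person {
        public String name;

        public Person() {}
    }

    static Category classify(Class<?> clazz) {
        if (BASIC_TYPES.contains(clazz)) {
            return Category.BASIC;
        }
        if (clazz.isArray()) {
            return clazz.getComponentType().isPrimitive()
                    ? Category.PRIMITIVE_ARRAY
                    : Category.OBJECT_ARRAY;
        }
        if (looksLikePojo(clazz)) {
            return Category.POJO;
        }
        // Anything else is a generic type, which Flink hands to Kryo.
        return Category.GENERIC;
    }

    // Simplified: public, not a non-static inner class, with a public no-arg constructor.
    // The POJO field rules are deliberately left out here.
    private static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods) || (clazz.isMemberClass() && !Modifier.isStatic(mods))) {
            return false;
        }
        try {
            clazz.getConstructor(); // only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

For example, `int[]` lands in PRIMITIVE_ARRAY, while `java.util.Optional` (no public constructor) falls through to GENERIC.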

POJOs deserve special attention, because their fields can be referenced directly by name, which is very convenient:

dataSet.join(another).where("name").equalTo("personName")

So what exactly does Flink consider a POJO?

  • The class is public and standalone (no non-static inner class)
  • The class has a public no-argument constructor
  • All fields in the class (and all superclasses) are either public or have a public getter and a setter method that follows the Java beans naming conventions for getters and setters.

That's all: any class that meets these rules supports “by-name” field referencing.
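A rough way to check these three rules with reflection, together with the early typo check that by-name keys make possible, is sketched below. `PojoRules`, `isPojo` and `resolveKey` are hypothetical names, not Flink API. Getter detection follows plain Java beans naming (`getX`/`isX`, `setX`), static fields are skipped, and getter return types are not checked.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical checker for the three POJO rules quoted above; not Flink's own analysis code.
class PojoRules {

    // Example: passes all rules (public field, or private field with getter/setter).
    public static class Person {
        public String name;
        private int age;

        public Person() {}

        public int getAge() { return age; }

        public void setAge(int age) { this.age = age; }
    }

    // Example: fails rule 3, because the private field has no getter/setter.
    public static class Hidden {
        private String secret;

        public Hidden() {}
    }

    static boolean isPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // Rule 1: public and standalone (a nested class must be static).
        if (!Modifier.isPublic(mods) || (clazz.isMemberClass() && !Modifier.isStatic(mods))) {
            return false;
        }
        // Rule 2: public no-argument constructor.
        try {
            clazz.getConstructor(); // finds public constructors only
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every field (including inherited ones) is public or has a getter and a setter.
        for (Field f : instanceFields(clazz)) {
            if (!Modifier.isPublic(f.getModifiers()) && !hasGetterAndSetter(clazz, f)) {
                return false;
            }
        }
        return true;
    }

    // Checking a by-name key up front catches typos before any data is processed.
    static Field resolveKey(Class<?> clazz, String fieldName) {
        for (Field f : instanceFields(clazz)) {
            if (f.getName().equals(fieldName)) {
                return f;
            }
        }
        throw new IllegalArgumentException(
                "Field '" + fieldName + "' does not exist in " + clazz.getSimpleName());
    }

    // Declared fields of the class and its superclasses; static fields are not per-record state.
    private static List<Field> instanceFields(Class<?> clazz) {
        List<Field> fields = new ArrayList<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (!Modifier.isStatic(f.getModifiers())) {
                    fields.add(f);
                }
            }
        }
        return fields;
    }

    // Java beans naming: getX() (or isX() for boolean) and setX(type).
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean hasGetter = hasPublicMethod(clazz, "get" + suffix)
                || (f.getType() == boolean.class && hasPublicMethod(clazz, "is" + suffix));
        return hasGetter && hasPublicMethod(clazz, "set" + suffix, f.getType());
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params); // public methods, including inherited ones
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

With `PojoRules.Person`, resolving the key "name" succeeds, while a typo such as "nmae" fails immediately. This is the kind of check that can run before the job is executed.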

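The documentation also covers type handling in the Scala and Java APIs.

For Scala, manifests and TypeTags already work around generic type erasure, so Flink mainly uses a macro to create the TypeInformation, which keeps it convenient to use.

For Java, Flink has to deal with generic type erasure itself: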

DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
    .returns(SomeType.class);

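When the output type of a function cannot be inferred, as in this example, the returns() call supplies it to Flink as a type hint.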
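Plain Java reflection shows where the information is lost. In the sketch below (`MyGenericFunction` and `ErasureDemo` are hypothetical, not Flink code), an instance created directly from the generic class keeps nothing about its type arguments at runtime. An anonymous subclass, by contrast, records them in its generic superclass. As far as I understand, Flink's type extraction relies on this kind of recorded information, and `returns()` covers the cases where none exists.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a user function with generic input (IN) and output (OUT) types.
class MyGenericFunction<IN, OUT> {
}

class ErasureDemo {

    // Returns OUT if the function's class fixes it in its generic superclass, otherwise null.
    // Only direct subclasses of MyGenericFunction are handled.
    static Type outputTypeOf(MyGenericFunction<?, ?> function) {
        Type superType = function.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType
                && ((ParameterizedType) superType).getRawType() == MyGenericFunction.class) {
            Type out = ((ParameterizedType) superType).getActualTypeArguments()[1];
            // A remaining type variable means OUT was never bound to a concrete class.
            return (out instanceof TypeVariable) ? null : out;
        }
        // The object's class is MyGenericFunction itself: its type arguments were erased.
        return null;
    }
}
```

`new MyGenericFunction<Long, String>()` yields null, while `new MyGenericFunction<Long, String>() {}` yields `String.class`. The first case is exactly the situation `.returns(SomeType.class)` is meant for. Deeper class hierarchies would require resolving type variables level by level, which this sketch does not attempt.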

 

Now let's look at the source code.

The base class is:

package org.apache.flink.api.common.typeinfo;
TypeInformation

Purpose: this type information class acts as the tool

  • to generate serializers and comparators,
  • to perform semantic checks, such as whether the fields that are used as join/grouping keys actually exist,
  • and to bridge between the programming language's object model and a logical flat schema.

The first two purposes are easy to understand.

For the last one, two concepts need to be clear:

  • arity: the number of fields the type contains directly
  • total number of fields: the number of fields in the entire schema of this type, including nested types

For example:

public class InnerType {
    public int id;
    public String text;
}

public class OuterType {
    public long timestamp;
    public InnerType nestedType;
}

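For InnerType, both the arity and the total number of fields are 2.

OuterType also has arity 2, but its total number of fields is 3, because the fields of the nested type are counted as well. This is what mapping the programming language's object model to a flat logical schema means.

The rules for counting fields are: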

  • Basic types are indivisible and are considered a single field.
  • Arrays and collections are one field.
  • Tuples and case classes represent as many fields as the class has fields.
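The arity/total-fields distinction for the InnerType/OuterType example can be reproduced with a small reflective sketch. `FieldCounter` is hypothetical and follows the rules above in simplified form: primitives, boxed types, String and arrays count as one field, and every other class is treated as composite and expanded recursively. In Flink itself the two numbers come from `TypeInformation.getArity()` and `getTotalFields()`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical counter for arity and total fields; not Flink's implementation.
class FieldCounter {

    // The example types from the text, as static nested classes.
    public static class InnerType {
        public int id;
        public String text;
    }

    public static class OuterType {
        public long timestamp;
        public InnerType nestedType;
    }

    // Simplified set of basic types that count as a single, indivisible field.
    private static final Set<Class<?>> BASIC = Set.of(String.class, Integer.class, Long.class,
            Short.class, Byte.class, Character.class, Boolean.class, Float.class, Double.class);

    private static boolean isAtomic(Class<?> c) {
        return c.isPrimitive() || c.isArray() || BASIC.contains(c);
    }

    // Declared, non-static fields only (superclass fields are ignored for brevity).
    private static List<Field> instanceFields(Class<?> c) {
        List<Field> fields = new ArrayList<>();
        for (Field f : c.getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                fields.add(f);
            }
        }
        return fields;
    }

    // Arity: the number of fields the type contains directly.
    static int arity(Class<?> c) {
        return isAtomic(c) ? 1 : instanceFields(c).size();
    }

    // Total fields: leaf fields of the flattened schema, nested types included.
    // No cycle detection, so self-referencing types would recurse forever.
    static int totalFields(Class<?> c) {
        if (isAtomic(c)) {
            return 1;
        }
        int total = 0;
        for (Field f : instanceFields(c)) {
            total += totalFields(f.getType());
        }
        return total;
    }
}
```

For OuterType this gives arity 2 and 3 total fields (timestamp, plus id and text from the nested InnerType), matching the counts above.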

 

IntegerTypeInfo
Let's use this class as an example. Its inheritance chain is:
public class IntegerTypeInfo<T> extends NumericTypeInfo<T> 
public abstract class NumericTypeInfo<T> extends BasicTypeInfo<T> 
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T>

So IntegerTypeInfo ultimately inherits from BasicTypeInfo, which, besides extending TypeInformation, also implements the AtomicType interface:

/**
 * An atomic type is a type that is treated as one indivisible unit and where the entire type acts
 * as a key.
 * In contrast to atomic types are composite types, where the type information is aware of the individual
 * fields and individual fields may be used as a key.
 */
public interface AtomicType<T> {

    TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
}
An atomic type is indivisible: unlike a composite type, it does not contain further fields, so the value as a whole serves as the key. Basic types such as int are naturally atomic.
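For an atomic type, createComparator only has to order whole values and respect the requested direction. The sketch below is a simplified, hypothetical analogue (`SimpleAtomicType`, `ComparableAtomicType` and `AtomicSortDemo` are not Flink classes). It returns a `java.util.Comparator` instead of Flink's `TypeComparator` and drops the `ExecutionConfig` parameter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified analogue of AtomicType: the whole value is the key.
interface SimpleAtomicType<T> {
    Comparator<T> createComparator(boolean sortOrderAscending);
}

// Works for any naturally ordered type, such as Integer, Long or String.
class ComparableAtomicType<T extends Comparable<? super T>> implements SimpleAtomicType<T> {
    @Override
    public Comparator<T> createComparator(boolean sortOrderAscending) {
        Comparator<T> natural = Comparator.naturalOrder();
        return sortOrderAscending ? natural : natural.reversed();
    }
}

class AtomicSortDemo {
    // Sorts a copy of the input, using each whole value as the key.
    static <T extends Comparable<? super T>> List<T> sortedCopy(List<T> values, boolean ascending) {
        List<T> copy = new ArrayList<>(values);
        copy.sort(new ComparableAtomicType<T>().createComparator(ascending));
        return copy;
    }
}
```

A composite type could not work this way. It would first have to pick the key fields out of each record before comparing.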
 
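BasicTypeInfo defines the TypeInfo instances for all basic types. In each constructor call, the second argument (possibleCastTargetTypes) lists the types that a value of this type may be automatically cast to: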
    public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<String>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
    public static final BasicTypeInfo<Boolean> BOOLEAN_TYPE_INFO = new BasicTypeInfo<Boolean>(Boolean.class, new Class<?>[]{}, BooleanSerializer.INSTANCE, BooleanComparator.class);
    public static final BasicTypeInfo<Byte> BYTE_TYPE_INFO = new IntegerTypeInfo<Byte>(Byte.class, new Class<?>[]{Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class}, ByteSerializer.INSTANCE, ByteComparator.class);
    public static final BasicTypeInfo<Short> SHORT_TYPE_INFO = new IntegerTypeInfo<Short>(Short.class, new Class<?>[]{Integer.class, Long.class, Float.class, Double.class, Character.class}, ShortSerializer.INSTANCE, ShortComparator.class);
    public static final BasicTypeInfo<Integer> INT_TYPE_INFO = new IntegerTypeInfo<Integer>(Integer.class, new Class<?>[]{Long.class, Float.class, Double.class, Character.class}, IntSerializer.INSTANCE, IntComparator.class);
    public static final BasicTypeInfo<Long> LONG_TYPE_INFO = new IntegerTypeInfo<Long>(Long.class, new Class<?>[]{Float.class, Double.class, Character.class}, LongSerializer.INSTANCE, LongComparator.class);
    public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new FractionalTypeInfo<Float>(Float.class, new Class<?>[]{Double.class}, FloatSerializer.INSTANCE, FloatComparator.class);
    public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new FractionalTypeInfo<Double>(Double.class, new Class<?>[]{}, DoubleSerializer.INSTANCE, DoubleComparator.class);
    public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, new Class<?>[]{}, CharSerializer.INSTANCE, CharComparator.class);
    public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, new Class<?>[]{}, DateSerializer.INSTANCE, DateComparator.class);
    public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null);
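The second constructor argument reads like a widening table. For example, INT_TYPE_INFO may be cast to Long, Float, Double or Character, but no type lists Byte as a target. Below is a minimal sketch of how such a table could be queried. The class `AutocastTable` and its methods are hypothetical, not Flink's implementation, and the rows are copied from the definitions above.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical lookup built from the second constructor arguments above.
class AutocastTable {

    private static final Map<Class<?>, List<Class<?>>> CAST_TARGETS = new HashMap<>();

    // Registers the types that 'from' may be cast to.
    private static void allow(Class<?> from, Class<?>... targets) {
        CAST_TARGETS.put(from, List.of(targets));
    }

    static {
        allow(String.class);
        allow(Boolean.class);
        allow(Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class);
        allow(Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class);
        allow(Integer.class, Long.class, Float.class, Double.class, Character.class);
        allow(Long.class, Float.class, Double.class, Character.class);
        allow(Float.class, Double.class);
        allow(Double.class);
        allow(Character.class);
        allow(Date.class);
        allow(Void.class);
    }

    // True if a value of type 'from' may be cast to type 'to' according to the table.
    static boolean canAutocast(Class<?> from, Class<?> to) {
        List<Class<?>> targets = CAST_TARGETS.get(from);
        return targets != null && targets.contains(to);
    }
}
```

The table is one-directional. Integer can be cast to Long, but Long never narrows back to Integer.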
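Once the type is fixed, its serializer can be very simple. For example, LongSerializer just writes the primitive long value: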

@Override
public void serialize(Long record, DataOutputView target) throws IOException {
    target.writeLong(record.longValue());
}
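Because the type is known in advance, the bytes do not need to carry any type information, so a Long takes exactly 8 bytes. The sketch below pairs the serialize method above with a matching deserialize and runs a round trip. `LongCodec` is hypothetical. `java.io.DataOutputStream` and `DataInputStream` stand in for Flink's `DataOutputView` and `DataInputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Long serializer, using JDK streams instead of Flink's views.
class LongCodec {

    // Writes the value as exactly 8 big-endian bytes; no type tag is needed.
    static void serialize(Long record, DataOutputStream target) throws IOException {
        target.writeLong(record.longValue());
    }

    // Reads the 8 bytes back; the reader must already know the type is Long.
    static Long deserialize(DataInputStream source) throws IOException {
        return source.readLong();
    }

    static byte[] toBytes(Long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            serialize(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Long fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The deserializer only works because both sides agree on the type up front. This is the information the TypeInformation provides.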

 

NumericTypeInfo is just a special kind of BasicTypeInfo, restricted to the following numerical types:

    private static final Set<Class<?>> numericalTypes = Sets.<Class<?>>newHashSet(
            Integer.class,
            Long.class,
            Double.class,
            Byte.class,
            Short.class,
            Float.class,
            Character.class
    );

IntegerTypeInfo narrows the range further, to the integral types:

    private static final Set<Class<?>> integerTypes = Sets.<Class<?>>newHashSet(
            Integer.class,
            Long.class,
            Byte.class,
            Short.class,
            Character.class
    );
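What are these two sets used for? As far as I can tell from the source, the constructors check that the given class belongs to the set. This means, for example, that an IntegerTypeInfo cannot be created for Double. Below is a minimal sketch of that idea, using hypothetical trimmed-down classes `NumericInfo` and `IntegerInfo` (not Flink's).

```java
import java.util.Set;

// Hypothetical, trimmed-down mirror of the NumericTypeInfo -> IntegerTypeInfo hierarchy.
class NumericInfo<T> {
    private static final Set<Class<?>> NUMERICAL_TYPES = Set.of(
            Integer.class, Long.class, Double.class, Byte.class,
            Short.class, Float.class, Character.class);

    final Class<T> typeClass;

    NumericInfo(Class<T> typeClass) {
        // Reject anything that is not one of the numerical types.
        if (!NUMERICAL_TYPES.contains(typeClass)) {
            throw new IllegalArgumentException(typeClass.getSimpleName() + " is not a numerical type");
        }
        this.typeClass = typeClass;
    }
}

class IntegerInfo<T> extends NumericInfo<T> {
    private static final Set<Class<?>> INTEGER_TYPES = Set.of(
            Integer.class, Long.class, Byte.class, Short.class, Character.class);

    IntegerInfo(Class<T> typeClass) {
        super(typeClass); // the broader numeric check runs first
        // Narrow further: only integral types are allowed here.
        if (!INTEGER_TYPES.contains(typeClass)) {
            throw new IllegalArgumentException(typeClass.getSimpleName() + " is not an integer type");
        }
    }
}
```

Each level of the hierarchy only adds a stricter membership check, which mirrors how the two sets shrink from NumericTypeInfo to IntegerTypeInfo.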
 

Besides the atomic types above, there are also type infos for arrays, for example BasicArrayTypeInfo.

Last updated: 2017-04-07 21:23:50
