Flink - DataStream
Let's start with an example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<Long, Long>> stream = env.addSource(...);

stream
    .keyBy(0)
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .reduce(new SummingReducer())
    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

env.execute();
As you can see, the biggest difference from batch is that here we work with a DataStream rather than a DataSet.
/**
 * A DataStream represents a stream of elements of the same type. A DataStream
 * can be transformed into another DataStream by applying a transformation as
 * for example:
 * <ul>
 * <li>{@link DataStream#map},
 * <li>{@link DataStream#filter}, or
 * </ul>
 *
 * @param <T> The type of the elements in this Stream
 */
public class DataStream<T> {

    protected final StreamExecutionEnvironment environment;

    protected final StreamTransformation<T> transformation;

    /**
     * Create a new {@link DataStream} in the given execution environment with
     * partitioning set to forward by default.
     *
     * @param environment The StreamExecutionEnvironment
     */
    public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }

    // The various operations on a DataStream follow ...
    // map, reduce, keyBy, ...
}
The core of a DataStream is its StreamTransformation<T> transformation field, which describes how the data stream is produced.
StreamTransformation
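A StreamTransformation represents an operation that creates a DataStream.
It does not necessarily correspond to an actual physical operation; it may be a purely logical concept, as in the example from the Javadoc below.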
/**
 * A {@code StreamTransformation} represents the operation that creates a
 * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
 * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
 * {@code StreamTransformation} that is the origin of said DataStream.
 *
 * <p>
 * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
 * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
 * graph is translated to a {@link StreamGraph} using
 * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
 *
 * <p>
 * A {@code StreamTransformation} does not necessarily correspond to a physical operation
 * at runtime. Some operations are only logical concepts. Examples of this are union,
 * split/select data stream, partitioning.
 *
 * <p>
 * The following graph of {@code StreamTransformations}:
 *
 * <pre>{@code
 *   Source              Source
 *      +                   +
 *      |                   |
 *      v                   v
 *  Rebalance          HashPartition
 *      +                   +
 *      |                   |
 *      |                   |
 *      +------>Union<------+
 *                +
 *                |
 *                v
 *              Split
 *                +
 *                |
 *                v
 *              Select
 *                +
 *                v
 *               Map
 *                +
 *                |
 *                v
 *              Sink
 * }</pre>
 *
 * Would result in this graph of operations at runtime:
 *
 * <pre>{@code
 *  Source              Source
 *    +                   +
 *    |                   |
 *    |                   |
 *    +------->Map<-------+
 *              +
 *              |
 *              v
 *             Sink
 * }</pre>
 *
 * The information about partitioning, union, split/select end up being encoded in the edges
 * that connect the sources to the map operation.
 *
 * @param <T> The type of the elements that result from this {@code StreamTransformation}
 */
public abstract class StreamTransformation<T>
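To make the collapse described in this Javadoc concrete, here is a minimal plain-Java sketch. It is not Flink's StreamGraphGenerator: the classes SketchTransformation, PhysicalNode, LogicalNode and GraphSketch are hypothetical, and the sketch assumes the transformations form a tree (no node is reachable over two paths). It walks upstream from the sink, skips logical nodes, and records them as a label on the edge between the two physical nodes they sit between.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Flink's transformation classes.
abstract class SketchTransformation {
    final String name;
    final List<SketchTransformation> inputs;

    SketchTransformation(String name, SketchTransformation... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    // True if this node becomes an operator at runtime.
    abstract boolean isPhysical();
}

final class PhysicalNode extends SketchTransformation {
    PhysicalNode(String name, SketchTransformation... inputs) { super(name, inputs); }
    @Override boolean isPhysical() { return true; }
}

final class LogicalNode extends SketchTransformation {
    LogicalNode(String name, SketchTransformation... inputs) { super(name, inputs); }
    @Override boolean isPhysical() { return false; }
}

final class GraphSketch {
    // Records one edge per physical upstream node of `target`. Logical nodes
    // passed on the way up are folded into the edge label instead of becoming nodes.
    static void collectEdges(SketchTransformation node, String viaLabel,
                             SketchTransformation target, List<String> edges) {
        for (SketchTransformation in : node.inputs) {
            if (in.isPhysical()) {
                edges.add(in.name + " -[" + viaLabel + "]-> " + target.name);
                collectEdges(in, "", in, edges);
            } else {
                String label = viaLabel.isEmpty() ? in.name : in.name + "," + viaLabel;
                collectEdges(in, label, target, edges);
            }
        }
    }

    // Builds the transformation tree from the Javadoc and returns the runtime edges.
    static List<String> demo() {
        SketchTransformation s1 = new PhysicalNode("Source1");
        SketchTransformation s2 = new PhysicalNode("Source2");
        SketchTransformation union = new LogicalNode("Union",
                new LogicalNode("Rebalance", s1),
                new LogicalNode("HashPartition", s2));
        SketchTransformation select = new LogicalNode("Select", new LogicalNode("Split", union));
        SketchTransformation map = new PhysicalNode("Map", select);
        SketchTransformation sink = new PhysicalNode("Sink", map);

        List<String> edges = new ArrayList<>();
        collectEdges(sink, "", sink, edges);
        return edges;
    }

    public static void main(String[] args) {
        for (String edge : demo()) {
            System.out.println(edge);
        }
    }
}
```

Only Source1, Source2, Map and Sink survive as nodes; Rebalance, HashPartition, Union, Split and Select end up as edge labels, which is the same idea as "encoded in the edges" in the Javadoc.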
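StreamTransformation itself only defines the output, i.e. the result stream that the transform produces.
It is an abstract class and cannot be used directly; the logic by which a transform produces a stream still has to be encapsulated in a concrete operator.
The following example shows the difference between a transform and an operator; the design here is a bit convoluted.
OneInputTransformation adds an input on top of StreamTransformation: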
/**
 * This Transformation represents the application of a
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
 * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
 *
 * @param <IN> The type of the elements in the input {@code StreamTransformation}
 * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
 */
public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN> input;

    private final OneInputStreamOperator<IN, OUT> operator;

    private KeySelector<IN, ?> stateKeySelector;

    private TypeInformation<?> stateKeyType;
}
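So it contains:
a StreamTransformation<IN> input, which produces the input stream,
and a OneInputStreamOperator<IN, OUT> operator, which produces the output from that input.
TwoInputTransformation follows the same pattern: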
public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN1> input1;
    private final StreamTransformation<IN2> input2;

    private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
}
Now compare SourceTransformation and SinkTransformation:
public class SourceTransformation<T> extends StreamTransformation<T> {

    private final StreamSource<T> operator;
}

public class SinkTransformation<T> extends StreamTransformation<Object> {

    private final StreamTransformation<T> input;

    private final StreamSink<T> operator;
}
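This comparison makes the role of a transform easier to understand.
A source has no input, so it has no transformation representing an input.
A sink does have an input, but its operator is not an ordinary stream operator; it is a StreamSink, i.e. the end point of the stream.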
transform
This method uses a user-defined operator to convert the current stream into a stream of the user-specified type.
/**
 * Method for passing user defined operators along with the type
 * information that will transform the DataStream.
 *
 * @param operatorName
 *            name of the operator, for logging purposes
 * @param outTypeInfo
 *            the output type of the operator
 * @param operator
 *            the object containing the transformation logic
 * @param <R>
 *            type of the return stream
 * @return the data stream constructed
 */
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
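So the parameters are
supplied by the user: the output TypeInformation and a OneInputStreamOperator.
The implementation
creates a OneInputTransformation with this.transformation as its input and the passed-in operator as its OneInputStreamOperator.
resultTransform is therefore what converts the current stream into the target stream.
The result is then wrapped in a SingleOutputStreamOperator. What is that?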
/**
 * The SingleOutputStreamOperator represents a user defined transformation
 * applied on a {@link DataStream} with one predefined output type.
 *
 * @param <T> The type of the elements in this Stream
 * @param <O> Type of the operator.
 */
public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }
}
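Put plainly, it is just a wrapper around the user-defined transformation.
The naming in this part of the Flink code is somewhat confusing, and the two concepts of operator and transformation are easy to mix up: despite its name, SingleOutputStreamOperator extends DataStream and is not a stream operator.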
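The shape of transform (build a new transformation whose input is this.transformation, register it with the environment, wrap it in a new stream) can be sketched in plain Java. Everything here is a hypothetical miniature: MiniEnv, MiniTransformation, MiniSource, MiniOneInput and MiniStream are made-up names, a java.util.function.Function stands in for the operator, and a bounded List stands in for an unbounded stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature of the DataStream/transformation pairing; not Flink's API.
final class MiniEnv {
    // Plays the role of the environment's list of registered transformations.
    final List<MiniTransformation<?>> transformations = new ArrayList<>();
}

abstract class MiniTransformation<T> {
    // Produces this node's output; a List stands in for a stream.
    abstract List<T> evaluate();
}

final class MiniSource<T> extends MiniTransformation<T> {
    private final List<T> data;

    MiniSource(List<T> data) { this.data = data; }

    @Override List<T> evaluate() { return data; }
}

final class MiniOneInput<IN, OUT> extends MiniTransformation<OUT> {
    private final MiniTransformation<IN> input; // where the elements come from
    private final Function<IN, OUT> operator;   // how each element is processed

    MiniOneInput(MiniTransformation<IN> input, Function<IN, OUT> operator) {
        this.input = input;
        this.operator = operator;
    }

    @Override List<OUT> evaluate() {
        List<OUT> out = new ArrayList<>();
        for (IN element : input.evaluate()) {
            out.add(operator.apply(element));
        }
        return out;
    }
}

final class MiniStream<T> {
    final MiniEnv env;
    final MiniTransformation<T> transformation;

    MiniStream(MiniEnv env, MiniTransformation<T> transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // Same shape as DataStream.transform: the new node takes this.transformation
    // as its input, is registered with the environment, and is wrapped in a new stream.
    <R> MiniStream<R> transform(Function<T, R> operator) {
        MiniOneInput<T, R> result = new MiniOneInput<>(this.transformation, operator);
        env.transformations.add(result);
        return new MiniStream<>(env, result);
    }
}

final class TransformSketch {
    static List<String> demo() {
        MiniEnv env = new MiniEnv();
        MiniStream<Integer> numbers =
                new MiniStream<>(env, new MiniSource<>(Arrays.asList(1, 2, 3)));
        MiniStream<String> labels = numbers.transform(n -> n * 10).transform(n -> "v" + n);
        return labels.transformation.evaluate();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each call to transform only adds a node that points back at its input; the stream object itself carries no data, which is the same division of labour as between DataStream and StreamTransformation.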
In the example above, keyBy(0) produces a KeyedStream.
KeyedStream
The key parts of a KeyedStream are
the keySelector and the keyType: how the key is extracted and what type it has.
/**
 * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
 * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
 * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
 * partitioning methods such as shuffle, forward and keyBy.
 *
 * <p>
 * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
 * that have the same key.
 *
 * @param <T> The type of the elements in the Keyed Stream.
 * @param <KEY> The type of the key in the Keyed Stream.
 */
public class KeyedStream<T, KEY> extends DataStream<T> {

    /** The key selector that can get the key by which the stream is partitioned from the elements */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned */
    private final TypeInformation<KEY> keyType;
}
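As a rough illustration of what "state partitioned by key" means for a reduce-style operation, here is a small plain-Java sketch. It is an assumption-laden model, not Flink code: KeyedReduceSketch is a hypothetical class, a Function plays the role of the KeySelector, a BinaryOperator plays the role of the ReduceFunction, and a long[] of length two stands in for Tuple2<Long, Long>.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical sketch: one reduce state per key, chosen by a key-extracting function.
final class KeyedReduceSketch<T, K> {
    private final Function<T, K> keySelector; // plays the role of KeySelector<T, KEY>
    private final BinaryOperator<T> reducer;  // plays the role of ReduceFunction<T>
    private final Map<K, T> statePerKey = new LinkedHashMap<>();

    KeyedReduceSketch(Function<T, K> keySelector, BinaryOperator<T> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    // Folds one element into the state of its key and returns the updated value.
    T process(T element) {
        K key = keySelector.apply(element);
        return statePerKey.merge(key, element, reducer);
    }

    // Feeds three (key, value) pairs through a summing reducer keyed on field 0.
    static Map<Long, long[]> demo() {
        KeyedReduceSketch<long[], Long> op = new KeyedReduceSketch<long[], Long>(
                t -> t[0],
                (a, b) -> new long[] {a[0], a[1] + b[1]});
        op.process(new long[] {1L, 10L});
        op.process(new long[] {2L, 5L});
        op.process(new long[] {1L, 7L});
        return op.statePerKey;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, long[]> e : demo().entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue()[1]);
        }
    }
}
```

Elements with the same key are only ever combined with each other, which is why the operator needs to know both how to extract the key and what its type is.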
Now look at transform: it calls DataStream.transform and additionally sets the keySelector and keyType on the resulting transformation.
// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo, operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}
An important role of KeyedStream is to serve as the transition to a WindowedStream,
so it provides a set of methods that create windowed streams.
// ------------------------------------------------------------------------
//  Windowing
// ------------------------------------------------------------------------

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>
 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
    return window(TumblingTimeWindows.of(size));
}
WindowedStream
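The example uses the two-argument (size, slide) form of timeWindow, which defines sliding rather than tumbling windows: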
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
/**
 * A {@code WindowedStream} represents a data stream where elements are grouped by
 * key, and for each key, the stream of elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
 * different points for each key.
 *
 * <p>
 * If an {@link Evictor} is specified it will be used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code WindowedStream} is purely an API construct, during runtime
 * the {@code WindowedStream} will be collapsed together with the
 * {@code KeyedStream} and the operation over the window into one single operation.
 *
 * @param <T> The type of elements in the stream.
 * @param <K> The type of the key by which elements are grouped.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream */
    private final KeyedStream<T, K> input;

    /** The window assigner */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    // ...
}
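Note that WindowedStream does not inherit directly from DataStream;
instead it takes a KeyedStream as its input.
And of course everything a window needs is there as well: the WindowAssigner, the Trigger and the Evictor.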
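To get a feel for what a window assigner does with the example's parameters (size 2500 ms, slide 500 ms), here is a small plain-Java sketch. The arithmetic is an assumption modelled on how sliding-window assignment is commonly implemented, not code copied from Flink; SlidingWindowSketch and its methods are hypothetical names, and timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of sliding-window assignment; not Flink's WindowAssigner.
final class SlidingWindowSketch {

    // Returns the start timestamps of all windows [start, start + size)
    // that contain `timestamp`. Assumes timestamp >= 0.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Sums event values per window; each event is {timestamp, value}.
    static Map<Long, Long> sumPerWindow(long[][] events, long size, long slide) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] event : events) {
            for (Long start : windowStarts(event[0], size, slide)) {
                sums.merge(start, event[1], Long::sum);
            }
        }
        return sums;
    }

    // Three events at 100 ms, 600 ms and 1200 ms with values 1, 2 and 3.
    static Map<Long, Long> demo() {
        long[][] events = { {100L, 1L}, {600L, 2L}, {1200L, 3L} };
        return sumPerWindow(events, 2500L, 500L);
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(1200L, 2500L, 500L));
        System.out.println(demo());
    }
}
```

With these parameters every element belongs to size / slide = 5 overlapping windows, so a reduce over the window sees each element five times, once per window.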
Continuing with the example: .reduce(new SummingReducer())
Let's look at the reduce operation on WindowedStream:
/**
 * Applies a reduce function to the window. The window function is called for each evaluation
 * of the window for each key individually. The output of the reduce function is interpreted
 * as a regular non-windowed stream.
 * <p>
 * This window will try and pre-aggregate data as much as the window policies permit. For example,
 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
 * so a few elements are stored per key (one per slide interval).
 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
 * aggregation tree.
 *
 * @param function The reduce function.
 * @return The data stream that is the result of applying the reduce function to the window.
 */
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
    //clean the closure
    function = input.getExecutionEnvironment().clean(function);

    // note: the definition of udfName is not part of this excerpt
    String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
    KeySelector<T, K> keySel = input.getKeySelector();

    OneInputStreamOperator<T, T> operator;

    boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    if (evictor != null) {
        operator = new EvictingWindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                new HeapWindowBuffer.Factory<T>(),
                new ReduceWindowFunction<K, W, T>(function),
                trigger,
                evictor).enableSetProcessingTime(setProcessingTime);
    } else {
        operator = new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                // pre-aggregating: the actual elements are not cached; only the
                // aggregated value is stored, which saves space
                new PreAggregatingHeapWindowBuffer.Factory<>(function),
                new ReduceWindowFunction<K, W, T>(function),
                trigger).enableSetProcessingTime(setProcessingTime);
    }

    return input.transform(opName, input.getType(), operator);
}
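The difference between the two buffer choices in reduce can be illustrated with a plain-Java sketch. These are hypothetical stand-ins (BufferingWindowSketch, PreAggregatingWindowSketch, BufferCompareSketch), not Flink's HeapWindowBuffer or PreAggregatingHeapWindowBuffer: one keeps every element until the window fires, the other folds each element into a single running value as it arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a buffer that keeps all elements of a window.
final class BufferingWindowSketch<T> {
    private final List<T> elements = new ArrayList<>();

    void store(T element) { elements.add(element); }

    int storedCount() { return elements.size(); }

    // Reduces all buffered elements when the window fires.
    T emit(BinaryOperator<T> reducer) {
        T result = null;
        for (T element : elements) {
            result = (result == null) ? element : reducer.apply(result, element);
        }
        return result;
    }
}

// Hypothetical stand-in for a buffer that stores only the running aggregate.
final class PreAggregatingWindowSketch<T> {
    private final BinaryOperator<T> reducer;
    private T aggregate; // null until the first element arrives

    PreAggregatingWindowSketch(BinaryOperator<T> reducer) { this.reducer = reducer; }

    void store(T element) {
        aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
    }

    int storedCount() { return aggregate == null ? 0 : 1; }

    T emit() { return aggregate; }
}

final class BufferCompareSketch {
    // Returns {bufferedResult, bufferedCount, preAggResult, preAggCount}
    // after feeding the numbers 1..1000 through a summing reducer.
    static long[] demo() {
        BinaryOperator<Long> sum = Long::sum;
        BufferingWindowSketch<Long> buffering = new BufferingWindowSketch<>();
        PreAggregatingWindowSketch<Long> preAggregating = new PreAggregatingWindowSketch<>(sum);
        for (long i = 1; i <= 1000; i++) {
            buffering.store(i);
            preAggregating.store(i);
        }
        return new long[] {
            buffering.emit(sum), buffering.storedCount(),
            preAggregating.emit(), preAggregating.storedCount()
        };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("buffering: result=" + r[0] + ", stored=" + r[1]);
        System.out.println("pre-aggregating: result=" + r[2] + ", stored=" + r[3]);
    }
}
```

Both variants produce the same reduced value, but only the buffering one still has the individual elements available, which is what an evictor needs; this matches the WindowedStream Javadoc's remark that pre-aggregation cannot be used once an evictor is set.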
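The key point is that, depending on whether an evictor is set, a different window operator is created: EvictingWindowOperator or WindowOperator.
Then input.transform is called, which turns the WindowedStream into a SingleOutputStreamOperator.
Here input is the KeyedStream, so this is KeyedStream.transform: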
// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo, operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return returnStream;
}
The parameter here is a OneInputStreamOperator, and WindowOperator does in fact implement that interface.
For a OneInputStreamOperator we only need to implement two methods, processElement and processWatermark; the focus is on how input elements are handled.
/**
 * Interface for stream operators with one input. Use
 * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
 * you want to implement a custom operator.
 *
 * @param <IN> The input type of the operator
 * @param <OUT> The output type of the operator
 */
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived at this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement(StreamRecord<IN> element) throws Exception;

    /**
     * Processes a {@link Watermark}.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     */
    void processWatermark(Watermark mark) throws Exception;
}
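To show what implementing those two methods amounts to, here is a simplified plain-Java sketch. MiniOperator is a hypothetical, cut-down counterpart of OneInputStreamOperator (a raw element plus a long timestamp instead of StreamRecord, a long instead of Watermark), and MiniFilterOperator writes to a list where a real operator would emit to its downstream output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified, hypothetical counterpart of OneInputStreamOperator.
interface MiniOperator<IN> {
    void processElement(IN element, long timestamp) throws Exception;

    void processWatermark(long watermark) throws Exception;
}

// Keeps elements that match a predicate and forwards watermarks unchanged.
final class MiniFilterOperator<T> implements MiniOperator<T> {
    private final Predicate<T> predicate;
    final List<String> output = new ArrayList<>(); // stands in for the downstream output

    MiniFilterOperator(Predicate<T> predicate) { this.predicate = predicate; }

    @Override
    public void processElement(T element, long timestamp) {
        if (predicate.test(element)) {
            output.add("record(" + element + "@" + timestamp + ")");
        }
    }

    @Override
    public void processWatermark(long watermark) {
        output.add("watermark(" + watermark + ")");
    }
}

final class OperatorSketch {
    // Drives the operator by hand, the way a runtime task would call it.
    static List<String> demo() {
        MiniFilterOperator<Integer> op = new MiniFilterOperator<>(n -> n % 2 == 0);
        op.processElement(1, 100L);
        op.processElement(2, 200L);
        op.processWatermark(250L);
        op.processElement(4, 300L);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The operator never pulls data itself; it only reacts to whatever the caller pushes in, one record or watermark at a time.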
Back in KeyedStream.transform, the call then continues to super.transform, i.e. DataStream's transform.
At the end of the example,
.addSink(new SinkFunction<Tuple2<Long, Long>>() {...});
this actually calls
SingleOutputStreamOperator.addSink, i.e. DataStream.addSink:
/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}
The SinkFunction interface:
public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Function for standard sink behaviour. This function is called for every record.
     *
     * @param value The input record.
     * @throws Exception
     */
    void invoke(IN value) throws Exception;
}
StreamSink is a OneInputStreamOperator, so the important part is its processElement method:
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // ignore it for now, we are a sink, after all
    }
}
DataStreamSink is simply a wrapper around SinkTransformation:
/**
 * A Stream Sink. This is used for emitting elements from a streaming topology.
 *
 * @param <T> The type of the elements in the Stream
 */
public class DataStreamSink<T> {

    SinkTransformation<T> transformation;

    @SuppressWarnings("unchecked")
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
    }
}
Finally,
the SinkTransformation is added to the environment's List<StreamTransformation<?>> transformations,
and the program ends with env.execute();
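The practical consequence is that nothing runs while the pipeline is being declared; work only happens once execute is called. A minimal plain-Java sketch of that deferred style follows. LazyEnvSketch is a hypothetical class, a Runnable per registered sink stands in for the list of transformations, and a bounded List stands in for the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of deferred execution; not Flink's StreamExecutionEnvironment.
final class LazyEnvSketch {
    private final List<Runnable> registeredSinks = new ArrayList<>();

    // Registers a pipeline that ends in `sink`; nothing is processed yet.
    <T> void addSink(List<T> source, Consumer<T> sink) {
        registeredSinks.add(() -> source.forEach(sink));
    }

    // Runs every registered pipeline, analogous in spirit to env.execute().
    void execute() {
        for (Runnable pipeline : registeredSinks) {
            pipeline.run();
        }
    }

    // Returns {elements seen before execute(), elements seen after execute()}.
    static int[] demo() {
        LazyEnvSketch env = new LazyEnvSketch();
        List<Integer> collected = new ArrayList<>();
        env.addSink(Arrays.asList(1, 2, 3), collected::add);
        int before = collected.size();
        env.execute();
        return new int[] {before, collected.size()};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("before execute: " + counts[0] + ", after execute: " + counts[1]);
    }
}
```

This mirrors the remark in the addSink Javadoc that only streams with sinks added are executed once execute() is called.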
Last updated: 2017-04-07 21:05:50