Flink - Generating Timestamps / Watermarks
To work with Event Time, streaming programs need to set the time characteristic accordingly.
First, set the time characteristic to Event Time:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Assigning Timestamps
In order to work with Event Time, Flink needs to know the events’ timestamps, meaning each element in the stream needs to have its event timestamp assigned. This is usually done by accessing/extracting the timestamp from some field in the element.
Timestamp assignment goes hand-in-hand with generating watermarks, which tell the system about the progress in event time.
There are two ways to assign timestamps and generate Watermarks:
- Directly in the data stream source
- Via a TimestampAssigner / WatermarkGenerator
Next, we need to define how to obtain the event time and how to generate watermarks.
One way is to hard-code it in the source:
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        MyType next = getNext();
        ctx.collectWithTimestamp(next, next.getEventTimestamp());

        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}
This approach is clearly rather crude and inconvenient, and whatever it assigns is overwritten by a TimestampAssigner,
so let's look at the second approach.
Timestamp Assigners / Watermark Generators
Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the original stream had timestamps or watermarks already, the timestamp assigner overwrites those.
Timestamp assigners usually come directly after the data source, but this is not strictly required. A common pattern is, for example, to parse (MapFunction) and filter (FilterFunction) before the timestamp assigner. In any case, the timestamp assigner needs to occur before the first operation on event time (such as the first window operation).
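Typically, a few map or filter operations follow the source to do some initialization or formatting.
The assigner is then set before any operation that needs event time, such as a window.
Here is an example: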
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
So how is a Timestamp Assigner implemented, such as the MyTimestampsAndWatermarks used in the example?
There are three kinds:
With Ascending timestamps
The simplest case for generating watermarks is the case where timestamps within one source occur in ascending order. In that case, the current timestamp can always act as a watermark, because no lower timestamps will occur any more.
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
    });
Probably nobody uses this one; you might as well use processing time directly.
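The reasoning behind the ascending case can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: it does not use any Flink classes, the class and method names are hypothetical, and rejecting a non-ascending timestamp with an exception is a choice made for this example rather than a description of what Flink does.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the idea; no Flink classes are involved and all names are hypothetical.
class AscendingWatermarkSketch {

    /** Returns the watermark that could be emitted after each element of an ascending stream. */
    static List<Long> watermarksFor(long[] ascendingTimestamps) {
        List<Long> watermarks = new ArrayList<>();
        long previous = Long.MIN_VALUE;
        for (long ts : ascendingTimestamps) {
            if (ts < previous) {
                // The ascending assumption is violated; this sketch simply refuses to continue.
                throw new IllegalArgumentException(
                        "timestamp " + ts + " is lower than previous timestamp " + previous);
            }
            previous = ts;
            // No lower timestamp can follow, so the current timestamp can act as the watermark.
            watermarks.add(ts);
        }
        return watermarks;
    }

    public static void main(String[] args) {
        System.out.println(watermarksFor(new long[] {1000L, 1500L, 2300L}));
    }
}
```

The watermark simply tracks the element timestamps one for one, which is why no out-of-orderness bound is needed in this case.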
With Periodic Watermarks
The AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).
The interval (every n milliseconds) in which the watermark will be generated is defined via ExecutionConfig.setAutoWatermarkInterval(...). Each time, the assigner’s getCurrentWatermark() method will be called, and a new Watermark will be emitted if the returned Watermark is non-null and larger than the previous Watermark.
Watermarks are emitted periodically; you can set the frequency via ExecutionConfig.setAutoWatermarkInterval(...).
/**
 * This generator generates watermarks assuming that elements come out of order to a certain degree only.
 * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
 * elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        // track the highest event timestamp seen so far
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
The two cases above differ as follows: the first sets the watermark from the maximum event time seen so far,
while the second sets it from the current processing time.
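To make the periodic behaviour concrete, here is a small plain-Java model of the first generator combined with the emission rule described earlier (a watermark is emitted only if it is larger than the previous one). It is a sketch, not Flink code: the class and its method names are hypothetical, onPeriodicTick() stands in for the timer that Flink drives via the auto watermark interval, and the starting value of currentMaxTimestamp is an assumption chosen so that the subtraction cannot overflow before the first element arrives.

```java
// Plain-Java model of the periodic pattern; no Flink classes are used and all names are hypothetical.
class PeriodicWatermarkSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that the subtraction in currentWatermark() cannot overflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called once per element: remembers the highest event timestamp seen so far. */
    void onElement(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Candidate watermark: highest timestamp seen minus the out-of-orderness bound. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Called on each timer tick: returns the watermark to emit, or null if it did not advance. */
    Long onPeriodicTick() {
        long candidate = currentWatermark();
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch sketch = new PeriodicWatermarkSketch(3500L);
        sketch.onElement(10_000L);
        sketch.onElement(9_000L);                    // out of order, does not lower the maximum
        System.out.println(sketch.onPeriodicTick()); // 6500
        System.out.println(sketch.onPeriodicTick()); // null, nothing advanced since the last tick
        sketch.onElement(12_000L);
        System.out.println(sketch.onPeriodicTick()); // 8500
    }
}
```

Note that the watermark advances only when new elements raise the maximum timestamp. A generator based on processing time, like the second case above, would keep advancing even when no elements arrive.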
With Punctuated Watermarks
To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use the AssignerWithPunctuatedWatermarks. For this class, Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method for that element.
The checkAndGetNextWatermark(...) method gets the timestamp that was assigned in the extractTimestamp(...) method, and can decide whether it wants to generate a Watermark. Whenever the checkAndGetNextWatermark(...) method returns a non-null Watermark, and that Watermark is larger than the latest previous Watermark, that new Watermark will be emitted.
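In other words, the watermark is not triggered by time but by specific events: a watermark is generated only when certain special events or messages are received.
That is why the interface method is called checkAndGetNextWatermark:
it needs to check first.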
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // emit a watermark only for elements that carry a watermark marker
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
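The two-step flow described above (extract the timestamp, then check whether this element should produce a watermark) can be traced with a plain-Java model. This is a sketch under assumptions: it does not use Flink classes, and the Event type, its fields, and the emittedWatermarks helper are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the punctuated pattern; no Flink classes are used and all names are hypothetical.
class PunctuatedWatermarkSketch {

    /** Hypothetical event: a timestamp plus a flag saying whether it carries a watermark marker. */
    static final class Event {
        final long timestamp;
        final boolean watermarkMarker;

        Event(long timestamp, boolean watermarkMarker) {
            this.timestamp = timestamp;
            this.watermarkMarker = watermarkMarker;
        }
    }

    /** Returns the watermarks that would be emitted for the given events, in order. */
    static List<Long> emittedWatermarks(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        long lastEmitted = Long.MIN_VALUE;
        for (Event event : events) {
            // Step 1: extract the timestamp for this element.
            long extractedTimestamp = event.timestamp;
            // Step 2: check whether this element should produce a watermark at all.
            Long candidate = event.watermarkMarker ? Long.valueOf(extractedTimestamp) : null;
            // Emit only non-null watermarks that are larger than the previous one.
            if (candidate != null && candidate > lastEmitted) {
                lastEmitted = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1000L, false),
                new Event(2000L, true),
                new Event(1500L, true),   // marker, but not larger than the previous watermark
                new Event(3000L, false),
                new Event(4000L, true));
        System.out.println(emittedWatermarks(events)); // [2000, 4000]
    }
}
```

Because the check runs for every element, a stream in which most events carry a marker would produce a watermark for almost every element.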
Last updated: 2017-04-07 21:05:50