• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

使用Flink和基于事件时间的流计算平均值

scala 来源:Andreas Vogler 5次浏览

我想在基于历史事件的流中计算Flink中基于窗口的平均值(或由我定义的任何其他函数),因此流必须是事件时间(不处理基于时间):使用Flink和基于事件时间的流计算平均值

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

我已经找到了如何在摄入添加时间戳:

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis) 

但是,当我做计算(一个应用函数),它不工作时我只是按照我没有使用EventTime的方式进行操作。我已经读了一些关于我必须设置的水印:

val avg = stream 
    .keyBy("instrument") 
    .timeWindow(Time.seconds(10)) 
    .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{ 
    val avg = values.map(_.val).sum/values.size 
    val dp = Datapoint(key.getField[String](0), avg) 
    out.collect(dp) 
    }) 

avg.print() 
env.execute() 

有人有一个简单的Scala例子吗?

问候,
安德烈亚斯

===========解决方案如下:

水印是一种有效地与早期的时间戳的所有事件都(可能)已经抵达断言时间戳。基于事件时间的Windows依赖水印来知道窗口何时完成。到目前为止,最常见的水印策略是假定事件以一定的有限延迟到达。

如果要发射的数据源水印(服用时),见Source Functions with Timestamps and Watermarks,但它是那样简单

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime)) 

如果,另一方面,要解决这个问题之外来源,见Timestamp Assigners/Watermark Generators和Assigners allowing a fixed amount of lateness。你可以简单地做这样的事情:

stream 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))(_.getTimestamp)) 
    .keyBy("instrument") 
    ... 

我链接到的文档有更详细的例子在斯卡拉。


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)