在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。 所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。 1.1 窗口(Window)1.1 窗口的概念 Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。
注意: Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。
6.1.2 窗口的分类 我们在上一节举的例子,其实是最为简单的一种时间窗口。在Flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对Flink中内置的窗口做一个分类说明。
1)按照驱动类型分
2)按照窗口分配数据的规则分类 根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。
1..1.3 窗口API概览 1)按键分区(Keyed)和非按键分区(Non-Keyed) 在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。 (1)按键分区窗口(Keyed Windows) 经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。 在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。 1 2 stream.keyBy(...) .window(...)
(2)非按键分区(Non-Keyed Windows) 如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。 在代码中,直接基于DataStream调用.windowAll()定义窗口。
注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。 2)代码中窗口API的调用 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。 1 2 3 stream.keyBy(<key selector>) .window(<window assigner>) .aggregate(<window function>)
其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。 1.1.4 窗口分配器 定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。 窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。 窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。 1.1.4.1 时间窗口 时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。 (1)滚动处理时间窗口 窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法.of()。 1 2 3 stream.keyBy(...) .window(TumblingProcessingTimeWindows .of(Time .seconds(5 ))) .aggregate(...)
这里.of()方法需要传入一个Time类型的参数size,表示滚动窗口的大小,我们这里创建了一个长度为5秒的滚动窗口。 另外,.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。 (2)滑动处理时间窗口 窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法.of()。 1 2 3 stream.keyBy(...) .window(SlidingProcessingTimeWindows .of(Time .seconds(10 ),Time .seconds(5 ))) .aggregate(...)
这里.of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。 滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。 (3)处理时间会话窗口 窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。 1 2 3 stream.keyBy(...) .window(ProcessingTimeSessionWindows .withGap(Time .seconds(10 ))) .aggregate(...)
这里.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。 另外,还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。 (4)滚动事件时间窗口 窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。 1 2 3 stream.keyBy(...) .window(TumblingEventTimeWindows .of(Time .seconds(5 ))) .aggregate(...)
(5)滑动事件时间窗口 窗口分配器由类SlidingEventTimeWindows提供,用法与滑动处理事件窗口完全一致。 1 2 3 stream.keyBy(...) .window(SlidingEventTimeWindows .of(Time .seconds(10 ),Time .seconds(5 ))) .aggregate(...)
(6)事件时间会话窗口 窗口分配器由类EventTimeSessionWindows提供,用法与处理事件会话窗口完全一致。 1 2 3 stream.keyBy(...) .window(EventTimeSessionWindows .withGap(Time .seconds(10 ))) .aggregate(...)
1.1.4.2 计数窗口 计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。 (1)滚动计数窗口 滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。 1 2 stream.keyBy(...) .countWindow(10 )
我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口。 (2)滑动计数窗口 与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。 1 2 stream.keyBy(...) .countWindow(10 ,3 )
我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果。 3)全局窗口 全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供。 1 2 stream.keyBy(...) .window(GlobalWindows .create());
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。 1.1.5 窗口函数
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。下面我们来进行分别讲解。 1.1.5.1 增量聚合函数(ReduceFunction / AggregateFunction) 窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。 典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。 1)归约函数(ReduceFunction) 代码示例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class WindowReduceDemo { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1 ); env .socketTextStream("hadoop102" , 7777 ) .map(new WaterSensorMapFunction ()) .keyBy(r -> r.getId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10 ))) .reduce(new ReduceFunction <WaterSensor>() { @Override public WaterSensor reduce (WaterSensor value1, WaterSensor value2) throws Exception { System.out.println("调用reduce方法,之前的结果:" +value1 + ",现在来的数据:" +value2); return new WaterSensor (value1.getId(), System.currentTimeMillis(),value1.getVc()+value2.getVc()); } }) .print(); env.execute(); } }
2)聚合函数(AggregateFunction) ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。 Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。 AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。 接口中有四个方法:
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
add():将输入的元素添加到累加器中。
getResult():从累加器中提取聚合的输出结果。
merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。 代码实现如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.day03import com.day02.ClickSource import com.day02.SourceBoundedTest .Event import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner , WatermarkStrategy }import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object AggregateFunctionTest { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) val stream = env.addSource(new ClickSource ) .assignAscendingTimestamps(_.timestamp) stream.keyBy(data => true ) .window( SlidingEventTimeWindows .of(Time .seconds(10 ), Time .seconds(2 ))) .aggregate( new PvUv ) .print() env.execute() } class PvUv extends AggregateFunction [Event , (Long , Set [String ]), Double ] { override def createAccumulator (): (Long , Set [String ]) = (0 L, Set [String ]()) override def add (value: Event , accumulator: (Long , Set [String ])): (Long , Set [String ]) = (accumulator._1 + 1 , accumulator._2 + value.user) override def getResult (accumulator: (Long , Set [String ])): Double = accumulator._1.toDouble /accumulator._2.size override def merge (a: (Long , Set [String ]), b: (Long , Set [String ])): (Long , Set [String ]) = ??? } }
另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。 1.1.5.2 全窗口函数(full window functions) 有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。 所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。 1)窗口函数(WindowFunction) WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。 1 2 3 4 stream .keyBy(<key selector>) .window(<window assigner>) .apply(new MyWindowFunction ());
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。 不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。 2)处理窗口函数(ProcessWindowFunction) ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。 事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。 代码实现如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.day03import com.day02.ClickSource import com.day02.SourceBoundedTest .Event import com.day03.AggregateFunctionTest .PvUv import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows , TumblingEventTimeWindows }import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector object FullWindowFunctionTest { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) val stream = env.addSource( new ClickSource ) .assignAscendingTimestamps(_.timestamp) stream.keyBy(data => "key" ) .window(TumblingEventTimeWindows .of(Time .seconds(10 ))) .process(new UvCountByWindow ) .print() env.execute() } class UvCountByWindow extends ProcessWindowFunction [Event , String , String , TimeWindow ] { override def process (key: String , context: Context , elements: Iterable [Event ], out: Collector [String ]): Unit = { var userSet = Set [String ]() elements.foreach(userSet += _.user) val uv = userSet.size val windowEnd = context.window.getEnd val windowsStart = context.window.getStart out.collect(s"窗口 $windowsStart ~$windowEnd 的uv值为: $uv " ) } } }
1.1.5.3 增量聚合和全窗口函数的结合使用 在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。 我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package com.day03import com.day02.ClickSource import com.day02.SourceBoundedTest .Event import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector object UrlViewCountExample { case class UrlViewCount (url: String , count: Long , windowStart: Long , windowEnd: Long ) def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) val stream = env.addSource( new ClickSource ) .assignAscendingTimestamps(_.timestamp) stream.keyBy(_.url) .window(SlidingEventTimeWindows .of(Time .seconds(10 ), Time .seconds(5 ))) .aggregate(new UrlViewCountAgg , new UrlViewCountResult ) .print env.execute() } class UrlViewCountAgg extends AggregateFunction [Event , Long , Long ] { override def createAccumulator (): Long = 0 L override def add (value: Event , accumulator: Long ): Long = accumulator + 1 override def getResult (accumulator: Long ): Long = accumulator override def merge (a: Long , b: Long ): Long = ??? } class UrlViewCountResult extends ProcessWindowFunction [Long , UrlViewCount , String , TimeWindow ] { override def process (url: String , context: Context , elements: Iterable [Long ], out: Collector [UrlViewCount ]): Unit = { val count = elements.iterator.next() val start = context.window.getStart val end = context.window.getEnd out.collect(UrlViewCount (url, count, start, end)) } } }
1.1.6 其他API 对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink还提供了其他一些可选的API,让我们可以更加灵活地控制窗口行为。 1.1.6.1 触发器(Trigger) 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。 基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。 1 2 3 stream.keyBy(...) .window(...) .trigger(new MyTrigger ())
1.1.6.2 移除器(Evictor) 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。 1 2 3 stream.keyBy(...) .window(...) .evictor(new MyEvictor ())
1.2 时间语义 1**.2.1 Flink中的时间语义**
1.2.2 哪种时间语义更重要 1)从《星球大战》说起
为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。
如上图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。 2)数据处理系统中的时间语义 在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。 在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。 1.3 水位线( Watermark) 1.3.1 事件时间和窗口
1.3.2 什么是水位线
1.3.3 水位线和窗口的工作原理
[!CAUTION]
注意: Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。
1.3.4 生成水位线 1.3.4.1 生成水位线的总体原则 完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。 如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。 所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。 1.3.4.2 水位线生成策略 在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下: 1 2 3 4 DataStream<Event> stream = env.addSource(new ClickSource ()); DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);
说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。 1 2 3 4 5 6 7 8 stream.assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps[Event ]() .withTimestampAssigner( new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } ))
1.3.4.3 Flink内置水位线 1)有序流中内置水位线设置
2)乱序流中内置水位线设置
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。 .3.4.4 自定义水位线生成器 1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。 下面是一段自定义、有序流、无序流周期性生成水位线的代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 package com.day03import com.day02.SourceBoundedTest .Event import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner , TimestampAssigner , TimestampAssignerSupplier , Watermark , WatermarkGenerator , WatermarkGeneratorSupplier , WatermarkOutput , WatermarkStrategy }import org.apache.flink.streaming.api.scala._import java.time.Duration object WatermarkTest { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) env.getConfig.setAutoWatermarkInterval(500 L) val stream: DataStream [Event ] = env.fromElements( Event ("hkj" , "www.baidu.com" , 1000 L), Event ("Bob" , "www.bing.com" , 2000 L), Event ("Bob" , "www.bing.com" , 2000 L), Event ("Lisa" , "www.hkjcpdd.com" , 3000 L), Event ("Lisa" , "www.hkjcpdd.com" , 3000 L), Event ("Lisa" , "www.hkjcpdd.com" , 3000 L), Event ("hkj" , "www.by123.com" , 8000 L), Event ("Lisa" , "www.hkjmjj.com" , 1000 L) ) stream.assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps[Event ]() .withTimestampAssigner( new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } )) stream.assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness[Event ](Duration .ofSeconds(2 )) .withTimestampAssigner( new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } )) stream.assignTimestampsAndWatermarks(new WatermarkStrategy [Event ] { override def createTimestampAssigner (context: TimestampAssignerSupplier .Context ): TimestampAssigner [Event ] = { new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } } override def createWatermarkGenerator (context: WatermarkGeneratorSupplier .Context ): WatermarkGenerator [Event ] = { new WatermarkGenerator [Event ] { val delay = 5000 L var maxTs = Long .MinValue + delay + 1 override def onEvent (event: Event , eventTimestamp: Long , output: WatermarkOutput ): Unit = { maxTs = math.max(maxTs, event.timestamp) } override def onPeriodicEmit (output: WatermarkOutput ): Unit = { val watermark = new Watermark (maxTs - delay - 1 ) output.emitWatermark(watermark) } } } }) env.execute() } }
我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。 如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms 1 env.getConfig().setAutoWatermarkInterval(400 L);
2)断点式水位线生成器( Punctuated Generator)
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。 3)在数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下: 1 2 3 env.fromSource( kafkaSource, WatermarkStrategy .forBoundedOutOfOrderness(Duration .ofSeconds(3 )), "kafkasource" )
1.3.5 水位线的传递
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。 水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。 案例:1.3.4.3 中乱序流的watermark,将并行度设为2,观察现象。 在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。 用1.3.4.6中的自定义分区器,只输入奇数来模拟部分subtask无数据,代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 package com.day03import com.day02.SourceBoundedTest .Event import org.apache.flink.api.common.eventtime._import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.time.Duration object WatermarkTest1 { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) env.getConfig.setAutoWatermarkInterval(500 L) val stream: DataStream [Event ] = env.socketTextStream("master" , 7777 ) .map(data => { val fields = data.split("," ) Event (fields(0 ).trim, fields(1 ).trim, fields(2 ).trim.toLong) }) stream.assignTimestampsAndWatermarks(WatermarkStrategy .forMonotonousTimestamps[Event ]() .withTimestampAssigner( new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } )) stream.assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness[Event ](Duration .ofSeconds(5 )) .withTimestampAssigner( new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } )) .keyBy(_.user) .window(SlidingProcessingTimeWindows .of(Time .seconds(10 ), Time .seconds(5 ))) .process( new WatermarkWindowResult ) .print() stream.assignTimestampsAndWatermarks(new WatermarkStrategy [Event ] { override def createTimestampAssigner (context: TimestampAssignerSupplier .Context ): TimestampAssigner [Event ] = { new SerializableTimestampAssigner [Event ] { override def extractTimestamp (element: Event , recordTimestamp: Long ): Long = element.timestamp } } override def createWatermarkGenerator (context: WatermarkGeneratorSupplier .Context ): WatermarkGenerator [Event ] = { new WatermarkGenerator [Event ] { val delay = 5000 L var maxTs = Long .MinValue + delay + 1 override def onEvent (event: Event , eventTimestamp: Long , output: WatermarkOutput ): Unit = { maxTs = math.max(maxTs, event.timestamp) } override def onPeriodicEmit (output: WatermarkOutput ): Unit = { val watermark = new Watermark (maxTs - delay - 1 ) output.emitWatermark(watermark) } } } }) env.execute() } class WatermarkWindowResult extends ProcessWindowFunction [Event , String , String , TimeWindow ] { override def process (user: String , context: Context , elements: Iterable [Event ], out: Collector [String ]): Unit = { val start = context.window.getStart val end = context.window.getEnd val count = elements.size val currentWatermark = context.currentWatermark out.collect(s"窗口 $start ~$end , 用户 $user 的活跃度为:$count , 水位线现在位于: $currentWatermark " ) } } }
1.3.6 迟到数据的处理
1.3.6.1 推迟水印推进 在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。 1 WatermarkStrategy .forBoundedOutOfOrderness(Duration .ofSeconds(10 ));
1.3.6.2 设置窗口延迟关闭 Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。 以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。 1 2 .window(TumblingEventTimeWindows .of(Time .seconds(5 ))) .allowedLateness(Time .seconds(3 ))
注意: 允许迟到只能运用在event time上 1 2 3 .windowAll(TumblingEventTimeWindows .of(Time .seconds(5 ))) .allowedLateness(Time .seconds(3 )) .sideOutputLateData(lateWS)
完整案例代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.day03import com.day02.ClickSource import com.day02.SourceBoundedTest .Event import com.day03.UrlViewCountExample .{UrlViewCountAgg , UrlViewCountResult }import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import java.time.Duration object ProcessLateDataExample { def main (args: Array [String ]): Unit = { val env = StreamExecutionEnvironment .getExecutionEnvironment env.setParallelism(1 ) val stream = env.socketTextStream("master" , 7777 ) .map(data => { val fields = data.split("," ) Event (fields(0 ).trim, fields(1 ).trim, fields(2 ).trim.toLong) }) .assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness[Event ](Duration .ofSeconds(5 ))) .assignAscendingTimestamps(_.timestamp) val outputTag = OutputTag [Event ]("late-data" ) val result = stream.keyBy(_.url) .window(SlidingEventTimeWindows .of(Time .seconds(10 ), Time .seconds(5 ))) .allowedLateness(Time .minutes(1 )) .sideOutputLateData(outputTag) .aggregate(new UrlViewCountAgg , new UrlViewCountResult ) result.print("result" ) stream.print("input" ) result.getSideOutput(outputTag).print("late data" ) env.execute() } }