Flink中的时间和窗口、水位线、

在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。
所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。

1.1 窗口(Window)

1.1 窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。

6.1.2 窗口的分类

我们在上一节举的例子,其实是最为简单的一种时间窗口。在Flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对Flink中内置的窗口做一个分类说明。

1)按照驱动类型分

2)按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

image-20250115103117635

1..1.3 窗口API概览

1)按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。

(1)按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
1
2
stream.keyBy(...)
.window(...)
(2)非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
在代码中,直接基于DataStream调用.windowAll()定义窗口。
1
stream.windowAll(...)
注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
2)代码中窗口API的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
1
2
3
stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)
其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。

1.1.4 窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。
窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。
窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

1.1.4.1 时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。
(1)滚动处理时间窗口
窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法.of()。
1
2
3
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)
这里.of()方法需要传入一个Time类型的参数size,表示滚动窗口的大小,我们这里创建了一个长度为5秒的滚动窗口。
另外,.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

(2)滑动处理时间窗口

窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法.of()。
1
2
3
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.aggregate(...)
这里.of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

(3)处理时间会话窗口

窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。
1
2
3
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
这里.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。
另外,还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。

(4)滚动事件时间窗口

窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。
1
2
3
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)

(5)滑动事件时间窗口

窗口分配器由类SlidingEventTimeWindows提供,用法与滑动处理事件窗口完全一致。
1
2
3
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.aggregate(...)

(6)事件时间会话窗口

窗口分配器由类EventTimeSessionWindows提供,用法与处理事件会话窗口完全一致。
1
2
3
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

1.1.4.2 计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。

(1)滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

1
2
stream.keyBy(...)
.countWindow(10)

我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口。

(2)滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
1
2
stream.keyBy(...)
.countWindow(103)
我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果。

3)全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供。
1
2
stream.keyBy(...)
.window(GlobalWindows.create());
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

1.1.5 窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。下面我们来进行分别讲解。

1.1.5.1 增量聚合函数(ReduceFunction / AggregateFunction)

窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。
1)归约函数(ReduceFunction)
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class WindowReduceDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction())
.keyBy(r -> r.getId())
// 设置滚动事件时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<WaterSensor>() {

@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("调用reduce方法,之前的结果:"+value1 + ",现在来的数据:"+value2);
return new WaterSensor(value1.getId(), System.currentTimeMillis(),value1.getVc()+value2.getVc());
}
})
.print();

env.execute();
}
}

2)聚合函数(AggregateFunction)

ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:
  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.day03

import com.day02.ClickSource
import com.day02.SourceBoundedTest.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object AggregateFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.addSource(new ClickSource)
.assignAscendingTimestamps(_.timestamp)

// 统计pv 和 uv 输出pv/uv的值
stream.keyBy(data => true) // 这里就是让所有数据进入同一个分组
.window( SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
.aggregate( new PvUv )
.print()

env.execute()
}

// 实现自定义聚合函数, 用二元组(LOng, Set)表示中间聚合的(pv, uv)状态
class PvUv extends AggregateFunction[Event, (Long, Set[String]), Double] {
override def createAccumulator(): (Long, Set[String]) = (0L, Set[String]())

// 每来一条数据,都回进行add叠加聚合
override def add(value: Event, accumulator: (Long, Set[String])): (Long, Set[String]) = (accumulator._1 + 1, accumulator._2 + value.user)

// 返回最终的计算结果
override def getResult(accumulator: (Long, Set[String])): Double = accumulator._1.toDouble /accumulator._2.size

override def merge(a: (Long, Set[String]), b: (Long, Set[String])): (Long, Set[String]) = ???
}

}
另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。

1.1.5.2 全窗口函数(full window functions)

有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。

所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。

1)窗口函数(WindowFunction)
WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。
1
2
3
4
stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。
不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。

2)处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。
代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.day03

import com.day02.ClickSource
import com.day02.SourceBoundedTest.Event
import com.day03.AggregateFunctionTest.PvUv
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object FullWindowFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.addSource( new ClickSource )
.assignAscendingTimestamps(_.timestamp)

// pv是指页面被浏览的总次数,每次用户打开或刷新一个页面,都会增加一次Pv
// uv是指一定时间内访问网站独立用户数,同一用户多次访问记为一次uv
// 测试全窗口函数, 统计uv
stream.keyBy(data => "key")
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountByWindow )
.print()


env.execute()
}

// 自定义实现ProcessWindowFunction
class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
// 使用过一个Set进行去重操作
var userSet = Set[String]()

// 从elements中提取所有数据,一次放入set中去重
elements.foreach(userSet += _.user)
val uv = userSet.size
// 提取窗口信息包装String输出
val windowEnd = context.window.getEnd
val windowsStart = context.window.getStart

out.collect(s"窗口 $windowsStart~$windowEnd 的uv值为: $uv")
}
}
}

1.1.5.3 增量聚合和全窗口函数的结合使用

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.day03

import com.day02.ClickSource
import com.day02.SourceBoundedTest.Event
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object UrlViewCountExample {
// 定义统计输出结果的数据结构
case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.addSource( new ClickSource )
.assignAscendingTimestamps(_.timestamp)

// 结合使用增量聚合函数和全窗口函数,包装统计信息
stream.keyBy(_.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(new UrlViewCountAgg, new UrlViewCountResult)
.print

env.execute()
}

// 实现增强聚合函数
class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
override def createAccumulator(): Long = 0L

override def add(value: Event, accumulator: Long): Long = accumulator + 1

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = ???
}

// 实现全窗口函数
class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
override def process(url: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
// 提取需要的数据
val count = elements.iterator.next()
val start = context.window.getStart
val end = context.window.getEnd

out.collect(UrlViewCount(url, count, start, end))
}
}
}

1.1.6 其他API

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink还提供了其他一些可选的API,让我们可以更加灵活地控制窗口行为。

1.1.6.1 触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
1
2
3
stream.keyBy(...)
.window(...)
.trigger(new MyTrigger())

1.1.6.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
1
2
3
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())

1.2 时间语义

1**.2.1 Flink中的时间语义**

1.2.2 哪种时间语义更重要

1)从《星球大战》说起

为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。

如上图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。
2)数据处理系统中的时间语义
在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

1.3 水位线(Watermark

1.3.1 事件时间和窗口

1.3.2 什么是水位线

1.3.3 水位线和窗口的工作原理

[!CAUTION]

注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。

1.3.4 生成水位线

1.3.4.1 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

1.3.4.2 水位线生成策略

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:
1
2
3
4
DataStream<Event> stream = env.addSource(new ClickSource());

DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(<watermark strategy>);
说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
1
2
3
4
5
6
7
8
// 1.有序流的水位线生成策略 forMonotonousTimestamps
stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]()
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
// 提取时间戳的方法
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
))

1.3.4.3 Flink内置水位线

1)有序流中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

2)乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

.3.4.4 自定义水位线生成器

1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义、有序流、无序流周期性生成水位线的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.day03

import com.day02.SourceBoundedTest.Event
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

import java.time.Duration

object WatermarkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 自动生成水位线的周期时间间隔
env.getConfig.setAutoWatermarkInterval(500L)

val stream: DataStream[Event] = env.fromElements(
Event("hkj", "www.baidu.com", 1000L),
Event("Bob", "www.bing.com", 2000L),
Event("Bob", "www.bing.com", 2000L),
Event("Lisa", "www.hkjcpdd.com", 3000L),
Event("Lisa", "www.hkjcpdd.com", 3000L),
Event("Lisa", "www.hkjcpdd.com", 3000L),
Event("hkj", "www.by123.com", 8000L),
Event("Lisa", "www.hkjmjj.com", 1000L)
)

// 1.有序流的水位线生成策略 forMonotonousTimestamps
stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]()
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
// 提取时间戳的方法
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
))

// 2.乱序流的水位线生成策略 forBoundedOutOfOrderness
stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
))


// 水位线的设置有周期性和事件性,周期性就是隔一段时间检查一次,事件性就是来一个事件就检测一次
// 自定义水位线 双new
stream.assignTimestampsAndWatermarks(new WatermarkStrategy[Event] {
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Event] = {
new SerializableTimestampAssigner[Event] {
// 如果event中没有时间戳字段那就是,得替换掉后面的逻辑,换成自己的逻辑比如:System.currentTimeMillis() 使用当前系统时间
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
}

override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Event] = {
new WatermarkGenerator[Event] {
// 定义一个延迟时间
val delay = 5000L
// 定义属性保存最大时间戳 就算是0的时间来了也能正常处理
var maxTs = Long.MinValue + delay + 1

// 每来一条数据进行调用,有了这个下面那个周期性的就没用了
override def onEvent(event: Event, eventTimestamp: Long, output: WatermarkOutput): Unit = {
// 更新最大时间戳
maxTs = math.max(maxTs, event.timestamp)
}

// 按照周期性进行调用
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
val watermark = new Watermark(maxTs - delay - 1)
output.emitWatermark(watermark)
}
}
}
})

env.execute()
}
}
我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。
如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms
1
env.getConfig().setAutoWatermarkInterval(400L);

2)断点式水位线生成器(Punctuated Generator

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。

3)在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:
1
2
3
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

1.3.5 水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
案例:1.3.4.3 中乱序流的watermark,将并行度设为2,观察现象。
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。
用1.3.4.6中的自定义分区器,只输入奇数来模拟部分subtask无数据,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.day03

import com.day02.SourceBoundedTest.Event
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.time.Duration

object WatermarkTest1 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 自动生成水位线的周期时间间隔
env.getConfig.setAutoWatermarkInterval(500L)

val stream: DataStream[Event] = env.socketTextStream("master", 7777)
.map(data => {
val fields = data.split(",")
Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
})

// 1.有序流的水位线生成策略 forMonotonousTimestamps
stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[Event]()
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
// 提取时间戳的方法
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
))

// 2.乱序流的水位线生成策略 forBoundedOutOfOrderness
// 这个5指的是最大延迟时间
stream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
))
.keyBy(_.user)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process( new WatermarkWindowResult )
.print()


// 水位线的设置有周期性和事件性,周期性就是隔一段时间检查一次,事件性就是来一个事件就检测一次
// 自定义水位线 双new
stream.assignTimestampsAndWatermarks(new WatermarkStrategy[Event] {
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[Event] = {
new SerializableTimestampAssigner[Event] {
// 如果event中没有时间戳字段那就是,得替换掉后面的逻辑,换成自己的逻辑比如:System.currentTimeMillis() 使用当前系统时间
override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
}
}

override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Event] = {
new WatermarkGenerator[Event] {
// 定义一个延迟时间
val delay = 5000L
// 定义属性保存最大时间戳 就算是0的时间来了也能正常处理
var maxTs = Long.MinValue + delay + 1

// 每来一条数据进行调用,有了这个下面那个周期性的就没用了
override def onEvent(event: Event, eventTimestamp: Long, output: WatermarkOutput): Unit = {
// 更新最大时间戳
maxTs = math.max(maxTs, event.timestamp)
}

// 按照周期性进行调用
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
val watermark = new Watermark(maxTs - delay - 1)
output.emitWatermark(watermark)
}
}
}
})

env.execute()
}

// 实现自定义全窗口函数
class WatermarkWindowResult extends ProcessWindowFunction[Event, String, String, TimeWindow] {
override def process(user: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
// 提取信息
val start = context.window.getStart
val end = context.window.getEnd
val count = elements.size

// 增加水位线信息
val currentWatermark = context.currentWatermark

out.collect(s"窗口 $start~$end, 用户 $user 的活跃度为:$count, 水位线现在位于: $currentWatermark")
}
}
}

1.3.6 迟到数据的处理

1.3.6.1 推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
1
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
1.3.6.2 设置窗口延迟关闭
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。
以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
1
2
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
注意:
允许迟到只能运用在event time上
1
2
3
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)
完整案例代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.day03

import com.day02.ClickSource
import com.day02.SourceBoundedTest.Event
import com.day03.UrlViewCountExample.{UrlViewCountAgg, UrlViewCountResult}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

import java.time.Duration

object ProcessLateDataExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream = env.socketTextStream("master", 7777)
.map(data => {
val fields = data.split(",")
Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
})
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5)))
.assignAscendingTimestamps(_.timestamp)

// 定义一个侧输出流的标签,泛型就是你要放什么数据你就要改成什么
val outputTag = OutputTag[Event]("late-data")

val result = stream.keyBy(_.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// 指定窗口允许等待的时间
.allowedLateness(Time.minutes(1))
// 将迟到数据输出到侧输出流
.sideOutputLateData(outputTag)
.aggregate(new UrlViewCountAgg, new UrlViewCountResult)
result.print("result")

stream.print("input")

// 将侧输出流的数据进行打印输出
result.getSideOutput(outputTag).print("late data")

env.execute()
}
}