Flink快速上手

Flink快速上手

[!NOTE]

前提准备好相关maven环境和依赖

1.WordCount代码编写

需求:统计一段文字中,每个单词出现的频次。
环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。

1.1 批处理

批处理基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

1)数据准备

  • (1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
  • (2)在words.txt中输入一些文字,例如:
1
2
3
hello flink
hello world
hello java

2)代码编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.day01

import org.apache.flink.api.scala._

object BatchWordCount {
def main(args: Array[String]): Unit = {
// 创建一个执行环境
val env = ExecutionEnvironment.getExecutionEnvironment

// 读取文本文件数据
val lineData = env.readTextFile("G:\\Flink学习\\FlinkStu\\data\\words.txt")

// 对数据集进行转换处理
val wordAndOne = lineData.flatMap(_.split(" ")).map(word => (word, 1))

// 按照单词进行分组
val wordAndOneGroup = wordAndOne.groupBy(0)

// 对分组数据进行sum聚合统计
val sum = wordAndOneGroup.sum(1)

// 打印
sum.print()
}
}

(2)输出

1
2
3
4
(flink,1)
(world,1)
(hello,3)
(java,1)

[!NOTE]

需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSet API就没什么用了,在实际应用中我们只要维护一套DataStream API就可以。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。

1.2流处理

[!CAUTION]

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。

我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.day01

import org.apache.flink.streaming.api.scala._

object BoundedStreamWordCount {
def main(args: Array[String]): Unit = {
// 创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 读取文本文件数据
val lineDataStream: DataStream[String] = env.readTextFile("G:\\Flink学习\\FlinkStu\\data\\words.txt")

// 对数据进行转换处理
val wordAndOne = lineDataStream.flatMap(_.split(" ")).map(word => (word, 1))

// 按照单词进行分组
val wordAndGroup = wordAndOne.keyBy(_._1)
// 对分组数据进行sum聚合操作
val sum = wordAndGroup.sum(1)

// 打印
sum.print()

// 执行任务
env.execute()
}
}

输出:

1
2
3
4
5
6
3> (java,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)
13> (flink,1)
9> (world,1)

主要观察与批处理程序BatchWordCount的不同:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment
  • 转换处理之后,得到的数据对象类型不同
  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么
  • 代码末尾需要调用env的execute方法,开始执行任务

2)读取socket文本流

[!NOTE]

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据。为了模拟这种场景,可以监听socket端口,然后向该端口不断的发送数据。

(1)将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。具体代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.day01

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCOunt {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val parameterTool = ParameterTool.fromArgs(args)
val hostname = parameterTool.get("host")
val port = parameterTool.getInt("port")

val lineDataStream = env.socketTextStream(hostname, port)

val wordAndOne = lineDataStream.flatMap(_.split(" ")).map(word => (word, 1))

val lineAndOneGroup = wordAndOne.keyBy(_._1)

val sum = lineAndOneGroup.sum(1)

sum.print()

env.execute()
}
}

(2)在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试

1
[atguigu@hadoop102 ~]$ nc -lk 7777

[!IMPORTANT]

注意:要先启动端口,后启动StreamWordCount程序,否则会报超时连接异常。

(3)启动StreamWordCount程序

[!WARNING]

我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。

(4)从hadoop102发送数据
①在hadoop102主机中,输入“hello flink”,输出如下内容
1
2
13> (flink,1)
5> (hello,1)
②再输入“hello world”,输出如下内容
1
2
2> (world,1)
5> (hello,2)

[!NOTE]

Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。