Flink集群部署搭建

Flink1.14.0集群部署

1
2
3
4
5
# 修改conf目录下的workers将三台主机名写进去
# 修改master为master:8081
# 修改flink-conf.yaml文件
# 将jobmanager.rpc.address的主机名改成master就好了
# 分发到其他主机,然后就可以启动了,start-cluster.sh启动 web端为8081

Flink1.17.0集群部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 修改conf目录下的workers将三台主机名写进去
# 修改master为master:8081
# 修改flink-conf.yaml文件
# JobManager节点地址.
jobmanager.rpc.address: master
jobmanager.bind-host: 0.0.0.0
rest.address: master
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master

# 分发到其他主机
# 修改slave1的 taskmanager.host
taskmanager.host: slave1
# 修改slave2的 taskmanager.host
taskmanager.host: slave2
# 然后就可以启动了,start-cluster.sh启动 web端为8081

Flink1.14.0高可用部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 前面正常部署,后面在high中新增
#开启HA,使用文件系统作为快照存储
state.backend: filesystem (选填)

#启用检查点,可以将快照保存到HDFS (选填)
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints

#使用zookeeper搭建高可用 (必填)
high-availability: zookeeper

# 存储JobManager的元数据到HDFS (必填)
high-availability.storageDir: hdfs://node1:8020/flink/ha/

# 配置ZK集群地址(必填)
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

# 指定flink在zokkeper的位置(选填)
high-availability.zookeeper.path.root: /flink

然后将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar复制到/root/software/flink/lib中
# 然后scp最后就可以启动了

Flink On Yarn

1.flink的Yarn部署模式分为三种方式:一种是Application Mode, 一种是Per-lob Mode,一种是Session Mode

1
2
3
4
配置环境变量,增加环境变量配置如下:
vi /etc/profile
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

2 会话模式部署

YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。具体步骤如下:

(1)启动集群

  • (1)启动Hadoop集群(HDFS、YARN)。

  • (2)执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。

1
2
3
4
5
6
7
bin/yarn-session.sh -nm test

-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(–name):配置在YARN UI界面上显示的任务名。
-qu(–queue):指定YARN队列名。
-tm(–taskManager):配置每个TaskManager所使用内存。
YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,用户可以通过Web UI或者命令行两种方式提交作业。
1
2
2022-11-17 15:20:52,711 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hadoop104:40825 of application 'application_1668668287070_0005'.
JobManager Web Interface: http://hadoop104:40825

(2)通过命令行提交作业

  • ① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。
  • ② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。
1
bin/flink run -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

3 单作业模式部署

在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群
(1)执行命令提交作业
1
bin/flink run -d -t yarn-per-job -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
注意:如果启动过程中报如下异常
1
2
Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
1
2
3
[atguigu@hadoop102 conf]$ vim flink-conf.yaml

classloader.check-leaked-classloader: false
(2)可以使用命令行查看或取消作业,命令如下。
1
2
3
[atguigu@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

[atguigu@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

4 应用模式部署

1
bin/flink run-application -t yarn-application -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar 
(2)在命令行中查看或取消作业。
1
2
3
[atguigu@hadoop102 flink-1.17.0]$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

[atguigu@hadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

2)上传HDFS提交

可以通过yarn.provided.lib.dirs配置选项指定位置,将flink的依赖上传到远程。
(1)上传flink的lib和plugins到HDFS上
1
2
3
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-dist
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -put lib/ /flink-dist
[atguigu@hadoop102 flink-1.17.0]$ hadoop fs -put plugins/ /flink-dist
(2)提交作业
1
bin/flink run-application -t yarn-application    -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flink-dist"    -c com.atguigu.wc.SocketStreamWordCount  hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar