博客 Flink简介、快速入门、部署、集群(下)

Flink简介、快速入门、部署、集群(下)

   数栈君   发表于 2024-12-26 13:53  312  0

四、Flink部署及启动

4.1、Flink的执行逻辑

Flink提交作业和执行任务,需要几个关键组件:

1.客户端(client):代码由客户端获取并做转换,之后提交给JobManager

2.JobManager 就是 Flink集群里面的“管事人”,对作业进行中央调度管理;而他获取到要执行的作业后,会进一步处理转换,然后分布任务给众多的TaskManager;

3.TaskManager,就是真正“干活的人”,数据的处理操作 都是他们来做的;

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f72c115015ff1bf7bbad08bc81eaabe3..png

4.2、Flink的安装模式

Flink支持多种安装模式:

1.local-本地安装:单机模式,一般不使用;

2.standalone-独立模式:Flink自带集群,开发测试环境使用;

3.yarn:计算资源统一由Hadoop YARN管理,生产环境使用;

4.2.1、准备工作:

Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行
Flink 安装部署的学习时,需要准备 3 Linux 机器。
具体要求如下:
系统环境为 CentOS 7.5 版本(也可以是Ubuntu)
安装 Java 8

4.2.1.1 Flink和Java下载安装

安装Flink:

官网:https://flink.apache.org/downloads/

清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

1.可以从官网下载Flink的安装包,如下1.20.0版本的;

2.下载完后可以移动到自己想存储的位置

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/aca9caac425ea444bda3e3dea9e38f20..png

3.解压安装包
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/759525907080c1be38449f7a52fffd23..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/458bb573119b6cf43b46e089b82cf5ba..png

4.启动flink

进入到解压目录下,执行以下脚本:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f344ece779a13fcad714f778b5388a53..png

发现报错,是因为没有安装jdk,这里我们安装jdk8;

5.安装jdk

网址:https://www.oracle.com/java/technologies/downloads/#java8
jdk安装:
ubuntu中自带了jdk,先将其卸载:sudo apt-get remove openjdk
sudo apt-get autoremove

上传安装包到自指定路径:/usr/local/jdk

解压安装包:tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk

配置环境变量:sudo vim /etc/profile
ESC然后再 : + shfit 输入wq,保存退出
在文末增加配置(路径根据实际情况调整):
export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre

export PATH=P A T H : PATH:PATH:JAVA_HOME/bin:J R E H O M E / b i n e x p o r t C L A S S P A T H = JRE_HOME/bin export CLASSPATH=JRE
H

OME/binexportCLASSPATH=CLASSPATH:J A V A H O M E / l i b : JAVA_HOME/lib:JAVA
H

OME/lib:JRE_HOME/lib

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a50dcc8374e986e9750f2340266503a7..png

测试jdk:java -version 或者 javac -version
root@vm1:/usr/local/myapp/jdk# java -version
java version “1.8.0_291”
Java™ SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot™ 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e9dbd90f5ab948421993ce9cb10c5dc2..png

6. Flink安装包然后解压到指定目录,注意修改所属用户和用户组

#.改名
mv flink-1.7.2 flink
#.赋予权限
chown -R root:root flink

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c79dc16ff124bb6fe931cc9a40c81391..png

7.再次启动

关闭防火墙:sudo ufw disable

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/229bb7bd4995cd6fab1050c85c66d2a6..png

启动成功,通过jps查看服务信息:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a2b1a6d211752975aaee8fe6c66f7203..png

StandaloneSessionClusterEntrypoint为Flink主进程,即JobManager;TaskManagerRunner为Flink从进程,即TaskManager

8.页面查看
单机启动,自己既是jobmanager,也是taskmanager

在浏览器中访问服务器8081端口即可查看Flink的WebUI,
比如http://localhost:8081/,从WebUI中可以看出,当前本地模式的Task Slot数量和TaskManager数量。访问结果如下图所示:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6537aa130a10c9bdcd9ba9b4d522223c..png

9.停止flink
./bin/stop-cluster.sh
刷新页面会报错

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6f35d75d2a79fd0cc3088f0ed9d81e68..png

10.可以通过观察logs目录下的日志来检测系统是否正在运行了
tail log/flink–jobmanager-.log

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/82af5f2c71486bbd8f162b37bde50ea9..png

4.2.2、单节点部署

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7acd8969ca62c2a934735d7c0cb08ba3..png

4.2.3、standalone安装模式

我这边是给Flink003为Master机器(JobManager),其他的为Slave机器(TaskManager)

4.2.3.1 ssh配置

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9d20fc4c5da00c3afe34ebe70fa6605e..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3616c9efc19aeb8393bfedf3c6f05480..png

集群的服务器之间配置好ssh免密登录,避免后续搭建出现麻烦,这一步一定要做。简单步骤如下:

  1. 先进入root用户权限 sudo -i,且修改root用户的密码 输入passwd

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c5ce23ddf7da68fc7d9151478e8fea01..png

2. 查看服务器是否安装有ssh服务

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ba6f34e309a58de7da53297c325058d1..png

3.生成密钥和公钥;和在master机器执行ssh-keygen -t rsa
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d6d59e9463fdf9ad419f0326a769533e..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1f160eaf136ede9c056375918e47c7bb..png

4.把公钥复制到需要免密登录的从服务器上;在master机器执行命令,将密钥拷贝到其余服务器ssh-copy-id -i /root/.ssh/id_rsa.pub 目标服务器IP (报错的化看下图的原因解释)

我们执行ssh-copy-id 报错,说连接被拒,可以进行排错

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/94f044a14b7b58e1ec3e0b172f96f7de..png

确认目标主机的SSH服务是否运行:sudo systemctl status ssh,如果未运行执行:sudo systemctl start ssh;
![![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/07bd7889c044401593383cad3875c95b.png)
那就是ssh服务没有被安装,需要安装,ubuntu上检查安装状态输入 sudo apt-listfiles | grep openssh-server,
在使用sudo apt-get install openssh-server安装;

检查SSH配置,确保SSH服务的配置文件 /etc/ssh/sshd_config正确,特别是监听的端口设置是Port 22,如果使用了自定义端口,确保该端口正确配置并且未被防火墙阻挡。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a485d8075e206f8812dcd928573bd4f5..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6592e1c759e98e41cbc670d03f63a4a8..png

可以之后再试ssh-copy-id ;

在继续ssh-copy-id ,输入密码;

验证免密登录。运行以下命令连接到远程服务器:ssh username@remote_server_ip。如果成功连接而无需输入密码,则表示设置成功

要退出ssh连接输入exit就可以了;

4.2.3.2 修改Flink配置,实现基础的集群搭建
修改flink配置文件:
进入Flink的配置文件,conf目录下的flink-conf.yaml 或者cong.yaml: vim conf.yaml
将文件中的jobmanager.rpc.address属性进行修改为JobManager机器也就是主机的ip地址,
jobmanager.rpc.address: 主机ip地址
jobmanager.bind-host: 0.0.0.0

2.修改workers文件
vim conf/workers
workers文件必须包含所有需要启动的TaskManager节点的主机名,且每个主机名占一行。在JobManager服务器,执行以下操作
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/306dcf0b17cbbf30ce0cd8db3cd9ff4c..png


3. 复制Flink安装文件到其他服务器

其他从服务器不需要下载flink的安装包,必须从主scp到从,因为有路径要求,才能关联起来;

scp -r /usr/local/flink/ root@从服务器1的ip地址:/usr/local/flink/
scp -r /usr/local/flink/ root@从服务器2的ip地址:/usr/local/flink/

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/02c08a75531e74f558f28160af53ee32..png
 
4.集群启动
这里只需要在主服务器上,也就是JobManager的服务器上启动Flink,从服务也会跟着启动;
在JobManager节点上进入Flink安装目录,执行以下命令启动Flink集群:

启动完毕后,在集群各服务器上通过jsp命令查看Java进程。若各节点存在以下进程,则说明集群启动成功:
JobManager节点:StandaloneSessionClusterEntrypoint
TaskManager1节点:TaskManagerRunner
TaskManager2节点:TaskManagerRunner

我们输入从机的密码,可以看到提醒我们启动了从机的flink(这里我只弄了一台机器从机),下图是主机的命令窗口,输入jps可以看到作为JobManager的StanaloneSessionClusxxxx启动了

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/80300d09da60aa89e5d88325621f10e6..png

从机命令窗口输入jps可以看到TaskManager启动成功;
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/87e96cac4f423410c4b7ac996b47e98f..png

尝试提交一个简单任务,如果任务正常执行完毕,则集群一切正常。提交Flink自带的简单任务如下:
./bin/flink run examples/streaming/WordCount.jar

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/cc84c29230ea6e0905bbb8865bf40b85..png

5.查看webUI
通过JobManager节点访问WebUI(http://localhost:8081/),可以看到此时是1个JobManager,1个TaskManager,也能以上执行完毕的任务,如下图:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6da6cf022ea05d0f5becf388267fe3e3..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/dca1582c93b4313d7e4b25315c48012e..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c7a08e6170045e6834de9b8b0ac91942..png

4.2.4 进阶版集群-Flink Standalone HA搭建
在Flink Standalone模式下,实现HA的方式可以利用ZooKeeper在所有正在运行的JobManager实例之间进行分布式协调,实现多个JobManager无缝切换。Flink Standalone模式的HA架构如图:

简单来说就是 使用 ZooKeeper 来实现JobManager无缝切换,从而达到分布式协调;

注意:Flink内置了Zookeeper服务和相关脚本文件,如果你的集群中没有安装Zookeeper,则可以通过修改zoo.cfg文件配置Flink内置的Zookeeper。生产环境建议使用独立的外部zookeeper;

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/36dd32470ab153fc1151ba786b71a638..png

HA的核心就是:可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制保证同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager。流程见下图:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6388fd2930f97e6a912dbee54a067f01..png

此外,活动状态的JobManager在工作时会将其元数据(JobGraph、应用程序JAR文件等)写入一个远程持久化存储系统(例如HDFS)中,还会将元数据存储的位置和路径信息写入ZooKeeper存储,以便能够进行故障恢复,如图下图所示:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e1c3a8a29b8d660322f0ff09568bf4b7..png

操作起来!!!!!
首先,我们还是在上面的操作的三个服务器进行操作;
目前是一个jobmanager(Flink003机器),两个taskmanager(Flink001和Flink002机器);
现在我们需要变成两个jobmanager,一个taskmanager;

1.修改masters文件
Flink的masters文件用于配置所有需要启动的JobManager节点以及每个JobManager的WebUI绑定的端口。
进入Flink安装目录,修改conf/masters文件,修改内容如下:

Flink003机器的ip地址:8081
Flink002机器的ip地址:8082

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/95f42d07c8cfeb084b535d6009b94c9d..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c96173bcb6f94e145724bcc07840e62b..png

上述配置表示在集群 Flink003机器 和 Flink002机器节点上启动JobManager,并且每个JobManager的WebUI访问端口分别为8081和8082。

2.修改flink-conf.yaml文件设置高可用模式
进入Flink003机器 节点的Flink安装主目录,修改conf/flink-conf.yaml文件,添加以下内容:

# 将高可用模式设置为ZooKeeper,默认集群不会开启高可用状态
high-availability: zookeeper
# ZooKeeper集群主机名(或IP)与端口列表,多个以逗号分隔
high-availability.zookeeper.quorum: centos01:2181,centos02:2181,centos03:2181
# 用于持久化JobManager元数据(JobGraph、应用程序JAR文件等)的HDFS地址,以便进行故障恢复,ZooKeeper上存储的只是元数据所在的位置路径信息
high-availability.storageDir: /data/software/flink-15.4/ha
# 获取storageDir也可用hdfs,如果使用hdfs的话,则需要单独安装hdfs,本文暂不使用
#high-availability.storageDir: hdfs://centos01:9000/flink/recovery

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0bc7e533902e99b3ed698b6bde391ae2..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/df88db67de435b6472dedac2899222ff..png

改动如图下
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8ed7bf72980e1b598da87ee53210fa09..png

  1. 修改zoo.cfg文件
    Flink内置了ZooKeeper服务和相关脚本文件,如果你的集群中没有安装ZooKeeper,则可以通过修改zoo.cfg文件配置Flink内置的ZooKeeper。生产环境建议使用独立的外部ZooKeeper。

进入centos01节点的Flink安装主目录,修改conf/zoo.cfg文件,添加以下内容,配置ZooKeeper启动节点与选举相关端口:

server.1=centos01:2888:3888
server.2=centos02:2888:3888
server.3=centos03:2888:388

上述配置表示在centos01、centos02和centos03节点上启动ZooKeeper服务,其中1、2、3表示每个ZooKeeper服务器的唯一ID。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d92491fa15a5e908ab4c2566db690483..png

  1. 复制Flink安装文件到其他节点
    继续采用scp命令,复制centos01的文件到其他节点,scp命令会把相同文件覆盖。
scp -r /usr/local/flink/ root@从服务器1的ip地址:/usr/local/flink/
scp -r /usr/local/flink/ root@从服务器2的ip地址:/usr/local/flink/
  1. 启动ZooKeeper集群
    如果使用Flink内置的ZooKeeper,在Flink003机器节点执行以下命令,即可启动整个ZooKeeper集群:
    …/bin/start-zookeeper-quorum.sh

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6b7d9bf211647798bd05910a40672fe7..png

启动成功后,在每个Flink节点上都会产生一个名为FlinkZooKeeperQuorumPeer的进程,该进程是ZooKeeper服务的守护进程。使用jps可以查看到如下进程:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/7a37571b1181a7e7bf7df8961f2c7432..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/1565ba333978a40a9d3d9850ee1823b3..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9c5ab8e20e429a406fdb818932a27453..png

注意:这里本级需要使用localhost
6. Flink003机器节点上执行以下命令,启动Flink Standalone HA集群:

要先启动zookeeper再启动flink

bin/start-cluster.sh

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/6c43ddc542cc78c6bc477391fd93097c..png

Flink003主机 jps:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d8d2eaf5e33d08b8a57b5f4d6c75b653..png

Flink002和Flink001机器 jps:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0b74353651144a235a7bd891045564af..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/71e36859f95f879e5da7c96bff2128f9..png

  1. 访问webUI
    以前只有Flink003可以访问webui页面也就是Flink的Dashboard,现在是Flink003和Flink002都可以访问

假如方向访问不了,说明配置不对,重新查看自己的配置,要注意改完Flink003的后要从新scp复制到从服务器上;

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d46d52464bba78d87bbbe037ef9dc556..png

8.测试
在提交一个测试,如果能正常执行,说明整个集群正常。

./bin/flink run examples/streaming/WordCount.jar

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9ef1eccd43dfa31c92cd715b13d86ff9..png

9.停止集群

若要停止Flink Standalone HA集群,在jobmanager节点上首先执行以下命令停止整个Flink集群,我这里是flink003:

./stop-zookeeper-quorum.sh

4.2.5、Yarn安装模式
完成上述的Flink和jdk的安装后;

进入Flink的conf目录,依据Flink的版本进行修改flink-conf.yaml或者conf.yaml文件

bind-host 就是0.0.0.0
rpc.address指的是从机的ip地址

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b3d385aaa3d6ef9a15d0b8922a60e083..png

————————————————

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群