1.客户端(client):代码由客户端获取并做转换,之后提交给JobManager
2.JobManager 就是 Flink集群里面的“管事人”,对作业进行中央调度管理;而他获取到要执行的作业后,会进一步处理转换,然后分布任务给众多的TaskManager;
3.TaskManager,就是真正“干活的人”,数据的处理操作 都是他们来做的;
1.local-本地安装:单机模式,一般不使用;
2.standalone-独立模式:Flink自带集群,开发测试环境使用;
3.yarn:计算资源统一由Hadoop YARN管理,生产环境使用;
4.2.1、准备工作:
Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行
Flink 安装部署的学习时,需要准备 3 台 Linux 机器。
具体要求如下:
系统环境为 CentOS 7.5 版本(也可以是Ubuntu)
安装 Java 8。
4.2.1.1 Flink和Java下载安装
安装Flink:
官网:https://flink.apache.org/downloads/
清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/
1.可以从官网下载Flink的安装包,如下1.20.0版本的;
2.下载完后可以移动到自己想存储的位置
4.启动flink
进入到解压目录下,执行以下脚本:
发现报错,是因为没有安装jdk,这里我们安装jdk8;
5.安装jdk
网址:https://www.oracle.com/java/technologies/downloads/#java8
jdk安装:
ubuntu中自带了jdk,先将其卸载:sudo apt-get remove openjdk
sudo apt-get autoremove
上传安装包到自指定路径:/usr/local/jdk
解压安装包:tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk
配置环境变量:sudo vim /etc/profile
ESC然后再 : + shfit 输入wq,保存退出
在文末增加配置(路径根据实际情况调整):
export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre
export PATH=P A T H : PATH:PATH:JAVA_HOME/bin:J R E H O M E / b i n e x p o r t C L A S S P A T H = JRE_HOME/bin export CLASSPATH=JRE
H
OME/binexportCLASSPATH=CLASSPATH:J A V A H O M E / l i b : JAVA_HOME/lib:JAVA
H
OME/lib:JRE_HOME/lib
测试jdk:java -version 或者 javac -version
root@vm1:/usr/local/myapp/jdk# java -version
java version “1.8.0_291”
Java™ SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot™ 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291
6. Flink安装包然后解压到指定目录,注意修改所属用户和用户组
#.改名
mv flink-1.7.2 flink
#.赋予权限
chown -R root:root flink
7.再次启动
关闭防火墙:sudo ufw disable
StandaloneSessionClusterEntrypoint为Flink主进程,即JobManager;TaskManagerRunner为Flink从进程,即TaskManager
8.页面查看
单机启动,自己既是jobmanager,也是taskmanager
在浏览器中访问服务器8081端口即可查看Flink的WebUI,
比如http://localhost:8081/,从WebUI中可以看出,当前本地模式的Task Slot数量和TaskManager数量。访问结果如下图所示:
9.停止flink
./bin/stop-cluster.sh
刷新页面会报错
10.可以通过观察logs目录下的日志来检测系统是否正在运行了
tail log/flink–jobmanager-.log
我这边是给Flink003为Master机器(JobManager),其他的为Slave机器(TaskManager)
集群的服务器之间配置好ssh免密登录,避免后续搭建出现麻烦,这一步一定要做。简单步骤如下:
3.生成密钥和公钥;和在master机器执行ssh-keygen -t rsa
4.把公钥复制到需要免密登录的从服务器上;在master机器执行命令,将密钥拷贝到其余服务器ssh-copy-id -i /root/.ssh/id_rsa.pub 目标服务器IP (报错的化看下图的原因解释)
我们执行ssh-copy-id 报错,说连接被拒,可以进行排错
确认目标主机的SSH服务是否运行:sudo systemctl status ssh,如果未运行执行:sudo systemctl start ssh;

那就是ssh服务没有被安装,需要安装,ubuntu上检查安装状态输入 sudo apt-listfiles | grep openssh-server,
在使用sudo apt-get install openssh-server安装;
检查SSH配置,确保SSH服务的配置文件 /etc/ssh/sshd_config正确,特别是监听的端口设置是Port 22,如果使用了自定义端口,确保该端口正确配置并且未被防火墙阻挡。
可以之后再试ssh-copy-id ;
在继续ssh-copy-id ,输入密码;
验证免密登录。运行以下命令连接到远程服务器:ssh username@remote_server_ip。如果成功连接而无需输入密码,则表示设置成功
要退出ssh连接输入exit就可以了;
4.2.3.2 修改Flink配置,实现基础的集群搭建
修改flink配置文件:
进入Flink的配置文件,conf目录下的flink-conf.yaml 或者cong.yaml: vim conf.yaml
将文件中的jobmanager.rpc.address属性进行修改为JobManager机器也就是主机的ip地址,
jobmanager.rpc.address: 主机ip地址
jobmanager.bind-host: 0.0.0.0
2.修改workers文件
vim conf/workers
workers文件必须包含所有需要启动的TaskManager节点的主机名,且每个主机名占一行。在JobManager服务器,执行以下操作
3. 复制Flink安装文件到其他服务器
其他从服务器不需要下载flink的安装包,必须从主scp到从,因为有路径要求,才能关联起来;
scp -r /usr/local/flink/ root@从服务器1的ip地址:/usr/local/flink/
scp -r /usr/local/flink/ root@从服务器2的ip地址:/usr/local/flink/
4.集群启动
这里只需要在主服务器上,也就是JobManager的服务器上启动Flink,从服务也会跟着启动;
在JobManager节点上进入Flink安装目录,执行以下命令启动Flink集群:
启动完毕后,在集群各服务器上通过jsp命令查看Java进程。若各节点存在以下进程,则说明集群启动成功:
JobManager节点:StandaloneSessionClusterEntrypoint
TaskManager1节点:TaskManagerRunner
TaskManager2节点:TaskManagerRunner
我们输入从机的密码,可以看到提醒我们启动了从机的flink(这里我只弄了一台机器从机),下图是主机的命令窗口,输入jps可以看到作为JobManager的StanaloneSessionClusxxxx启动了
从机命令窗口输入jps可以看到TaskManager启动成功;
尝试提交一个简单任务,如果任务正常执行完毕,则集群一切正常。提交Flink自带的简单任务如下:
./bin/flink run examples/streaming/WordCount.jar
5.查看webUI
通过JobManager节点访问WebUI(http://localhost:8081/),可以看到此时是1个JobManager,1个TaskManager,也能以上执行完毕的任务,如下图:
4.2.4 进阶版集群-Flink Standalone HA搭建
在Flink Standalone模式下,实现HA的方式可以利用ZooKeeper在所有正在运行的JobManager实例之间进行分布式协调,实现多个JobManager无缝切换。Flink Standalone模式的HA架构如图:
简单来说就是 使用 ZooKeeper 来实现JobManager无缝切换,从而达到分布式协调;
注意:Flink内置了Zookeeper服务和相关脚本文件,如果你的集群中没有安装Zookeeper,则可以通过修改zoo.cfg文件配置Flink内置的Zookeeper。生产环境建议使用独立的外部zookeeper;
HA的核心就是:可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制保证同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager。流程见下图:
此外,活动状态的JobManager在工作时会将其元数据(JobGraph、应用程序JAR文件等)写入一个远程持久化存储系统(例如HDFS)中,还会将元数据存储的位置和路径信息写入ZooKeeper存储,以便能够进行故障恢复,如图下图所示:
操作起来!!!!!
首先,我们还是在上面的操作的三个服务器进行操作;
目前是一个jobmanager(Flink003机器),两个taskmanager(Flink001和Flink002机器);
现在我们需要变成两个jobmanager,一个taskmanager;
1.修改masters文件
Flink的masters文件用于配置所有需要启动的JobManager节点以及每个JobManager的WebUI绑定的端口。
进入Flink安装目录,修改conf/masters文件,修改内容如下:
Flink003机器的ip地址:8081
Flink002机器的ip地址:8082
上述配置表示在集群 Flink003机器 和 Flink002机器节点上启动JobManager,并且每个JobManager的WebUI访问端口分别为8081和8082。
2.修改flink-conf.yaml文件设置高可用模式
进入Flink003机器 节点的Flink安装主目录,修改conf/flink-conf.yaml文件,添加以下内容:
# 将高可用模式设置为ZooKeeper,默认集群不会开启高可用状态
high-availability: zookeeper
# ZooKeeper集群主机名(或IP)与端口列表,多个以逗号分隔
high-availability.zookeeper.quorum: centos01:2181,centos02:2181,centos03:2181
# 用于持久化JobManager元数据(JobGraph、应用程序JAR文件等)的HDFS地址,以便进行故障恢复,ZooKeeper上存储的只是元数据所在的位置路径信息
high-availability.storageDir: /data/software/flink-15.4/ha
# 获取storageDir也可用hdfs,如果使用hdfs的话,则需要单独安装hdfs,本文暂不使用
#high-availability.storageDir: hdfs://centos01:9000/flink/recovery
进入centos01节点的Flink安装主目录,修改conf/zoo.cfg文件,添加以下内容,配置ZooKeeper启动节点与选举相关端口:
server.1=centos01:2888:3888
server.2=centos02:2888:3888
server.3=centos03:2888:388
上述配置表示在centos01、centos02和centos03节点上启动ZooKeeper服务,其中1、2、3表示每个ZooKeeper服务器的唯一ID。
scp -r /usr/local/flink/ root@从服务器1的ip地址:/usr/local/flink/
scp -r /usr/local/flink/ root@从服务器2的ip地址:/usr/local/flink/
启动成功后,在每个Flink节点上都会产生一个名为FlinkZooKeeperQuorumPeer的进程,该进程是ZooKeeper服务的守护进程。使用jps可以查看到如下进程:
注意:这里本级需要使用localhost
6. Flink003机器节点上执行以下命令,启动Flink Standalone HA集群:
要先启动zookeeper再启动flink
bin/start-cluster.sh
Flink003主机 jps:
Flink002和Flink001机器 jps:
假如方向访问不了,说明配置不对,重新查看自己的配置,要注意改完Flink003的后要从新scp复制到从服务器上;
8.测试
在提交一个测试,如果能正常执行,说明整个集群正常。
./bin/flink run examples/streaming/WordCount.jar
9.停止集群
若要停止Flink Standalone HA集群,在jobmanager节点上首先执行以下命令停止整个Flink集群,我这里是flink003:
./stop-zookeeper-quorum.sh
4.2.5、Yarn安装模式
完成上述的Flink和jdk的安装后;
进入Flink的conf目录,依据Flink的版本进行修改flink-conf.yaml或者conf.yaml文件
bind-host 就是0.0.0.0
rpc.address指的是从机的ip地址
————————————————
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack