博客 大数据Flink进阶(十五):Flink On Yarn任务提交

大数据Flink进阶(十五):Flink On Yarn任务提交

   数栈君   发表于 2023-05-08 15:32  293  0

Local模式:通过一个JVM进程中,通过线程模拟出各个Flink角色来得到Flink环境

Standalone模式:各个角色是独立的进程存在

YARN模式:Flink的各个角色,均运行在多个YARN的容器内,其整体上是一个YARN的任务

flink on yarn的前提是:hdfs、yarn均启动

在企业实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:

  • Yarn的资源可以按需使用,提高集群的资源利用率

  • Yarn的任务有优先级,根据优先级运行作业

  • 基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)

  • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控

  • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器

  • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager



1 准备工作




jdk1.8及以上【配置JAVA_HOME环境变量】

ssh免密码登录【集群内节点之间免密登录】

至少hadoop2.2

hdfs & yarn均启动

文末阅读原文,无须下载直接在线学习~



2 集群规划




服务器: node1(ResourceManager+ NodeManager)

服务器: node2(NodeManager)

服务器: node3(NodeManager)



3 修改hadoop的配置参数




1. 打开yarn配置页面(每台hadoop节点都需要修改

vim etc/hadoop/yarn-site.xml

添加

<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

在这里面我们需要关闭,因为对于flink使用yarn模式下,很容易内存超标,这个时候yarn会自动杀掉job

2. 分发yarn-site.xml到其它服务器节点

scp yarn-site.xml node2:$PWD
scp yarn-site.xml node3:$PWD

3. 启动HDFS、YARN集群

start-all.sh



4 Flink on Yarn的运行机制




http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9f8f8dd80faee2ab5ad212e0269fa570..jpg

从图中可以看出,Yarn的客户端需要获取hadoop的配置信息,连接Yarn的ResourceManager。所以要有设置有 YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_CONF_PATH,只要设置了其中一个环境变量,就会被读取。如果读取上述的变量失败了,那么将会选择hadoop_home的环境变量,都区成功将会尝试加载$HADOOP_HOME/etc/hadoop的配置文件。

当启动一个Flink Yarn会话时,客户端首先会检查本次请求的资源是否足够。资源足够将会上传包含HDFS配置信息和Flink的jar包到HDFS。

随后客户端会向Yarn发起请求,启动applicationMaster,随后NodeManager将会加载有配置信息和jar包,一旦完成,ApplicationMaster(AM)便启动。

当JobManager and AM 成功启动时,他们都属于同一个container,从而AM就能检索到JobManager的地址。此时会生成新的Flink配置信息以便TaskManagers能够连接到JobManager。同时,AM也提供Flink的WEB接口。用户可并行执行多个Flink会话。

随后,AM将会开始为分发从HDFS中下载的jar以及配置文件的container给TaskMangers.完成后Fink就完全启动并等待接收提交的job。



5 Flink on Yarn的三种部署方式介绍




1 Session模式

这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8a9ea33bc9e95b33861640826089f462..jpg


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/53d69af6d830ab2d4327a78cc4a58d9c..jpg

特点:需要事先申请资源,使用Flink中的yarn-session(yarn客户端),启动JobManager和TaskManger

优点:不需要每次递交作业申请资源,而是使用已经申请好的资源,从而提高执行效率

缺点:作业执行完成以后,资源不会被释放,因此一直会占用系统资源

应用场景:适合作业递交比较频繁的场景,小作业比较多的场景

2 Per-Job模式

考虑到集群的资源隔离情况,一般生产上的任务都会选择per job模式,也就是每个任务启动一个flink集群,各个集群之间独立运行,互不影响,且每个集群可以设置独立的配置。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ebe0db6eff2eede71897851c9af93412..jpg


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/00c757dee6199ff7dc8054ccf569e1af..jpg

特点:每次递交作业都需要申请一次资源

优点:作业运行完成,资源会立刻被释放,不会一直占用系统资源

缺点:每次递交作业都需要申请资源,会影响执行效率,因为申请资源需要消耗时间

应用场景:适合作业比较少的场景、大作业的场景

3 application模式

3.1 背景

flink-1.11 引入了一种新的部署模式,即 Application 模式。目前,flink-1.11 已经可以支持基于 Yarn 和 Kubernetes 的 Application 模式。

3.2 优势

Session模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行

Per-Job模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行

通过以上两种模式的特点描述,可以看出,main方法都是在客户端执行,社区考虑到在客户端执行 main() 方法来获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。尤其在大量用户共享客户端时,问题更加突出。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

因此,社区提出新的部署方式 Application 模式解决该问题。

3.3 原理

Application 模式下,用户程序的 main 方法将在集群中而不是客户端运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。

Application 模式为每个提交的应用程序创建一个集群,该集群可以看作是在特定应用程序的作业之间共享的会话集群,并在应用程序完成时终止。

在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。



6 Flink on Yarn的三种部署方式使用说明




1. 第一种方式:YARN session

1. yarn-session.sh(开辟资源)+flink run(提交任务)

这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和Task-managers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)

通过./bin/yarn-session.sh脚本启动YARN Session

脚本可以携带的参数:

-n(--container):TaskManager的数量。(1.10 已经废弃)

-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。

-jm:JobManager的内存(单位MB)。

-q:显示可用的YARN资源(内存,内核);

-tm:每个TaskManager容器的内存(默认值:MB)

-nm:yarn 的appName(现在yarn的ui上的名字)。  

-d:后台执行。

注意:

如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d--detached

确定TaskManager数:

Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为10,每个TaskManager有两个任务槽的作业,就会启动5个TaskManager。

2. 启动:

bin/yarn-session.sh -tm 1024  -s 4 -d

上面的命令的意思是,每个 TaskManager 拥有4个 Task Slot(-s 4),并且被创建的每个 TaskManager 所在的YARN Container 申请 1024M  的内存,同时额外申请一个Container用以运行ApplicationMaster以及Job Manager。

TM的数量取决于并行度,如下图:

执行:bin/flink run -p 8 examples/batch/WordCount.jar

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0ebfe65c32bb904fbb772dd95493084b..jpg

3. 启动成功之后,控制台显示:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c6cb93a3b8c1ebcb8807dfc1b9182cd4..jpg

4. 去yarn页面:ip:8088可以查看当前提交的flink session

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a0b89985bf87fdb386fe20ef07528961..jpg

5. 然后使用flink提交任务

bin/flink run examples/batch/WordCount.jar

在控制台中可以看到wordCount.jar计算出来的任务结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b607fa8624739f27148248e9faf53dc0..jpg

6. 在yarn-session.sh提交后的任务页面中也可以观察到当前提交的任务:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/bf62a95db50a3b1aeb6280a0aa777737..jpg

7. 点击查看任务细节:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/afd172661701a44cc41783b10447002a..jpg

8. 停止当前任务:

yarn application -kill  application_1527077715040_0007

2 第二种方式:在YARN上运行一个Flink作业

上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业,这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

使用flink直接提交任务

bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

常用参数:

--p 程序默认并行度

下面的参数仅可用于 -m yarn-cluster 模式

--yjm JobManager可用内存,单位兆

--ynm YARN程序的名称

--yq 查询YARN可用的资源

--yqu 指定YARN队列是哪一个

--ys 每个TM会有多少个Slot

--ytm 每个TM所在的Container可申请多少内存,单位兆

--yD 动态指定Flink参数

--yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)

在8088页面观察:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/d8b7abe7a647bd71c9d0ba016a672e47..jpg

停止yarn-cluster

yarn application -kill application的ID

注意:

在创建集群的时候,集群的配置参数就写好了,但是往往因为业务需要,要更改一些配置参数,这个时候可以不必因为一个实例的提交而修改conf/flink-conf.yaml;

可以通过:-yD <arg>                        Dynamic properties

来覆盖原有的配置信息:比如:

bin/flink run -m yarn-cluster -yD fs.overwrite-files=true examples/batch/WordCount.jar
-yD fs.overwrite-files=true -yD taskmanager.network.numberOfBuffers=16368



7 注意




如果使用的是flink on yarn方式,想切换回standalone模式的话,需要删除文件:【/tmp/.yarn-properties-root】

因为默认查找当前yarn集群中已有的yarn-session信息中的jobmanager

如果是分离模式运行的YARN JOB后,其运行完成会自动删除这个文件

但是会话模式的话,如果是kill掉任务,其不会执行自动删除这个文件的步骤,所以需要我们手动删除这个文件。

免责申明:

本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群