博客 Flink提交流程&如何debug和跟踪流程(on yarn)

Flink提交流程&如何debug和跟踪流程(on yarn)

   数栈君   发表于 2023-02-14 14:05  795  0

4月20日,袋鼠云数栈技术研发团队工程师兰洋(花名:莫问)为大家直播分享《Flink提交流程&如何debug和跟踪流程(on yarn)》。错过直播的朋友可以钉钉扫描文末的二维码,加入钉钉群回看直播,或者在b站搜索“袋鼠云”观看视频。

b站视频网址:

https://www.bilibili.com/video/BV13K4y1P7ri


下面带大家来回顾下本次直播的内容,本次直播莫问大佬主要从以下几个方面来为大家进行分享。



Flink on yarn 作业提交


在我们平常flink的开发中,通常是在ide中开发并使用flink的单机模式进行测试,测试完毕后会将代码打成jar文件然后通过Flink CLI或者Web UI提交作业到flink集群。但是在真正执行我们flink的作业之前,我们是否有考虑过以下几个问题:


  • Flink作业是如何提交到集群的?
  • Flink集群是如何在资源管理集群(yarn、k8s)上启动起来的?
  • Flink到计算资源是如何分配给作业的?
  • Flink作业提交之后是如何运行的?

在本文中,主要介绍flink应用程序如何一步步变成Flink的Graph结构,然后交给Flink集群执行。


Flink部署模式介绍




在了解flink作业的提交之前,我们需要先了解一下flink作业的几种部署模式。


1、Session模式的问题
这种模式会预先在yarn或者或者k8s上启动一个flink集群,然后将任务提交到这个集群上,这种模式,集群中的任务使用相同的资源,

如果某一个任务出现了问题导致整个集群挂掉,那就得重启集群中的所有任务,这样就会给集群造成很大的负面影响。

2、Perjob模式的问题
目前,对于per job模式,jar包的解析、生成JobGraph是在客户端上执行的,然后将生成的jobgraph提交到集群。

很多公司都会有自己的实时计算平台,用户可以使用这些平台提交flink任务,如果任务特别多的话,那么这些生成JobGraph、提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。

此外这种模式提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应 的临时目录,这个也会带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

3、Application模式引入
flink 引入了一个新的部署模式--Application模式。目前 Application 模式支持 Yarn 和 K8s 的部署方式,

Yarn Application 模式会在客户端将运行任务需要的依赖都上传到 Flink Master,然后在 Master 端进行任务的提交。

此外,还支持远程的用户jar包来提交任务,比如可以将jar放到hdfs上,进一步减少上传jar所需的时间,从而减少部署作业的时间。


提交流程概览


1、Flink客户端提交流程

我们的Flink作业在开发完毕后,是需要提交到Flink集群执行。那么CliFrontend的main()方法便是提交作业到Flink集群的入口。其main()方法中主要的逻辑是触发我们打包好的jar中的main方法,然后交给PipelineExecutor#execute方法,最终会实例化一个具体的PipelineExecutor进行执行。但是在Application模式,跟Perjob以及Session模式有所区别,是通过ApplicationDeployer来进行任务的部署。

其过程如下图所示:



2、FlinkStreamSql客户端提交流程

FlinkX以及FlinkStreamSql的提交代码在项目的Launch模块下,下图是它们客户端的提交流程:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0751039e37cdd53b402239b40839e98c..jpg


PipelineExecutor是什么?


PipelineExecutor是Flink Client在生成jobgraph之后,将作业提交给集群的重要环节。就目前Session和Perjob两种模式而言,由于集群的启动时机以及提交作业的方式不同,所以会有两种不同的PipelineExecutor的实现
(AbstractSessionClusterExecutor 以及 AbstractJobClusterExecutor)。


其实现关系如下:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/472c395b7d0c4bd0e331e87e8b651b2f..jpg



Yarn Session提交流程


在这里我们以Yarn Session的提交进行举例。
Yarn Session任务的提交分为三部分:启动集群、作业提交以及作业调度。
下图是Yarn Session的提交示意图:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e7cea5207e4d9f9bd73693e6c88c7799..jpg


1、启动集群

1)使./bin/start-cluster.sh启动flink集群


2)Yarn启动Flink集群,其集群的启动入口是YarnSessionClusterEntryPoint。其启动集群的的步骤可以分为两部分。


第一部分是客户端会将应用配置(即flink-conf.yaml、logback.xml以及log4j.properties)和相关文件(flink jar、配置类文件、用户jar文件以及JobGraph对象等等)上传到分布式存储的应用暂存目录(.flink)。


第二部分是通过YarnClient向Yarn提交Flink创建集群的申请,Yarn分配容器资源,并且在申请的container中初始化并启动Flink JobManager进程,然后会在JobManager进程中运行YarnJobClusterEntryPoint作为集群的启动入口。初始化Dispatcher、ResourceManager以及RestEndPoint服务,并启动相关的RPC服务。


2、作业提交

集群准备好后,FlinkClient会向Dispathcher提交任务的JobGraph。Dispatcher获取到JobGraph后会为该JobGraph创建一个JobMaster,随后由JobMaster来负责作业调度、管理作业和Task的生命周期,并构建ExecutionGraph(JobGraph的并行化版本)。


3、作业调度
其中作业调度的示意图如下图所示:
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/8361475529ecbf46cf02076535f0152d..jpg

1)JobMaster建立与YarnResourceManager的连接,随后由ScheduleNG从SlotPool中获取Slot来调度Task的执行。由于Flink集群刚启动,SlotPool没有足够的Slot资源,且TaskManager也还未启动,所以此时会向YarnResourceManager申请资源。

2)YarnResourceManager中收到请求之后会调用SlotManager来查看是否有空闲的Slot,如果有会将其分配给JobMaster,然后通过ScheduleNG去调度job。否则YarnResourceManager向ClusterManager请求去协调容器资源并启动TaskManager。

3)YarnResourceManager此时会将请求加入到队列中,并周期性的通过RMClient去申请容器资源并启动TaskManager。

4)当容器资源申请到后,YarnResourceManager会从HDFS上加载启动TaskManager所需要的jar等相关文件,随后在容器中启动TaskManager。

5)TaskManager启动之后,会向YarnResourceManager注册,并把Slot资源情况上报给YarnResourceManager中的SlotManager。

6)YarnResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用并告知TaskManager将Slot分配给了哪个JobMaster。

7)TaskManager向JobMaster提供Slot,此时JobMaster会将Slot添加到SlotPool中。随后在ScheduleNG调度job的时候取出来并将Task调度到TaskManager对应的Slot上。


源码提交类图


在这里把提交的类图贴出来,大家有时间可以根据这个类图去追踪源码。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/b19b5eca540e32eaac4499f2b7561f4c..jpg



Flink远程debug方法


1、参数配置

# jobmanager debug端口env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"# taskmanager debug端口env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"


2、参数说明

  • transport:指定调试数据的传送方式。dt_socket是指用SOCKET模式,另有dt_shmem指用共享内存方式,其中,dt_shmem只适用于windows平台。

  • server:指是否支持在server模式的VM中。

  • suspend: 指定是否在调试客户端建立起来后,再执行JVM。

  • address: socket连接的端口号。


3、注意点

  • 预先设置断点

  • 保持远端代码跟所要调试的代码一致

  • 线程并行调试。不要影响其他线程运行,可在断点出勾选Thread。


疑问解答 ●


Q1


如何通过debug session模式或debug perjob模式获取集群am的ip。

将debug参数
`-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006`中的suspend设置成y,然后在资源管理平台(yarn、k8s等等)的ui界面根据flinkx任务的applicationID查看该任务的集群am所在计算节点的地址即可。
其中集群的applicationId,我们可以在flinkx的提交信息中看到。


Q2


debug参数可以考虑反过来设置吗?我们设置成server。

debug参数

`-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006`
中的server,这个默认就是server模式。


Q3


提交完任务 对任务有做监控么 如果任务挂掉你是怎么知道的呢 需要主动去看么?

flinkx本身在提交完任务后是没有对任务的运行状态做任务监控,需要主动查看。
我们可以通过以下3个地方去查看:
- 资源管理平台(yarn、k8s等等)上该任务的运行状态
- flinkx任务的运行日志。
- 通过访问任务的rest接口去查看任务的状态。


Q4

flink1.12.0升级版本大概在什么时候呢?

大概在6月底的时候。


Q5

有没有计划出一个web呢?

预计下半年开始投入研发。




  • 袋鼠云在大数据领域深耕7年,拥有丰富的大数据平台建设经验和成熟的产品体系,想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

    同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群