4月20日,袋鼠云数栈技术研发团队工程师兰洋(花名:莫问)为大家直播分享《Flink提交流程&如何debug和跟踪流程(on yarn)》。错过直播的朋友可以钉钉扫描文末的二维码,加入钉钉群回看直播,或者在b站搜索“袋鼠云”观看视频。
b站视频网址:
https://www.bilibili.com/video/BV13K4y1P7ri
下面带大家来回顾下本次直播的内容,本次直播莫问大佬主要从以下几个方面来为大家进行分享。
Flink on yarn 作业提交
在我们平常flink的开发中,通常是在ide中开发并使用flink的单机模式进行测试,测试完毕后会将代码打成jar文件然后通过Flink CLI或者Web UI提交作业到flink集群。但是在真正执行我们flink的作业之前,我们是否有考虑过以下几个问题:
Flink部署模式介绍
提交流程概览
1、Flink客户端提交流程
我们的Flink作业在开发完毕后,是需要提交到Flink集群执行。那么CliFrontend的main()方法便是提交作业到Flink集群的入口。其main()方法中主要的逻辑是触发我们打包好的jar中的main方法,然后交给PipelineExecutor#execute方法,最终会实例化一个具体的PipelineExecutor进行执行。但是在Application模式,跟Perjob以及Session模式有所区别,是通过ApplicationDeployer来进行任务的部署。
其过程如下图所示:
2、FlinkStreamSql客户端提交流程
FlinkX以及FlinkStreamSql的提交代码在项目的Launch模块下,下图是它们客户端的提交流程:
PipelineExecutor是什么?
其实现关系如下:
Yarn Session提交流程
1)使用./bin/start-cluster.sh启动flink集群。
2)Yarn启动Flink集群,其集群的启动入口是YarnSessionClusterEntryPoint。其启动集群的的步骤可以分为两部分。
第一部分是客户端会将应用配置(即flink-conf.yaml、logback.xml以及log4j.properties)和相关文件(flink jar、配置类文件、用户jar文件以及JobGraph对象等等)上传到分布式存储的应用暂存目录(.flink)。
第二部分是通过YarnClient向Yarn提交Flink创建集群的申请,Yarn分配容器资源,并且在申请的container中初始化并启动Flink JobManager进程,然后会在JobManager进程中运行YarnJobClusterEntryPoint作为集群的启动入口。初始化Dispatcher、ResourceManager以及RestEndPoint服务,并启动相关的RPC服务。
集群准备好后,FlinkClient会向Dispathcher提交任务的JobGraph。Dispatcher获取到JobGraph后会为该JobGraph创建一个JobMaster,随后由JobMaster来负责作业调度、管理作业和Task的生命周期,并构建ExecutionGraph(JobGraph的并行化版本)。
源码提交类图
Flink远程debug方法
1、参数配置
# jobmanager debug端口
env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
# taskmanager debug端口
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
2、参数说明
transport:指定调试数据的传送方式。dt_socket是指用SOCKET模式,另有dt_shmem指用共享内存方式,其中,dt_shmem只适用于windows平台。
server:指是否支持在server模式的VM中。
suspend: 指定是否在调试客户端建立起来后,再执行JVM。
address: socket连接的端口号。
3、注意点
预先设置断点
保持远端代码跟所要调试的代码一致
线程并行调试。不要影响其他线程运行,可在断点出勾选Thread。
●疑问解答 ●
Q1
如何通过debug session模式或debug perjob模式获取集群am的ip。
将debug参数
`-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006`中的suspend设置成y,然后在资源管理平台(yarn、k8s等等)的ui界面根据flinkx任务的applicationID查看该任务的集群am所在计算节点的地址即可。
其中集群的applicationId,我们可以在flinkx的提交信息中看到。
Q2
debug参数可以考虑反过来设置吗?我们设置成server。
debug参数
`-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006`
中的server,这个默认就是server模式。
Q3
提交完任务 对任务有做监控么 如果任务挂掉你是怎么知道的呢 需要主动去看么?
flinkx本身在提交完任务后是没有对任务的运行状态做任务监控,需要主动查看。
我们可以通过以下3个地方去查看:
- 资源管理平台(yarn、k8s等等)上该任务的运行状态
- flinkx任务的运行日志。
- 通过访问任务的rest接口去查看任务的状态。
Q4
flink1.12.0升级版本大概在什么时候呢?
大概在6月底的时候。
Q5
有没有计划出一个web呢?
预计下半年开始投入研发。
袋鼠云在大数据领域深耕7年,拥有丰富的大数据平台建设经验和成熟的产品体系,想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack