博客 数栈技术分享:一文带你了解Flink jm、tm启动过程和资源分配

数栈技术分享:一文带你了解Flink jm、tm启动过程和资源分配

   小美   发表于 2023-01-19 11:38  600  0

一、JM启动过程

1、从日志角度分析启动流程

1)client生成jobGraph

详情请参考:
https://www.bilibili.com/video/BV13K4y1P7ri

2)Yarn RM接收到请求(和yarn交互不重点分析)

3)在被分配的节点上的工作目录下启动launch_container.sh

4)在perJob模式下,最终调用的是YarnJobClusterEntrypoint

5)初始化相关运行环境,打印软件版本、运行环境、命令行参数、classpath 等信息

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/2318422b5ab44cfadb923ac3e2eaf88f..png
6)加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等)

7)启动Flink资源管理核心组件ResourceManager(包含 YarnResourceManager 和 SlotManager 两个子组件)

8)启动Dispatcher加载JobGraph 文件、并启动JobManager

9)JobManager开始执行ExecutionGraph,向 ResourceManager申请资源

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/ab1e9067719f449fdbc4bce213c4db7c..png
10)Flink ResourceManager 接收到新分配的 Container 资源后,准备好 TaskManager 启动上下文

11)TaskManager 进程加载并运行 YarnTaskExecutorRunner(Flink TaskManager入口类),初始化流程完成后启动 TaskExecutor(负责执行Task相关操作)

12)TaskExecutor向ResourceManager注册,向SlotManager汇报自己的 Slot 资源与状态

13)JobManager向TaskExecutor提交task,TaskExecutor启动新的线程运行Task

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/1afdc7347fbd8e242b95fe8f41bc1bf3..png
2、整体流程分析

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/0c420dc439011dde28a8b1c3ea973667..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/10bb85892d61f4f28f3d2f3cc851e9ab..png

1)输出各软件版本及运行环境信息、命令行参数项、classpath等信息
2)注册处理各种SIGNAL的handler:记录到日志
3)注册JVM关闭保障的shutdown hook:避免JVM退出时被其他shutdown hook阻塞
4)打印YARN运行环境信息:用户名
5)从运行目录中加载flink conf

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/955ab419655ca419060b444aed48e40f..png

3、AM启动过程

1)创建并启动各类内部服务(包括RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等)

2)将RPC address和port更新到flink conf配置

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/a3cc05166aa2bb500cbe5d3bd0c9f843..png

3)创建并启动resourceManager对象(Flink资源管理核心组件,包含YarnResourceManager和SlotManager两个子组件,YarnResourceManager负责外部资源管理,与YARN RM建立通信并保持心跳,申请或释放TaskManager资源,注销应用等;SlotManager则负责内部资源管理,维护全部Slot信息和状态)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/e6572726a177645fc5acf6fc216544ed..png

4)创建并启动dispatcher(负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的 JobManager)及相关服务(包括REST endpoint等)并加载JobGraph。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/f591140e7c433a54abe70aa350f1e86a..png

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/e6572726a177645fc5acf6fc216544ed..png

二、JM资源分配

JobManager开始执行ExecutionGraph,向ResourceManager申请资源。

ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN RM申请新的Container资源来启动TaskManager进程。

后续流程如果有空闲Slot资源,SlotManager将其分配给等待请求队列中匹配的请求,不用再通过YarnResourceManager申请新的TaskManager。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/bb1875863126d97c117951733c0d50bc..png

 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/636713b5fdeeb74c2faccae3217a91a5..png

Flink ResourceManager接收到新分配的Container资源后,准备好TaskManager启动上下文(ContainerLauncherContext,生成TaskManager配置并上传至分布式存储,配置其他依赖和环境变量等)。

然后向YARN NM申请启动TaskManager进程,YARN NM启动Container的流程与AM Container启动流程基本类似。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/e297dbcf0aa379d1039b4dcfbb86e8b7..png

三、TM启动过程

输出各软件版本及运行环境信息、命令行参数项、classpath等信息

注册处理各种SIGNAL的handler:记录到日志

注册JVM关闭保障的shutdown hook:避免JVM退出时被其他shutdown hook阻塞

加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry等)

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/3bd76ebf493e900db38da8928f195481..png

启动tm后就可以通过RPC接收远程调用,submitTask就是接收任务的服务。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/665c0ac6d1f22d04b79064b9a15155bd..png

回到在JM端启动scheduler后,就开始调度Execution,在Execution的deploy()方法中通过rpc调用TM的submitTask接口。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/de013bc25755ea6b38ccdd4d86b842ae..png

交互流程图如下:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/533e30f4d79b321221c654df4b72a633..png

当submitTask收到请求后加载jobInformation和taskInformation文件,初始化jobInformation和taskInformation,然后构造Task,启动Task线程,最终调用AbstractInvokable.invoke方法。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/39b567f86ea7c4ff6986d8e243f8879c..png

  • invokable.invoke( )将根据nameOfInvokableClass的不同调度不同的任务,包括批任务、Source任务、Sink任务、流任务
  • DataSourceTask:Kafka Source
  • StreamTask:中间算子
  • DataSinkTask:Kafka Sink

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/94eec206dc6cc9719e0cb4ebb2384e03..png

这里以StreamTask例分析

  • 初始化、run、close
  • 初始化:创建状态后端、operator配置、特殊task初始化、恢复算子的状态、richfunction open
  • run:执行task,处理record并发往下游
  • close:关闭和清理操作

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/1e28f559f8a7b850205520a72cc70b63..png

这里以flinkX中的代码为例:

会被invoke()中的initialize-operator-states()执行并调用到DtInputFormatSourceFunction的initializeState方法恢复状态。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/12b38f2689d84a39f67f092629fe850b..png

这里以flinkX中的代码为例:

会被invoke()中的open-operators()执行并调用到DtInputFormatSourceFunction的open方法恢复状态做一些初始化工作。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/ea2dcd81daf9cbe5616769ca6977ef30..png

这里以flinkX中的代码为例:

会被invoke()中的run()执行并调用到DtInputFormatSourceFunction的run读取数据并往下游发送。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user6/article/f0940c318d10659dac013417918c0a99..png
经过上面分析,任务已经启动,并等待数据流动。

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群