Flink 区别与传统数据处理框架的特性如下:
除了上述这些特性之外,Flink 还是一个非常易于开发的框架,因为它拥有易于使用的分层 API,整体 API 分层如图:
大多数应用并不需要上述的底层抽象,而是直接针对核心 API(Core APIs) 进行编程,比如 DataStream API(用于处理有界或无界流数据)以及 DataSet API(用于处理有界
数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。
总共有四代:
1. spark:spark生态中是把所有的计算都当做批处理,
spark streaming中流处理本质上也是批处理(micro batch);
2. flink:flink中是把批处理(有界数据集的处理)看成是一个特殊的流处理场景;
flink中所有计算都是流式计算;
客户端(Client)、作业管理器(JobManager)和任务管理器(TaskManager)。
我们的代码,实际上是由客户端获取并做转换,之后提交给JobManger 的。所以 JobManager 就是 Flink 集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的 TaskManager。这里的 TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的
Flink程序由JobClient进行提交
JobClient将作业提交给JobManager
JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager
TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成。
作业执行完成后,结果将发送回客户端(JobClient)
link支持的runtime(core 分布式流计算)支持的是无界数据流,
但是对flink来说可以支持批处理,
只是从数据流上来说把有界数据流只是无界数据流的一个特例,
无界数据流只要添加上边界就是有界数据流。
Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行
Flink 安装部署的学习时,需要准备 3 台 Linux 机器。具体要求如下:
#1. 安装jdk
rpm -ivh jdk-8u171-linux-x64.rpm
#2.搜索默认安装位置
find / -name "java"
#3.配置环境变量
vi /etc/profile
#4.在文末加上配置
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64/
export PATH=$PATH:$JAVA_HOME/bin
#5.加载配置生效
source /etc/profile
#6.测试环境变量
java -version
#1.解压
tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz
#2.改名
mv flink-1.7.2 flink
#3.赋予权限
chown -R root:root flink
bin/start-scala-shell.sh local
benv.readTextFile("/home/user/apps/test/words.txt").flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1).print()
单节点的flink集群
systemctl stop firewalld
bin/start-cluster.sh
[root@node1 flink]# jps
#四行代码要写在一排,其中要提前准备words.txt,WordCount.jar 自带的,out不用准备
/home/user/apps/flink/bin/flink run
/home/user/apps/flink/examples/batch/WordCount.jar
--input /home/user/apps/test/words.txt
--output /home/user/apps/test/out
rm -rf /tmp/.yarn-properties-root
bin/stop-cluster.sh
1. 192.168.43.129(master+Slave)
1. 192.168.43.130(Slave)
1. 192.168.43.131(Slave)
[root@node1 conf]# vim flink-conf.yaml
jobmanager.rpc.address: 192.168.43.129
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024
taskmanager.heap.size: 1024
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8081
taskmanager.tmp.dirs: /home/user/apps/flink/tmp
#页面提交
web.submit.enable: true
192.168.43.129:8081
#如果自己的ip没有命名,可以这样,如下:
192.168.43.129
192.168.43.130
192.168.43.131
scp -r /home/user/apps/flink 192.168.43.130:/home/user/apps/flink
scp -r /home/user/apps/flink 192.168.43.131:/home/user/apps/flink
scp -r /etc/profile 192.168.43.130:/etc/profile
scp -r /etc/profile 192.168.43.131:/etc/profile
bin/start-cluster.sh 停止 bin/stop-cluster.sh
bin/jobmanager.sh start/stop
bin/taskmanager.sh start/stop
注意:使用的数据文件是hdfs上,不能是本地文件路径,因为会找不到文件。
hdfs集群部署的方法,参考我的博客:http://t.csdn.cn/ZLLD9
访问:http://192.168.43.129:50070/explorer.html
创建目录:word
#1. 查看hdfs文件系统目录文件
hdfs dfs -ls /wordcount
#2.上传:hdfs dfs -put 本地文件目录 HDFS文件目录
hdfs dfs -put /usr/apps/word/words.txt /wordcount
#3.删除文件hdfs dfs -rm -r HDFS文件路径
hdfs dfs -rm -r /wordcount
/home/user/apps/flink/bin/flink run
/home/user/apps/flink/examples/batch/WordCount.jar
--input hdfs://node1:8020/wordcount/words.txt
--output hdfs://node1:8020/wordcount/output/result.txt --parallelism 2
#hadoop_conf_dir
export hadoop_conf_dir=/usr/apps/hadoop/etc/hadoop
bin/stop-cluster.sh
#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
#使用zookeeper搭建高可用
high-availability: zookeeper
# 存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://node1:8020/flink/ha/
# 配置ZK集群地址
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
# 默认是 open,如果 zookeeper security 启用了更改成 creator
high-availability.zookeeper.client.acl: open
# 设置savepoints 的默认目标目录(可选)
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# 用于启用/禁用增量 checkpoints 的标志
# state.backend.incremental: false
[root@node1 conf]# vim masters
node1:8081
node2:8081
scp -r /home/user/apps/flink/conf/masters node2:/home/user/apps/flink/conf/
scp -r /home/user/apps/flink/conf/masters node3:/home/user/apps/flink/conf/
scp -r /home/user/apps/flink/conf/flink-conf.yaml node2:/home/user/apps/flink/conf/
scp -r /home/user/apps/flink/conf/flink-conf.yaml node3:/home/user/apps/flink/conf/
jobmanager.rpc.address: node2
#1.启动hadoop
/usr/apps/hadoop/sbin/start-dfs.sh
#2.启动Zookeeper
/usr/apps/zookeeper/bin/zkServer.sh start
#查看zookeeper是否启动:/usr/apps/zookeeper/bin/zkServer.sh status
#3.启动Flink
bin/start-cluster.sh
#查看flink是否启动 jps
#1.在windows系统中打开 C:\Windows\System32\drivers\etc\hosts
# 文件并添加主机配置。我添加的是自己的虚拟主机ip跟主机名。
192.168.43.129 node1
192.168.43.130 node2
192.168.43.131 node3
[root@node1 flink]# jps
86913 QuorumPeerMain
91139 Jps
87177 NameNode
88442 StandaloneSessionClusterEntrypoint
87326 DataNode
88958 TaskManagerRunner
[root@node1 flink]# kill -9 88442
独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。在强大的 YARN 平台上 Flink 是如何集成部署的。整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。
yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处。
(hadoop集群部署:http://t.csdn.cn/ZLLD9)
<!-- 设置不检查虚拟内存的值,不然内存不够会报错 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
在yarn上启动一个Flink主要有两种方式:
#1.启动hadoop
/usr/apps/hadoop/sbin/start-all.sh
#2.启动Zookeeper(因为flink里面有zookeeper的配置,不打卡会报错:Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster)
/usr/apps/zookeeper/bin/zkServer.sh start
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
# -n 表示申请2个容器,这里指的就是多少个taskmanager(虽然写的是2但是真实申请的是3个)
# -s 表示每个TaskManager的slots数量
# -tm 表示每个TaskManager的内存大小
# -d 表示以后台程序方式运行,分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,
# 即使关掉当前对话窗口,YARN session 也可以后台运行。
#-qu(--queue):指定 YARN 队列名。
从图中可以看到我们创建的 Yarn-Session 实际上是一个 Yarn 的Application,并且有唯一的 Application ID。
Usage:
Required
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。
/home/user/apps/flink/bin/flink run /home/user/apps/flink/examples/batch/WordCount.jar
yarn application -kill appid
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-sae,--shutdownOnAttachedExit If the job is submitted in attached
mode, perform a best-effort cluster
shutdown when the CLI is terminated
abruptly, e.g., in response to a user
interrupt, such as typing Ctrl + C.
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/user/apps/flink/examples/batch/WordCount.jar
# -m yarn-cluster 表示使用Job分离模式
# -yjm 指定jobmanager内存
# -ytm 指定taskmanager内存
# -yn 指定taskmanager数量
# -ys 指定每个taskmanager的slot数量
提交任务之后会在yarn集群按照我们的配置初始化一个flink集群,运行我们提交的作业,作业执行完成之后就释放资源关闭掉flink集群,把资源还给yarn集群。
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 268435456
taskmanager.network.memory.max: 4294967296
容器化部署是如今业界流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对
应用进行管理和运维。容器管理工具中最为流行的就是 Kubernetes(k8s),而 Flink 也在最近
的版本中支持了 k8s 部署模式。基本原理与 YARN 是类似的,具体配置可以参见官网说明
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack