上图为Flink技术栈的核心组成部分,值得一提的是,Flink分别提供:
①面向流处理的接口(DataStream API)
②面向批处理的接口(DataSet API)
因此,Flink既可以完成流处理,也可以完成批处理。
Flink还支持的拓展库涉及:
①机器学习(FlinkML)
②复杂事件处理(CEP)
③图计算(Gelly)
④分别针对流处理和批处理的Table API
一一一一一
DataStream API可以流程地分析无限数据流,并且可以用Java或者Scala等来实现。开发者需要基于一个叫DataStream的数据结构来开发,这个数据结构用于表示永不停止的分布式数据流。
Flink的分布式特点体现在他能够在成百上千台机器上运行,它将大型的计算任务分成许多小的部分,每个机器执行一部分。
Flink能够自动地确保发生机器故障或者其他错误时计算能够持续进行,或者在修复bug或进行版本升级后有计划地再次执行一次。这种能力使得开发人员不需要担心运行失败。
Flink本质上使用容错性数据流,这使得开发人员可以分析持续生成切用不结束的数据(即流处理);
分析图片:
Source被分为Source1 和 Source2,它们是Source的Operator Subtask
【也就是说Source1和Source2都是Source的 Operator Subtask子任务】。
每一个Operator Subtask都是在不同的线程当中独立执行的;
一个Operator的并行度,就等于Operator Subtask的个数;
下图Source的并行度为2,而一个Stream的并行度就等于它生成的Operator的并行度;
数据在两个 operator 之间传递的时候有两种模式:
①One to One 模式:两个operator用此模式传递的时候,会保持数据的分区数和数据的排序;
如下图中的Source1到IMap1,它就保留的Source的分区特性,以及分区元素处理的有序性。
②Redistributing重新分配模式:这种模式会改变数据的分区数;
每一个Operator subtask【如下图的Source1 和 Source2】会根据选择Transformation把数据
发送到不同目标subtasks, 比如keyBy()会通过hashcode重新分区,broadcase()和rebalance()方法会随机重新分区;
当Flink执行executor会自动根据程序代码生成DAG数据流图;
ActorSystem创建Actor将数据流图发送给JobManager的Actor;
Jobmanager会不断接受TaskManager的心跳消息,从而可以获取到有效的TaskManager;
JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应就是一个线程);
在程序运行过程中,task和task之间是可以进行数据传输的;
Job Client【就是上图的Flink程序】:
主要职责是提交任务,提交后可以结束进程,也可以等待结果返回;
Job Client 不是Flink程序执行的内部部分,但它是任务执行的起点;
Job Client负责接收用户的程序代码,然后创建数据流,将数据流提交给Job Manager以便进一步执行。执行完成后,Job Client将结果返回给用户;
JobManager:
主要职责是调度工作并协调任务做检查点;
集群中至少要有一个master,master负责调度task,协调checkpoints和容错;
高可用设置的话可以有多个master,但是要保证一个是leader,其他是standby;
JobManager包含Actor System、Scheduler调度器、CheckPoint协调器 三个重要的组件;
JobManager从客户端【上图的Flink程序】接收到任务后,首先生成优化过的执行计划,再调度到TaskManager中执行;
TaskManager
主要职责是从JobManager处接收任务,并部署和启动任务,接收上游的数据并处理;
TaskManager是在JVM中的一个或多个线程中执行任务的工作节点;
TaskManager在创建之初就设置好了Slot【槽】,每个Slot可以执行一个任务;
2.4.1任务槽和槽共享
每个TaskManager是一个JVM的进程,可以在不同的线程中执行一个或多个子任务。为了控制一个worker能接收多少个task。worker通过task slot【任务槽】来进行控制(一个worker至少有一个task slot)
任务槽:
每个task slot表示TaskManager拥有资源的一个固定大小的子集;
flink将进程的内存进行了划分到多个slot中;
上图中有两个taskManager,每个TaskManager有三个slot,每个slot占1/3的内存;
内存被划分到不同的slot之后可以获得如下好处:
①TaskManager最多能同时并发执行的任务是可以控制的,那就是3个,因为不能超过slot的数量;
②slot有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响;
槽共享:
只需计算Job中最高并行度(parallelism)的task slot,只要这个满足,其他的job也都能满足;
资源分配更加公平,如果有比较空闲的slot可以将更多的任务分配给它。图中若没有任务槽共享,负载不高的Source/Map 等subtask将会占据许多资源,而负载较高的窗口subtask则会缺乏资源;
有了任务槽任务,可以将基本并行度(base parallelism)从2提升到6,提高了分槽资源的利用率。同时还可以保障TaskManager给subtask的分配的slot方案更加公平;
org.apache.flink
flink-clients
1.17.0
org.apache.flink
flink-streaming-java
1.17.0
查找到数据源:
思路:流处理,所以是一行一行的读取,然后按照空格切分,然后再分组统计;
DataSet API 批处理 (过时了)
package com.flink17.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* DataSet API 实现wordcount
*
* @author lc
* @version 1.0
* @date 2024/10/8 0008 16:27
*/
public class WordCountStreamDemoMain{
public static void main(String[] args) throws Exception {
// 1.创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2.读取数据(这里是文本数据)
DataSource lineDS = env.readTextFile("input/words.txt");
// 3.切分、转换(word,1)的格式(第一个参数 word表示的是单词,第二个参数 1是指出现的次数)
// flatMap的方法中的FlatMapFunction 的第一个参数是输入,第二个参数是输出
FlatMapOperator> wordAndOne
= lineDS.flatMap(new FlatMapFunction>() {
// 重写 flatMap ,第一个参数是需要操作的数据,第二个参数Collector是采集器
@Override
public void flatMap(String s, Collector> collector) throws Exception {
//3.1 按照空格切割
String[] words = s.split(" ");
//3.2 将单词转换成二元组(word,1)这样的格式
for (String word : words) {
Tuple2 wordTuple2 = Tuple2.of(word, 1);
// 3.3 使用Collector向下游发送数据
collector.collect(wordTuple2);
}
}
});
// 4.按照word分组
UnsortedGrouping> wordAndOneGroupBy = wordAndOne.groupBy(0);
// 5.各分组内聚合
AggregateOperator> sum =
wordAndOneGroupBy.sum(1); // 这里的1 是位置,表示第二个元素
// 6.输出
sum.print();
}
}
DataSet写法实现批处理是过时的,推荐使用DataStream来写;
package com.flink17.demo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* DataStream流处理 实现wordcount:(读文件,有界流)
*
* @author lc
* @version 1.0
* @date 2024/10/8 0008 16:53
*/
public class WordCountStreamDemoMain {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.读取数据
DataStreamSource lineDs = evn.readTextFile("input/words.txt");
// 3.处理数据:切分、转换、分组、聚合
SingleOutputStreamOperator> wordAndOne = lineDs.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" "); // 空格切分
for (String word : words) {
// 转换成二元组
Tuple2 wordsAndOne = Tuple2.of(word, 1);
// 使用Collector向下游发送数据
out.collect(wordsAndOne);
}
}
});
// 分组
KeyedStream, String> ks = wordAndOne.keyBy(
// 第一个参数书数据的类型,第二个参数是key的类型
new KeySelector, String>() {
@Override
public String getKey(Tuple2 value) throws Exception {
return value.f0;
}
}
);
SingleOutputStreamOperator> sum = ks.sum(1);// 这里的1 是位置,表示第二个元素
// 4.输出数据
sum.print();
// 5.执行
evn.execute();
}
}
执行结果
流处理:拿一条处理一条;所以是一行一行的读取;且有状态,比如hello最后为(hello,3)3是有状态的计算;
前面的编号就是并行度编号,也就是线程数编号;
批处理:一口气处理一批(这里就是整个文本)
实现wordcount 案例
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续地处理捕获的数据;
模拟场景:监听socket端口,然后向该端口不断的发送数据
将StreamWorldCount代码中读取文件数据的readTextFile方法替换成socket文本流的方法socketTextStram;
package com.flink17.demo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author lc
* @version 1.0
* @date 2024/10/8 0008 17:17
*/
public class WordCountStreamUnboundDemoMain {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.读取数据:socket,第一个参数书服务器名称,第二个参数是端口号
DataStreamSource socketDS = env.socketTextStream("hadoop102", 7777);
//3.处理数据:切分,转换,分组,聚合,输出
socketDS
.flatMap(
(String value, Collector> out) -> {
String[] words = value.split(" "); // 空格切分
for (String word : words) {
// 转换成二元组
Tuple2 wordsAndOne = Tuple2.of(word, 1);
// 使用Collector向下游发送数据
out.collect(wordsAndOne);
}
}
)
.returns(Types.TUPLE(Types.STRING,Types.INT)) //需要执行返回类型,(word,1)
.keyBy(value -> value.f0)
.sum(1)
.print(); // 输出
//4.执行
env.execute();
}
}
不指定返回类型的化,执行会报错;因为Flink具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应序列化器和反序列化器,但是,由于java中泛型查出的存在所以报错了;
执行,就会一直监听那个服务器,有输入才会有响应的输出;
有界流是有结束的,无界留是没有结束的
————————————————
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack