package com.bigdata;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _01WorkCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
// First, split each line on spaces into individual words
SingleOutputStreamOperator<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] arr = line.split(" ");
for (String word : arr) {
collector.collect(word);
}
}
});
// Turn each word into a (word, 1) tuple
SingleOutputStreamOperator<Tuple2<String, Integer>> map = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
// Group by word and aggregate
DataStream<Tuple2<String, Integer>> sumResult = map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
// The 1 passed to sum(...) below refers to the tuple field at index 1 (the count), which is summed per key
}).sum(1);
sumResult.print();
env.execute();
}
}
package com.bigdata;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _02WorkCount {
/**
*
* A simplified, chained version of the WordCount example
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink")
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] arr = line.split(" ");
for (String word : arr) {
collector.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
// The 1 passed to sum(...) below refers to the tuple field at index 1 (the count), which is summed per key
}).sum(1).print();
env.execute();
}
}
However, when using lambdas you have to declare each operator's result type explicitly with returns(...); otherwise the job fails, because Java's type erasure leaves Flink unable to infer the generic types from a lambda.
package com.bigdata;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _03WorkCount_lambda {
/**
* Lambda-expression version of the WordCount example
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// When simplifying with lambdas, the result type must be declared with returns(...); otherwise the job fails
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink")
.flatMap((String line, Collector<String> collector) -> {
String[] arr = line.split(" ");
for (String word : arr) {
collector.collect(word);
}
}).returns(Types.STRING).map((String word) -> Tuple2.of(word, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0).sum(1).print();
// The 1 passed to sum(...) above refers to the tuple field at index 1 (the count), which is summed per key
env.execute();
}
}
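As an aside, the type information supplied with Types.TUPLE(Types.STRING, Types.INT) above can also be expressed with a TypeHint, which some find easier to read for nested generics. A minimal sketch (it assumes the same map lambda as above plus an extra import of org.apache.flink.api.common.typeinfo.TypeHint):
// the anonymous TypeHint subclass captures the full generic type Tuple2<String, Integer>
.map((String word) -> Tuple2.of(word, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {})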
A quick review of lambda expressions:
· A lambda can be used to shorten the code of an anonymous inner class.
· A lambda can only replace an anonymous inner class of a functional interface (an interface with exactly one abstract method).
Simplification rules:
· Keep only the parameter list in parentheses, followed by -> and the method body in braces.
· Parameter types can be omitted; when there is exactly one parameter, the parentheses can be dropped as well.
· If the method body is a single statement, the braces, the semicolon, and the return keyword can all be omitted (but they must be omitted together).
Compare the version before simplification (the first version, _01WorkCount above) with the version after simplification (the third version, _03WorkCount_lambda above).
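To make these rules concrete, here is a minimal sketch (not from the original post) of the same flatMap operator written first as an anonymous inner class and then as a lambda; "stream" stands for the DataStream<String> built above.
// Version 1: anonymous inner class implementing the functional interface FlatMapFunction
stream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String line, Collector<String> collector) throws Exception {
        for (String word : line.split(" ")) {
            collector.collect(word);
        }
    }
});
// Version 3: lambda -- only the parameter list, "->" and the body remain;
// returns(...) restores the output type that is lost to type erasure
stream.flatMap((String line, Collector<String> collector) -> {
    for (String word : line.split(" ")) {
        collector.collect(word);
    }
}).returns(Types.STRING);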
package com.bigdata;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _04WorkCount_zidingyipass {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// AUTOMATIC: Flink decides between batch and streaming execution based on whether the sources are bounded
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// BATCH: process the (bounded) data in one go and emit only the final results
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// STREAMING: the default; printing the results in both modes is a good way to see the difference between stream and batch processing
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Set the job parallelism to 2
// env.setParallelism(2);
// Pass the input/output paths via the program arguments (args)
DataStreamSource<String> dataStream01 = null;
if (args.length == 0){
dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
}else {
String input = args[0];
dataStream01 = env.readTextFile(input);
}
// First, split each line on spaces into individual words
SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = dataStream01
.flatMap((String line, Collector<String> collector) -> {
String[] arr = line.split(" ");
for (String word : arr) {
collector.collect(word);
}
}).returns(Types.STRING).map((String word) -> Tuple2.of(word, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0
// The 1 below refers to the tuple field at index 1 (the count), which is summed per key
).sum(1);
if (args.length == 0){
sumResult.print();
}else {
String output = args[1];
sumResult.writeAsText(output, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
}
env.execute();
}
}
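As the execution-mode comments above suggest, the mode changes what print() shows for the same input: in STREAMING mode sum(1) emits a running count every time a record arrives for a key, while in BATCH mode only the final count per key is emitted. A minimal illustration (assumed output shape, not taken from the original post):
// STREAMING (default): running totals per key, e.g. for "flink", which occurs three times in the sample data:
//   (flink,1)  (flink,2)  (flink,3)
// BATCH: only the final aggregate per key, e.g.
//   (flink,3)  (spark,2)  (kafka,2)  (sqoop,1)  (hadoop,1)
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // switch the mode here and compare the printed output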
After packaging the project into a jar and submitting it (e.g. with Flink's flink run CLI, passing the input and output paths as program arguments), the job reads the input file and writes the word counts to the output path.
package com.bigdata;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _05WorkCount_zidingyipass_input {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// AUTOMATIC: Flink decides between batch and streaming execution based on whether the sources are bounded
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// BATCH: process the (bounded) data in one go and emit only the final results
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// STREAMING: the default; printing the results in both modes is a good way to see the difference between stream and batch processing
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// Set the job parallelism to 2
// env.setParallelism(2);
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String input = "";
String output = "";
if (parameterTool.has("output") && parameterTool.has("input")) {
input = parameterTool.get("input");
output = parameterTool.get("output");
} else {
output = "hdfs://bigdata01:9820/home/wordcount/output";
}
// Pass the input/output paths via the program arguments (args)
DataStreamSource<String> dataStream01 = null;
if (args.length == 0){
dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kafka hadoop flink");
}else {
dataStream01 = env.readTextFile(input);
}
// First, split each line on spaces into individual words
SingleOutputStreamOperator<Tuple2<String, Integer>> sumResult = dataStream01
.flatMap((String line, Collector<String> collector) -> {
String[] arr = line.split(" ");
for (String word : arr) {
collector.collect(word);
}
}).returns(Types.STRING).map((String word) -> Tuple2.of(word, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy((Tuple2<String, Integer> tuple2) -> tuple2.f0
// The 1 below refers to the tuple field at index 1 (the count), which is summed per key
).sum(1);
if (args.length == 0){
sumResult.print();
}else {
sumResult.writeAsText(output, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
}
env.execute();
}
}
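A small note on ParameterTool: it also supports per-key default values, so the if/else above can be written more compactly. A minimal sketch (the input path is a placeholder, not from the original post):
ParameterTool params = ParameterTool.fromArgs(args);
// get(key, defaultValue) falls back to the default when the corresponding --key flag is not passed
String input  = params.get("input",  "hdfs://bigdata01:9820/home/wordcount/input");
String output = params.get("output", "hdfs://bigdata01:9820/home/wordcount/output");
The program arguments are then passed in the form --input <path> --output <path>, which is the format ParameterTool.fromArgs expects.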