流处理:对数据进行实时处理的方式,数据会以流的形式不断地产生和处理。
流处理可以快速响应数据的变化,及时地进行数据处理和分析,适用于需要实时处理数据的场景。
例如:实时数仓、实时监控、实时推荐等等。
批处理:对数据进行离线处理的方式,数据会按照一定的时间间隔或者数据量进行批量处理。
批处理可以对大量数据进行高效处理和分析,适用于需要对历史数据进行分析和挖掘的场景。
例如:离线数仓、批量报表、离线推荐等等。
流处理和批处理都是常用的数据处理方式,它们各有优劣。流处理通常用于需要实时响应的场景,如在线监控和警报系统等。而批处理则通常用于离线数据分析和挖掘等大规模数据处理场景。选择合适的处理方式取决于具体的业务需求和数据处理场景。
以前很多系统的架构都是采用的 Lambda 架构,它将所有的数据分成了三个层次:批处理层、服务层和速率层,每个层次都有自己的功能和目的。
这种架构,需要一套流处理平台和一套批处理平台,这就可能导致了一些问题:
因此,流批一体诞生了!
流批一体的技术理念最早是 2015 年提出的,初衷就是让开发能用同一套代码和 API 实现流计算和批计算,但是那时候实际落地的就少之又少,阿里巴巴在 2020 年双十一首次实际落地。
Flink 流批一体架构:
接下来分别介绍这几个主流的流处理框架
★基于 Kafka 的一个轻量级流式计算框架,我们可以使用它从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。
”
工作原理:读取数据流 -> 数据转换/时间窗口处理/状态管理 -> 任务调度 -> 输出结果
简单示例:统计 20 秒内每个 input 的 key 输入的次数,典型的例子:统计网站 20 秒内用户的点击次数。
public class WindowCountApplication {
private static final String STREAM_INPUT_TOPIC = "streams-window-input";
private static final String STREAM_OUTPUT_TOPIC = "streams-window-output";
public static void main(String[] args) {
Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, WindowCountApplication.class.getSimpleName());
props.put(BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVERS);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
builder.stream(STREAM_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
.peek((key, value) -> Console.log("[input] key={}, value={}", key, value))
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(20)))
.count()
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value))
.peek((key, value) -> Console.log("[output] key={}, value={}", key, value))
.to(STREAM_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams kStreams = new KafkaStreams(builder.build(), props);
Runtime.getRuntime().addShutdownHook(new Thread(kStreams::close));
kStreams.start();
}
}
运行结果:{key}={value}
,发送了 3 次 A=1,2 次 B=1,以及 1 次 C=1,统计结果在预期之内,即 A 出现 3 次,B 出现 2 次,C 出现 1 次。
★和 Kafka Stream 类似,也是轻量级的流处理框架,不过它是基于 Pulsar 实现的一个流处理框架,同样的,也是从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。感兴趣的可以参考我之前写的文章:Pulsar Function 简介以及使用
”
工作原理:订阅消息流 -> 处理消息 -> 发布处理结果
简单示例:LocalRunner 模式,按照逗号“,”去切分 input topic 的消息,然后转换成数字进行求和,结果发送至 output topic。
public class IntSumFunction implements Function<String, Integer> {
public static final String BROKER_SERVICE_URL = "pulsar://localhost:6650";
public static final String INPUT_TOPIC = "persistent://public/default/int-sum-input";
public static final String OUTPUT_TOPIC = "persistent://public/default/int-sum-output";
public static final String LOG_TOPIC = "persistent://public/default/int-sum-log";
@Override
public Integer process(String input, Context context) {
Console.log("input: {}", input);
return Arrays.stream(input.split(","))
.map(Integer::parseInt)
.mapToInt(Integer::intValue)
.sum();
}
public static void main(String[] args) throws Exception {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName(IntSumFunction.class.getSimpleName());
functionConfig.setClassName(IntSumFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
functionConfig.setOutput(OUTPUT_TOPIC);
functionConfig.setLogTopic(LOG_TOPIC);
LocalRunner localRunner = LocalRunner.builder()
.brokerServiceUrl(BROKER_SERVICE_URL)
.functionConfig(functionConfig)
.build();
localRunner.start(true);
}
}
运行结果:1+2+3+4+5+6=21
★”
一种流处理框架,具有低延迟、高吞吐量和高可靠性的特性。 支持流处理和批处理,并支持基于事件时间和处理时间的窗口操作、状态管理、容错机制等。 提供了丰富的算子库和 API,支持复杂的数据流处理操作。
工作原理:接收数据流 -> 数据转换 -> 数据处理 -> 状态管理 -> 容错处理 -> 输出结果
简单来说就是将数据流分成多个分区,在多个任务中并行处理,同时维护状态信息,实现高吞吐量、低延迟的流处理。
简单示例:从 9966 端口读取数据,将输入的句子用空格分割成多个单词,每隔 5 秒做一次单词统计。
public class WindowSocketWordCount {
private static final String REGEX = " ";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStreamSource = env.socketTextStream("localhost", 9966);
SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = socketTextStreamSource
.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (sentence, collector) -> {
for (String word : sentence.split(REGEX)) {
collector.collect(new Tuple2<>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
streamOperator.print();
env.execute();
}
}
运行结果:
★”
一个开源的流处理引擎,旨在实现快速、可靠的数据流处理。 是业界最早出现的一个流处理框架(2011 年),但是现在已经有许多其它优秀的流处理框架了,所以它在现在并不是唯一选择。
工作原理:将数据流分成多个小的流(也称为 tuple),并将这些小流通过一系列的操作(也称为 bolt)进行处理。
简单示例:在本地模式,使用 Storm 内置的RandomSentenceSpout
充当数据源进行测试,用空格拆分生成的句子为多个单词,统计每个单词出现次数。
public class WindowedWordCountApplication {
public static void main(String[] args) throws Exception {
StreamBuilder builder = new StreamBuilder();
builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
.window(TumblingWindows.of(Duration.seconds(2)))
.flatMap(sentence -> Arrays.asList(sentence.split(" ")))
.peek(sentence -> Console.log("Random sentence: {}", sentence))
.mapToPair(word -> Pair.of(word, 1))
.countByKey()
.peek(pair -> Console.log("Count word: ", pair.toString()));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("windowedWordCount", new Config(), builder.build());
Utils.sleep(20000);
cluster.shutdown();
}
}
内置的RandomSentenceSpout
随机生成数据关键源代码:
@Override
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{
sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
};
final String sentence = sentences[rand.nextInt(sentences.length)];
LOG.debug("Emitting tuple: {}", sentence);
collector.emit(new Values(sentence));
}
运行结果:随机找一个单词“nature”,统计的次数为 10 次。
基于 Spark API 的扩展,支持对实时数据流进行可扩展、高吞吐量、容错的流处理。
工作原理:接收实时输入数据流并将数据分成批次,然后由 Spark 引擎处理以批次生成最终结果流。
简单示例:从 kafka 的 spark-streaming topic 读取数据,按照空格“ ”拆分,统计每一个单词出现的次数并打印。
public class JavaDirectKafkaWordCount {
private static final String KAFKA_BROKERS = "localhost:9092";
private static final String KAFKA_GROUP_ID = "spark-consumer-group";
private static final String KAFKA_TOPICS = "spark-streaming";
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
Configurator.setRootLevel(Level.WARN);
SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("spark-streaming-word-count");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(KAFKA_TOPICS.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
JavaDStream<String> linesStream = messages.map(ConsumerRecord::value);
JavaPairDStream<String, Integer> wordCountStream = linesStream
.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey(Integer::sum);
wordCountStream.print();
streamingContext.start();
streamingContext.awaitTermination();
}
}
运行结果:
简单数据流处理
如果只是轻量级使用的话,可以结合技术栈使用消息中间件自带的流处理框架就更节省成本。
使用的 Kafka 就用 Kafka Stream。
使用的 Pulsar 就用 Pulsar Function。
复杂数据流场景
Flink | Spark Streaming | Storm | |
---|---|---|---|
容错性 | 基于 CheckPoint 机制 | WAL 及 RDD 机制 | Records ACK |
延迟性 | 亚秒级 | 秒级 | 亚秒级 |
吞吐量 | 非常高 | 高 | 中等 |
一致性 | Excatly-Once | Excatly-Once | Excatly-Once |
状态支持 | √ | √ | × |
流批一体 | √ | √ | × |
窗口支持 | √ | √ | √ |
机器学习 | √ | √ | × |
SQL 查询 | √ | √ | × |
图计算 | √ | √ | × |
社区活跃度 | 高 | 高 | 中等 |
综上,可以结合数据规模、技术栈、处理延迟功能特性、未来的考虑、社区活跃度、成本和可用性等等进行选择。
本文系转载,版权归原作者所有,
转载自公众号 xxxx ,如若侵权请联系我们进行删除!
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack