博客 流处理和批处理讲解、主流框架对比、流批一体架构

流处理和批处理讲解、主流框架对比、流批一体架构

   数栈君   发表于 2024-04-28 14:05  56  0

什么是流处理和批处理?

流处理:对数据进行实时处理的方式,数据会以流的形式不断地产生和处理。
流处理可以快速响应数据的变化,及时地进行数据处理和分析,适用于需要实时处理数据的场景。
例如:实时数仓、实时监控、实时推荐等等。

  • 优点
  1. 实时性:数据在产生的时候就立即被处理,能及时反馈结果。
  2. 高效性:不间断接受新数据并进行处理,因此可以更加高效利用硬件资源。
缺点
  1. 数据突发性:因为流式数据具有不可预测性,可能会突然出现突发的高峰,会导致系统压力急剧增加。
  2. 处理复杂度高:实时处理可能需要更高的处理能力和更复杂的算法。

批处理:对数据进行离线处理的方式,数据会按照一定的时间间隔或者数据量进行批量处理。
批处理可以对大量数据进行高效处理和分析,适用于需要对历史数据进行分析和挖掘的场景。
例如:离线数仓、批量报表、离线推荐等等。

  • 优点
  1. 处理复杂度低:通常不需要考虑数据的顺序、时间窗口等因素。
  2. 容错性高:数据多批次集中处理,通常一条数据的失败不会影响后续数据的处理,也可以采用多种容错机制来确保任务正确完成。
缺点
  1. 响应速度慢:由于批处理是周期性执行,不能及时响应数据变化。
  2. 处理结果滞后:由于批处理是周期性执行,在某些场景下可能会出现数据结果滞后的情况。

流处理和批处理都是常用的数据处理方式,它们各有优劣。流处理通常用于需要实时响应的场景,如在线监控和警报系统等。而批处理则通常用于离线数据分析和挖掘等大规模数据处理场景。选择合适的处理方式取决于具体的业务需求和数据处理场景。

什么是流批一体架构?

以前很多系统的架构都是采用的 Lambda 架构,它将所有的数据分成了三个层次:批处理层、服务层和速率层,每个层次都有自己的功能和目的。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3452a5ba5b54df727cc48a42b40f55af..jpg
  • 批处理层:负责离线计算和历史数据的存储。
  • 服务层:负责在线查询和实时数据的处理。
  • 速率层:负责对实时数据进行快速的处理和查询。

这种架构,需要一套流处理平台和一套批处理平台,这就可能导致了一些问题:

  1. 资源浪费:一般来说,白天是流计算的高峰期,此时需要更多的计算资源,相对来说,批计算就没有严格的限制,可以选择凌晨或者白天任意时刻,但是,流计算和批计算的资源无法进行混合调度,无法对资源进行错峰使用,这就会导致资源的浪费。
  2. 成本高:流计算和批计算使用的是不同的技术,意味着需要维护两套代码,不论是学习成本还是维护成本都会更高。
  3. 数据一致性:两套平台都是不一样的,可能会导致数据不一致的问题。

因此,流批一体诞生了!

流批一体的技术理念最早是 2015 年提出的,初衷就是让开发能用同一套代码和 API 实现流计算和批计算,但是那时候实际落地的就少之又少,阿里巴巴在 2020 年双十一首次实际落地。

Flink 流批一体架构

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/3a23046c0038e976465fa6cfc70cf1d6..jpg

有哪些流处理框架?

  1. Kafka Stream
  2. Pulsar Function
  3. Flink
  4. Storm
  5. Spark Streaming

接下来分别介绍这几个主流的流处理框架

Kafka Stream

基于 Kafka 的一个轻量级流式计算框架,我们可以使用它从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。

工作原理:读取数据流 -> 数据转换/时间窗口处理/状态管理 -> 任务调度 -> 输出结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c3f135fd7ad7f394a33a7b83e7e76383..jpg

简单示例:统计 20 秒内每个 input 的 key 输入的次数,典型的例子:统计网站 20 秒内用户的点击次数。

public class WindowCountApplication {

    private static final String STREAM_INPUT_TOPIC = "streams-window-input";
    private static final String STREAM_OUTPUT_TOPIC = "streams-window-output";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(APPLICATION_ID_CONFIG, WindowCountApplication.class.getSimpleName());
        props.put(BOOTSTRAP_SERVERS_CONFIG, KafkaConstant.BOOTSTRAP_SERVERS);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(STREAM_INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .peek((key, value) -> Console.log("[input] key={}, value={}", key, value))
                .groupByKey()
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(20)))
                .count()
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), value))
                .peek((key, value) -> Console.log("[output] key={}, value={}", key, value))
                .to(STREAM_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams kStreams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(kStreams::close));
        kStreams.start();
    }
}

运行结果{key}={value},发送了 3 次 A=1,2 次 B=1,以及 1 次 C=1,统计结果在预期之内,即 A 出现 3 次,B 出现 2 次,C 出现 1 次。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a69db8ddadbeab1fe8edcad33ef43569..jpg

Pulsar Function

和 Kafka Stream 类似,也是轻量级的流处理框架,不过它是基于 Pulsar 实现的一个流处理框架,同样的,也是从一个或多个输入流中读取数据,对数据进行转换和处理,然后将结果写入一个或多个输出流中。感兴趣的可以参考我之前写的文章:Pulsar Function 简介以及使用

工作原理:订阅消息流 -> 处理消息 -> 发布处理结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/567e164f403580512837ed9f43592643..jpg

简单示例:LocalRunner 模式,按照逗号“,”去切分 input topic 的消息,然后转换成数字进行求和,结果发送至 output topic。

public class IntSumFunction implements Function<StringInteger{

    public static final String BROKER_SERVICE_URL = "pulsar://localhost:6650";
    public static final String INPUT_TOPIC = "persistent://public/default/int-sum-input";
    public static final String OUTPUT_TOPIC = "persistent://public/default/int-sum-output";
    public static final String LOG_TOPIC = "persistent://public/default/int-sum-log";

    @Override
    public Integer process(String input, Context context) {
        Console.log("input: {}", input);
        return Arrays.stream(input.split(","))
                .map(Integer::parseInt)
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) throws Exception {
        FunctionConfig functionConfig = new FunctionConfig();
        functionConfig.setName(IntSumFunction.class.getSimpleName());
        functionConfig.setClassName(IntSumFunction.class.getName());
        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
        functionConfig.setInputs(Collections.singleton(INPUT_TOPIC));
        functionConfig.setOutput(OUTPUT_TOPIC);
        functionConfig.setLogTopic(LOG_TOPIC);

        LocalRunner localRunner = LocalRunner.builder()
                .brokerServiceUrl(BROKER_SERVICE_URL)
                .functionConfig(functionConfig)
                .build();
        localRunner.start(true);
    }
}

运行结果:1+2+3+4+5+6=21

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/cecdd28a506851f1ea5463481f83df81..jpg

Flink

  • 一种流处理框架,具有低延迟、高吞吐量和高可靠性的特性。
  • 支持流处理和批处理,并支持基于事件时间和处理时间的窗口操作、状态管理、容错机制等。
  • 提供了丰富的算子库和 API,支持复杂的数据流处理操作。

工作原理:接收数据流 -> 数据转换 -> 数据处理 -> 状态管理 -> 容错处理 -> 输出结果

简单来说就是将数据流分成多个分区,在多个任务中并行处理,同时维护状态信息,实现高吞吐量、低延迟的流处理。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/c04361c1aee89a193824f75c6ed9c08a..jpg

简单示例:从 9966 端口读取数据,将输入的句子用空格分割成多个单词,每隔 5 秒做一次单词统计。

public class WindowSocketWordCount {

    private static final String REGEX = " ";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketTextStreamSource = env.socketTextStream("localhost"9966);

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = socketTextStreamSource
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (sentence, collector) -> {
                    for (String word : sentence.split(REGEX)) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        streamOperator.print();
        env.execute();
    }
}

运行结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/44b964a6ebb4f1c17c51fe5146e846c0..jpg

Storm

  • 一个开源的流处理引擎,旨在实现快速、可靠的数据流处理。
  • 是业界最早出现的一个流处理框架(2011 年),但是现在已经有许多其它优秀的流处理框架了,所以它在现在并不是唯一选择。

工作原理:将数据流分成多个小的流(也称为 tuple),并将这些小流通过一系列的操作(也称为 bolt)进行处理。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e2362e26c7eb5d2243e87c1a5cbade3f..jpg

简单示例:在本地模式,使用 Storm 内置的RandomSentenceSpout充当数据源进行测试,用空格拆分生成的句子为多个单词,统计每个单词出现次数。

public class WindowedWordCountApplication {

    public static void main(String[] args) throws Exception {
        StreamBuilder builder = new StreamBuilder();
        builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
                .window(TumblingWindows.of(Duration.seconds(2)))
                .flatMap(sentence -> Arrays.asList(sentence.split(" ")))
                .peek(sentence -> Console.log("Random sentence: {}", sentence))
                .mapToPair(word -> Pair.of(word, 1))
                .countByKey()
                .peek(pair -> Console.log("Count word: ", pair.toString()));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("windowedWordCount"new Config(), builder.build());
        Utils.sleep(20000);
        cluster.shutdown();
    }
}

内置的RandomSentenceSpout随机生成数据关键源代码:

@Override
public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{
        sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
        sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")
    };
    final String sentence = sentences[rand.nextInt(sentences.length)];
    LOG.debug("Emitting tuple: {}", sentence);
    collector.emit(new Values(sentence));
}

运行结果:随机找一个单词“nature”,统计的次数为 10 次。http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/641c239caab0211ae68e0e6b0cdfcb75..jpg

Spark Streaming

基于 Spark API 的扩展,支持对实时数据流进行可扩展、高吞吐量、容错的流处理。

工作原理:接收实时输入数据流并将数据分成批次,然后由 Spark 引擎处理以批次生成最终结果流。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9efae11b1da2fdd7e19260ebf47ea768..jpg

简单示例:从 kafka 的 spark-streaming topic 读取数据,按照空格“ ”拆分,统计每一个单词出现的次数并打印。

public class JavaDirectKafkaWordCount {

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String KAFKA_GROUP_ID = "spark-consumer-group";
    private static final String KAFKA_TOPICS = "spark-streaming";
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        Configurator.setRootLevel(Level.WARN);
        SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("spark-streaming-word-count");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(KAFKA_TOPICS.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP_ID);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        JavaDStream<String> linesStream = messages.map(ConsumerRecord::value);
        JavaPairDStream<String, Integer> wordCountStream = linesStream
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        wordCountStream.print();

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

运行结果

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/11caf994550d7df1d01286432b4239cc..jpg

如何选择流处理框架?

简单数据流处理

如果只是轻量级使用的话,可以结合技术栈使用消息中间件自带的流处理框架就更节省成本。

  • 使用的 Kafka 就用 Kafka Stream。

  • 使用的 Pulsar 就用 Pulsar Function。

复杂数据流场景


FlinkSpark StreamingStorm
容错性基于 CheckPoint 机制WAL 及 RDD 机制Records ACK
延迟性亚秒级秒级亚秒级
吞吐量非常高中等
一致性Excatly-OnceExcatly-OnceExcatly-Once
状态支持×
流批一体×
窗口支持
机器学习×
SQL 查询×
图计算×
社区活跃度中等

综上,可以结合数据规模、技术栈、处理延迟功能特性、未来的考虑、社区活跃度、成本和可用性等等进行选择。

本文系转载,版权归原作者所有,
转载自公众号 xxxx ,如若侵权请联系我们进行删除!  

《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs

《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群