博客 Spring-Kafka如何实现批量消费消息并且不丢失数据

Spring-Kafka如何实现批量消费消息并且不丢失数据

   数栈君   发表于 2023-09-05 09:59  236  0

Spring-Kafka如何实现批量消费消息并且不丢失数据

先给答案:

    // 批量消费配置: 1批量, 2手动提交
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    
    // 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
    // 一次poll操作最大获取的记录数量
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
    // 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
    propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
    // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
    // 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
    propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500

    // 在消费者方法中注入acknowledgment并在执行完业务逻辑后手动调用确认方法
    acknowledgment.acknowledge();

1、背景:

某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同一个对象的处理。其实只需要所有表插入完毕后,一次处理该对象即可。

2、现有技术架构:

mysql --> canal --> Kafka --> Spring-Kafka消费者 --> 下游接口

3、解决方案:

优化Spring-Kafka消费者,从单条处理改为多条处理,然后在消费者中合并相同的对象,从而达到一个对象只处理一次(最多两次)的目的。为什么有可能是两次,因为分批的时候很容易将同一个对象的多条消息分到上下两批,这样这个对象将会被处理两次;那为什么不会处理三次?其实也能够制造出来上次的情况,就是分批的大小如果很小就容易出现三次,甚至更多次。所以通常情况我们将分批大小设置得要比表数量多。

举个例子:一个对象的创建有10张表的插入消息,而分批大小设置成4,此时10条消息就被4这个批大小拆分成3批,每批处理一次该对象,那么该对象就会被处理3次。所以分批大小是一个重要的参数,其值的设定通常要大,但再大也不可避免被分成两批的情况。

4、实现步骤

Spring-kafka从1.1版本开始就支持了批量消费,需要在ContainerFactory中设置batchListener=true
同时设置消费者参数 max.poll.records 来控制一批的最大记录数量,该参数的缺省值为500。

AbstractKafkaListenerContainerFactory类的源码如下:

   /**
    * Set to true if this endpoint should create a batch listener.
    * @param batchListener true for a batch listener.
    * @since 1.1
    */
    public void setBatchListener(Boolean batchListener) {
        this.batchListener = batchListener;
    }

可以从javaDoc的@since得知该功能从1.1版本就存在了,所以只要Spring-Kafka的版本高于1.1的都支持批量消费功能。这个参数是搭配@KafkaListener来使用的。单条消费的写法如下:

@KafkaListener(id = "", groupId = "", topics = {})
public void listen(ConsumerRecord<String, String> data) {
}

以上只能一次处理一条,并不能将多条消息统筹处理,所以使用以下的批量消费的写法:

@KafkaListener(id = "", groupId = "", topics = {}, containerFactory = "")
public void listen(List<ConsumerRecord<String, String>> datas) {
}

从以上代码可以看出,批量消费仅仅是将参数的ConsumerRecord类型改为List<ConsumerRecord>即可。
这样就行了吗?如果批次较大,对这一批数据的处理时间较长,就容易造成丢数据。场景如下:
1、开启自动提交offset,
2、提交offset的时间间隔为1s,
3、处理这一批数据耗时为2s。
还需要发生以下条件才能导致丢消息:
1、“自动提交offset的时间”到了且成功执行了offset的提交
2、此后几乎同时程序发生了严重的错误导致进程退出(注意此时消费者的代码逻辑并未执行完毕)
那么消息就会被丢失,因为Kafka收到了offset的提交,所以Kafka认为这一批消息已经处理成功了,但程序实际并未处理成功,等到下次启动程序,将从Kafka记录的offset开始消费,“记录的offset”就是发生异常退出前“提交的offset”。所以上次异常退出时刻的那一批消息就被丢失了,不会再被消费到。

举例:
假设消费者这一批消费到的消息offset编码列表为5,6,7。而自动提交offset时候会将7提交给Kafka,表示下一个消息将从8开始消费。但567这3条消息在消费者进程中还没来得及处理完毕就被意外终止了。等到人工处理错误重新启动程序,将从8开始消费,因为Kafka认为567已经处理过了,但实际567并没有成功处理,所以就会丢失567这一批的消息。

在进一步,如何防止消息丢失呢?答案是手动提交offset,同样Spring-Kafka已经提供了支持,其实Spring-Kafka只是对原生Kafka的包装,最核心的还是原生Kafka支持手动提交offset的能力。

上干货,Spring-Kafka中有一个类很有用:AcknowledgingMessageListener,这个类就是为了支持手动ack消息的,也即是手动提交offset,只是Spring将“手动提交offset”这个概念包装成了“确认消息”,里面有一个方法:

   /**
    * Invoked with data from kafka.
    * @param data the data to be processed.
    * @param acknowledgment the acknowledgment.
    */
    @Override
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

这个类是设计成去实现它从而获取手动提交offset的能力,但我们还可以简化,结合之前的@KafkaListener的方法,我们将Acknowledgment acknowledgment参数放在@KafkaListener的方法中,Spring就能够将Acknowledgment对象传递进来,从而我们可以自己控制何时“确认消息”。当然仅有这步还不够,还需要告诉Spring-Kafka我需要手动提交offset,通过一个简单的设置即可:

    @Bean(name = "batch_and_manual_ack_ContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batch_and_manual_ack_ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 批量消费配置: 1批量, 2手动提交
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
重要的一句是:.setAckMode(AckMode.MANUAL_IMMEDIATE);,该配置的缺省值是AckMode.BATCH。可以从ContainerProperties类的源码找到缺省值:

   /**
    * The ack mode to use when auto ack (in the configuration properties) is false.
    * <ul>
    * <li>RECORD: Ack after each record has been passed to the listener.</li>
    * <li>BATCH: Ack after each batch of records received from the consumer has been
    * passed to the listener</li>
    * <li>TIME: Ack after this number of milliseconds; (should be greater than
    * {@code #setPollTimeout(long) pollTimeout}.</li>
    * <li>COUNT: Ack after at least this number of records have been received</li>
    * <li>MANUAL: Listener is responsible for acking - use a
    * {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
    * </ul>
    */
  private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

要想提高吞度量,须要设置一下几个参数:

max.poll.records
一次poll操作最大获取的记录数量,缺省是500。该值越大,则吞吐量也越大,但要求消费者能够在不超时的情况下处理完所有的消息。

fetch.min.bytes
一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 缺省是1B

fetch.max.wait.ms
一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者,缺省是500。
需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms

5、所有代码:


    @Bean(name = "batch_and_manual_ack_ContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batch_and_manual_ack_ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        // 批量消费配置: 1批量, 2手动提交
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<String, Object>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        // 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
        // 一次poll操作最大获取的记录数量
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
        // 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
        // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
        // 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500

        return propsMap;
    }
    @KafkaListener(groupId = "junit-test-group", containerFactory = "batch_and_manual_ack_ContainerFactory", topics = {"test"})
    public void test_batchConsume(List<ConsumerRecord<String, String>> datas, Acknowledgment acknowledgment) {
        System.out.println(new Date() + " datas = " + datas.size());
        System.out.println(new Date() + " collect = " + datas.stream().map(t -> t.offset()).collect(Collectors.toList()));
        // 最后一定要提交进度 (用于持久化进度到Kafka)
        acknowledgment.acknowledge();
    }

将以上代码放在某个Spring的类里,然后修改配置即可使用






免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
上一篇:全场景实时湖仓
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群