// 批量消费配置: 1批量, 2手动提交
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
// 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
// 一次poll操作最大获取的记录数量
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
// 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500
// 在消费者方法中注入acknowledgment并在执行完业务逻辑后手动调用确认方法
acknowledgment.acknowledge();
/**
* Set to true if this endpoint should create a batch listener.
* @param batchListener true for a batch listener.
* @since 1.1
*/
public void setBatchListener(Boolean batchListener) {
this.batchListener = batchListener;
}
@KafkaListener(id = "", groupId = "", topics = {})
public void listen(ConsumerRecord<String, String> data) {
}
@KafkaListener(id = "", groupId = "", topics = {}, containerFactory = "")
public void listen(List<ConsumerRecord<String, String>> datas) {
}
/**
* Invoked with data from kafka.
* @param data the data to be processed.
* @param acknowledgment the acknowledgment.
*/
@Override
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
@Bean(name = "batch_and_manual_ack_ContainerFactory")重要的一句是:.setAckMode(AckMode.MANUAL_IMMEDIATE);,该配置的缺省值是AckMode.BATCH。可以从ContainerProperties类的源码找到缺省值:
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batch_and_manual_ack_ContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 批量消费配置: 1批量, 2手动提交
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/**
* The ack mode to use when auto ack (in the configuration properties) is false.
* <ul>
* <li>RECORD: Ack after each record has been passed to the listener.</li>
* <li>BATCH: Ack after each batch of records received from the consumer has been
* passed to the listener</li>
* <li>TIME: Ack after this number of milliseconds; (should be greater than
* {@code #setPollTimeout(long) pollTimeout}.</li>
* <li>COUNT: Ack after at least this number of records have been received</li>
* <li>MANUAL: Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
* </ul>
*/
private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
@Bean(name = "batch_and_manual_ack_ContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batch_and_manual_ack_ContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
// 批量消费配置: 1批量, 2手动提交
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<String, Object>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
// 一次poll操作最大获取的记录数量
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
// 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500
return propsMap;
}
@KafkaListener(groupId = "junit-test-group", containerFactory = "batch_and_manual_ack_ContainerFactory", topics = {"test"})
public void test_batchConsume(List<ConsumerRecord<String, String>> datas, Acknowledgment acknowledgment) {
System.out.println(new Date() + " datas = " + datas.size());
System.out.println(new Date() + " collect = " + datas.stream().map(t -> t.offset()).collect(Collectors.toList()));
// 最后一定要提交进度 (用于持久化进度到Kafka)
acknowledgment.acknowledge();
}
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack