Kafka 是一款高吞吐量、低延迟的分布式消息系统。本文将详细介绍如何在 Spring Boot 项目中使用 Kafka 进行消息接收与消费,并结合幂等和重试机制,确保消息消费的可靠性和系统的扩展性。我们将以电商交易系统为案例进行深入解析。
+----------------+ Kafka +----------------+
| 订单服务 | ---> Produce ---> | 消费服务 |
| (Order Service)| Topic | (Consumer Service)|
+----------------+ +----------------+
| |
MySQL MySQL
主要流程:
1.订单服务:接收用户订单请求后,异步将订单信息发送到 Kafka。
2.消费服务:从 Kafka 中消费订单信息,更新库存、生成发货信息等操作。
3.数据库:使用 MySQL 存储订单和库存数据,并通过 MyBatis 实现持久化操作。
<dependencies>
<!-- Spring Boot Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- MySQL Driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- MyBatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok (可选,用于简化代码) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
为实现电商系统的消息接收与消费,以下是两个主要数据库表:订单表和消费记录表。
·订单表(orders):存储订单的基础信息。
·消费记录表(message_consume_record):记录消费过的消息,用于幂等校验。
CREATE TABLE orders (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_no VARCHAR(64) NOT NULL,
user_id BIGINT NOT NULL,
total_price DECIMAL(10, 2) NOT NULL,
status INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE message_consume_record (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_key VARCHAR(64) NOT NULL UNIQUE,
consumed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 3
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers: 这是 Kafka 服务的地址,Kafka 集群通常由多个 Broker 组成,每个 Broker 提供消息的存储与转发功能。这里指定了本地的 Kafka 服务器(localhost:9092),如果有多个 Broker,可以用逗号分隔(例如:localhost:9092,localhost:9093)。
retries: 当消息发送失败时,生产者将重试发送的次数。这里配置了 3 次重试。这在网络不稳定或 Kafka 节点暂时不可用时非常有用,可以有效提高消息发送成功率。
key-serializer: 生产者发送的消息可以有一个键值对。key-serializer 用于将消息的键序列化为字节数组。这里使用了 StringSerializer,表示消息的键是字符串形式,序列化为字节后发送。
value-serializer: 类似于键,value-serializer 用于将消息的值序列化为字节数组。配置 StringSerializer 表示消息内容是字符串。
@Service
public class OrderProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendOrderMessage(String orderId) {
kafkaTemplate.send("order-topic", orderId);
}
}
在 OrderService 中,用户提交订单后,可以将订单 ID 发送至 Kafka:
@Service
public class OrderService {
@Autowired
private OrderProducer orderProducer;
public void createOrder(OrderDTO order) {
// 保存订单逻辑...
orderProducer.sendOrderMessage(order.getOrderId());
}
}
spring:
kafka:
consumer:
group-id: order-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: 消费者组 ID。Kafka 允许多个消费者组监听同一个 Topic,每个消费者组可以独立消费消息。此处配置 order-group,意味着该消费者属于订单消费逻辑的消费者组。
auto-offset-reset: 指定消费者在没有初始偏移量(offset)或当前偏移量无效的情况下,从哪里开始读取消息。earliest 表示从最早的可用消息开始消费,这对于新启动的消费者非常有用,能够确保读取历史数据。
key-deserializer: 将接收到的消息键从字节数组反序列化为 Java 对象。这里配置 StringDeserializer,表示键是字符串。
value-deserializer: 类似于键的反序列化,value-deserializer 用于将消息内容反序列化为 Java 对象。配置 StringDeserializer,表示消息内容是字符串。
@Service
public class OrderConsumer {
@Autowired
private OrderService orderService;
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consumeOrder(String orderId) {
orderService.processOrder(orderId);
}
}
在 OrderService 中,处理接收到的订单消息:
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Transactional
public void processOrder(String orderId) {
// 根据订单 ID 更新订单状态、库存等操作
Order order = orderMapper.findById(orderId);
// 更新订单逻辑...
}
}
@Service
public class OrderConsumer {
@Autowired
private MessageConsumeRecordMapper consumeRecordMapper;
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consumeOrder(String orderId) {
if (consumeRecordMapper.existsByMessageKey(orderId)) {
// 如果已经处理过该消息,直接返回
return;
}
// 处理订单
orderService.processOrder(orderId);
// 记录已处理消息
consumeRecordMapper.insertConsumeRecord(orderId);
}
}
MessageConsumeRecordMapper 接口用于操作消费记录表:
@Mapper
public interface MessageConsumeRecordMapper {
boolean existsByMessageKey(String messageKey);
void insertConsumeRecord(String messageKey);
}
通过这种方式,我们确保了每条消息只被消费一次,避免重复处理订单数据。
CREATE TABLE failed_message (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_key VARCHAR(64) NOT NULL,
payload TEXT NOT NULL,
failed_reason TEXT,
retry_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
在消费消息失败时,将消息记录到失败表中,并定期进行重试。
@Service
public class OrderConsumer {
@Autowired
private FailedMessageMapper failedMessageMapper;
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consumeOrder(String orderId) {
try {
orderService.processOrder(orderId);
} catch (Exception e) {
failedMessageMapper.insertFailedMessage(orderId, e.getMessage());
}
}
}
通过定时任务或手动触发,定期查询失败的消息并重新消费:
@Service
public class FailedMessageRetryService {
@Autowired
private FailedMessageMapper failedMessageMapper;
@Scheduled(fixedDelay = 60000) // 每分钟重试一次
public void retryFailedMessages() {
List<FailedMessage> failedMessages = failedMessageMapper.findAll();
for (FailedMessage message : failedMessages) {
try {
orderService.processOrder(message.getPayload());
failedMessageMapper.deleteById(message.getId());
} catch (Exception e) {
failedMessageMapper.incrementRetryCount(message.getId());
}
}
}
}
为了使系统具备良好的扩展性,我们需要考虑以下几个方面:
public interface MessageConsumer {
void consume(String payload);
}
@Service
public class KafkaOrderConsumer implements MessageConsumer {
@Override
public void consume(String payload) {
// Kafka 消息消费逻辑
}
}
通过这种设计,可以轻松添加新的消息类型或处理逻辑,而不需要修改现有代码。
message-consumers:
order:
type: kafka
topic: order-topic
user:
type: http
url: http://example.com/user/message
通过读取这些配置,系统可以动态选择不同的消费逻辑,从而增强扩展性。
@Async
public void processOrderAsync(String orderId) {
orderService.processOrder(orderId);
}
Kafka 支持批量消费消息,这样可以减少 Kafka 客户端与 Broker 之间的交互次数,提升性能。在 Spring Boot 中,可以通过配置 max.poll.records 参数控制每次批量消费的消息数量。
spring:
kafka:
consumer:
max-poll-records: 500
通过为 Kafka 的 Topic 配置多个分区,并为消费者组中的消费者分配不同的分区,可以实现并行消费,从而提升系统的消费能力。
spring:
kafka:
consumer:
concurrency: 3
spring:
kafka:
consumer:
concurrency: 3 # 配置多个消费者进行并行处理
2.批量消费:通过 max.poll.records 参数配置每次拉取的消息数量。增加批量消费可以减少 Kafka 消费者与 Broker 之间的交互,从而提升性能。
spring:
kafka:
consumer:
max-poll-records: 500 # 每次批量拉取 500 条消息
在消费端,消息的处理速度是决定 Kafka 消费效率的关键。因此,需要对消费逻辑进行优化:
1.异步处理:在消息处理完成后再返回响应,可能导致整个消费过程变慢。可以通过使用异步任务处理消息内容,从而避免阻塞 Kafka 消费的主线程。可以结合 Spring 的 @Async 注解实现异步处理。
@Async
public void processOrderAsync(String orderId) {
// 异步处理订单消息
orderService.processOrder(orderId);
}
2.缩短消息处理时间:简化业务逻辑,避免冗长的处理流程。使用缓存等方式减少对数据库的频繁访问,降低 I/O 操作带来的性能开销。
1.限流机制:在生产者端通过限流策略,控制每秒钟向 Kafka 发送的消息数量,确保消费者有足够的时间处理消息。例如,可以使用 RateLimiter 实现限流。
RateLimiter rateLimiter = RateLimiter.create(1000); // 每秒最多发送 1000 条消息
public void sendMessage(String topic, String message) {
rateLimiter.acquire(); // 获取许可
kafkaTemplate.send(topic, message);
}
2.分布式限流:如果消息的生产端部署在多个节点上,可以使用 Redis 等工具实现分布式限流。
spring:
kafka:
consumer:
enable-auto-commit: false # 关闭自动提交位移
手动提交消费位移:
try {
// 消费处理消息
processMessage(record);
// 手动提交位移
acknowledgment.acknowledge();
} catch (Exception e) {
// 处理异常
}
如果消息堆积严重,可以通过 Kafka 的 retention.ms 参数设置消息的存储时间,确保超过存储时间的消息自动删除,防止 Kafka 分区文件无限制增长。
log.retention.ms=604800000 # 配置 Kafka 日志文件的保留时间,单位为毫秒,这里设置为 7 天
此外,可以通过配置 log.retention.bytes 来限制 Kafka 每个分区的日志文件大小,确保超出大小限制后自动删除最早的消息。
log.retention.bytes=1073741824 # 配置 Kafka 分区日志文件的最大大小,单位为字节,这里设置为 1 GB
对于大数据量的消息,可以启用 Kafka 消息压缩功能,减少消息的占用空间,从而提升生产和消费的效率。Kafka 支持多种压缩算法,包括 GZIP、LZ4 和 SNAPPY。
spring:
kafka:
producer:
compression-type: gzip # 启用 GZIP 压缩
压缩不仅可以减少网络传输的数据量,还可以降低 Kafka Broker 和消费端的存储压力,从而减少消息堆积的可能性。
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据资产管理白皮书》下载地址:https://www.dtstack.com/resources/1073/?src=bbs
《行业指标体系白皮书》下载地址:https://www.dtstack.com/resources/1057/?src=bbs
《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001/?src=bbs
《数栈V6.0产品白皮书》下载地址:https://www.dtstack.com/resources/1004/?src=bbs
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack