前言
在系统中引入消息中间件不可避免的需要处理如下的几个问题:
- 如何保证消息不丢失?
- 如果解决消息重复消费的问题?
- 如何保证消息的顺序消费?
- 消息消费失败了怎么办??
文章就上述几个问题,通过 Java + Kafka 来解释
Kafka 的三种消息投递语义
| 语义 | 含义 | 说明 | 实现方式 |
|---|---|---|---|
| At most once | 最多一次 | 有丢失消息的风险 | 生产者投递失败后不重试 |
| At least once | 至少一次 | 会导致消息重复 | - 生产者投递失败后进行重试 - 消费者手动提交 ACK |
| Exactly once | 只有一次 | 消息不丢失,消息不重复 | Kafka 事务 |
kafka 处于哪种消息语义取决于生产者配置的 acks 参数,不同 acks 配置的含义:
acks=0: 生产者发送消息后,完全不等待 Broker 的确认,立即发送下一条消息。吞吐量高,但消息可能在 Broker 接收到之前丢失或者 Broker 只写入了 Leader Partition,将消息写入 Follower Partition 的时候 Leader Partition 所在的 Broker 挂掉,导致消息丢失。acks=1: 消息只要写入 Leader Partition 就立即发送 ACK。如果 Broker 在将消息写入 Follower Partition 之前挂掉了,会导致消息丢失。acks=all或者acks=-1: 消息成功写入到 Leader Partition 以及所有的 Follower Partition 之后,Broker 才会给 Producer 发送 ACK。这种情况下不会导致消息的丢失。
根据上述 acks 参数的配置:
acks=0表示 At most once 语义acks=1或acks=all表示的是 At least once 语义
spring-kafka 包中,acks 的默认值是 acks=1。
如何保证消息不丢失?
消息的流转过程: Producer -> Broker -> Consumer
上述环节中的任意一个环节都有可能造成消息的丢失:
- 在 Producer 和 Broker 之间,为了防止消息丢失,需要引入 ACK 确认机制
- 在 Broker 和 Consumer 之间,为了防止消息丢失,需要关闭自动提交 offet 的功能,改成手动提交 offset
- Broker 配置不当也可能导致消息的丢失,需要合理的配置:
replication.factor和min.insync.replicas参数。replication.factor参数表示每个 partition 有多少副本min.insync.replicas参数表示集群中每个 partition 至少需要多少副本与 Leader 副本保持同步,集群才可以正常运行。如果处于同步状态的副本数低于min.insync.replicas参数,将消息发送到 Broker 时,会被拒绝写入。
看下在 Spring Boot 项目中如何配置 application.yaml 文件:
spring:
kafka:
consumer:
enable-auto-commit: false # 关闭自动提交 Offset
auto-offset-reset: earliest # 找不到 partition 的历史消费 offset 时,将 offset 重置到 partition 最早的位置开始消费
listener:
ack-mode: manual_immediate # 消费完毕后立即提交 ack
producer:
acks: all # Leader 和 Follower Partition 都成功写入后 Broker 再发送 ack
properties:
enable.idempotence: true # 开启幂等性,确保每个消息只会被存储一次。通过 producer id + sequence id 保证
max.in.flight.requests.per.connection: 5 # 最多允许多少条消息等待 broker 的 ACK。设置成 1 表示消息顺序发送。
retries: 2147483647 # 重试次数
request.timeout.ms: 30000 # 发送请求后等待 Broker 响应的超时时间
delivery.timeout.ms: 120000 # 消息从准备到最终被 Broker 接收并返回 ACK 确认的超时时间,其中包括了 request.timeout.ms
linger.ms: 5 # 每个 batch(批次) 收集消息的时间,单位 ms。设置成 0 表示有消息立即发送。大于 0 表示每隔 n 毫秒发送一次消息。
如何解决消息重复消费?
什么情况下会导致重复消费?
- Producer 将同一条消息多次发送给 Broker,导致 Broker 中存在多条相同的消息
- Broker 将同一条消息多次发送给 Consumer
生产端和消费端都有可能导致消息重复消费,接下来看看如何解决
生产端
生产端导致重复消费的问题,可以让 Producer 开启幂等模式。开启幂等模式后,Producer 发送的消息会携带 Producer Id(PID) 以及自增的序列号,Broker 会根据 PID 以及序列号判断消息在 Broker 中是否存在,如果存在直接返回 ACK,如果不存在,则将消息成功写入 Broker 后,再发送 ACK。
spring:
kafka:
producer:
acks: all # Leader 和 Follower Partition 都成功写入后 Broker 再发送 ack
properties:
enable.idempotence: true # 开启幂等性,确保每个消息只会被存储一次。通过 producer id + sequence id 保证
max.in.flight.requests.per.connection: 5 # 最多允许多少条消息等待 broker 的 ACK。设置成 1 表示消息顺序发送。
retries: 2147483647 # 重试次数
request.timeout.ms: 30000 # 发送请求后等待 Broker 响应的超时时间
delivery.timeout.ms: 120000 # 消息从准备到最终被 Broker 接收并返回 ACK 确认的超时时间,其中包括了 request.timeout.ms
linger.ms: 5 # 每个 batch(批次) 收集消息的时间,单位 ms。设置成 0 表示有消息立即发送。大于 0 表示每隔 n 毫秒发送一次消息。
这种方式有个问题: 幂等只在 Producer 和 Broker 的同一次会话中有效。Producer ID 是每次建立会话时 Broker 随机分配的,Broker 不会验证消息的内容,只会根据 Producer ID + 序列号 判断消息是否重复。而 Producer 重启后,会被重新分配 PID,导致幂等只在 Producer 与 Broker 的同一次会话中有效,新建立的会话会重新开始计算。
消费端
Broker 多次投递同一条消息给 Consumer,Consumer 在消费时,需要幂等的消费消息,才能保证系统状态的正确性。
幂等消费常用的方案:
- 数据库唯一键约束去重
- Redis + TTL 去重
- 状态机幂等
数据库唯一键约束
设计思路:
-
设计一张消费表,每个消息中携带一个全局唯一的 ID,
-
消费时先 insert 记录
-
如果插入成功,表示第一次消费,接着执行业务逻辑
-
如果插入失败,表示已经消费过,直接返回 ACK 即可。
适用场景: 消息会落库/业务的最终状态在数据库中
@KafkaListener(topics = "order.created", groupId = "order-group")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
String eventId = extractEventId(record); // 从 header/payload 提取
// 1) 先做“去重标记”
boolean first = consumedRepo.tryMarkConsumed(
"order-group",
record.topic(),
record.partition(),
record.offset(),
eventId
);
if (!first) {
// 重复消息:直接 ack,推进 offset(否则会无限重复拉到)
ack.acknowledge();
return;
}
try {
// 2) 执行业务
orderService.handleCreated(record.value());
// 3) 业务成功后提交 offset
ack.acknowledge();
} catch (Exception e) {
// 业务失败:不 ack,让 Spring Kafka 的重试/DLT 接管
throw e;
}
}
Redis + TTL
设计思路:
-
使用
SETNX指令,将消息唯一 ID 作为 Key, Key 对应的 value 随意,建议设置成数字,节省内存空间 -
设置 TTL
-
如果
SETNX指令返回 true,说明是第一次消费,继续执行业务逻辑 -
如果
SETNX指令返回 false, 说明已经消费过了,直接返回 ACK
适用场景:
- 业务结果不一定落 DB
- 需要极高吞吐
- 能接受 TTL 窗口内去重(通常足够)
@Service
public class RedisIdempotencyService {
private final StringRedisTemplate redis;
public RedisIdempotencyService(StringRedisTemplate redis) {
this.redis = redis;
}
/** @return true=首次消费,false=重复 */
public boolean tryAcquire(String key, Duration ttl) {
Boolean ok = redis.opsForValue().setIfAbsent(key, "1", ttl);
return Boolean.TRUE.equals(ok);
}
/** 可选:失败时释放(谨慎使用) */
public void release(String key) {
redis.delete(key);
}
}
@KafkaListener(topics = "order.created", groupId = "order-group")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
String eventId = extractEventId(record);
String idemKey = "kafka:idem:order-group:" + eventId;
boolean first = idem.tryAcquire(idemKey, Duration.ofDays(3));
if (!first) {
ack.acknowledge();
return;
}
try {
orderService.handleCreated(record.value());
ack.acknowledge();
} catch (Exception e) {
// 这里是否 release?——通常不建议直接 release
// 因为如果业务已部分成功但你没感知到,release 会导致重复执行更危险
throw e;
}
}
注意:
- 不要轻易的释放 redis 中用于判断消息是否被处理过的 key,否则可能会出现重复消费问题
- key 的 TTL 一定要合理设置。
状态机幂等
设计思路: 状态之间的转换关系是确定的,例如: CREATED -> PAID -> SHIPPED -> FINISHED,重复消费消息只会尝试做同样的状态转换,不会造成副作用。
适用场景: 订单、支付、物流、账户等状态流转型业务
状态转换的逻辑: 当前状态 + 输入事件 => 新状态
-- 只有当状态还是 CREATED 才能推进到 PAID
UPDATE orders
SET status = 'PAID', paid_time = NOW()
WHERE order_id = ? AND status = 'CREATED';
Java 代码
@KafkaListener(topics = "order.created", groupId = "order-group")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
int updated = jdbcTemplate.update(sql, orderId);
if(updated == 0) {
// 可能已经处理过,也可能是乱序消息
// 假设有 1 -> 2 -> 3 -> 4 这 4 种状态转换
// 记录的当前状态是 3
// 此时来了一个从 1 -> 2 的状态转换的消息
// 由于当前状态是 3,因此消息是处理过了,直接丢弃
// 处理过的消息也可以用 redis 快速去重
if(isPrevStateMessage(record)) {
ack.acknowledge();
return;
}
// 假设有 1 -> 2 -> 3 -> 4 这 4 种状态转换
// 记录的当前状态是 1
// 此时来了一个从 2 -> 3 的状态转换消息,这个就是乱序消息
// 如果是乱序消息,将当前消息重新入队或者是发送到其他队列,并回复 ack
if(isOutOfOrderMessage(record)) {
kafkaTemplate.send(record);
ack.acknowledge();
return;
}
// 如果是重复消息,直接回复 ACK
ack.acknowledge();
return;
}
// 第一次消费,执行相关业务逻辑
try {
orderService.handleCreated(record.value());
ack.acknowledge();
} catch (Exception e) {
throw e;
}
// ....
}
如何在消费端实现 Exactly Once 语义
消费端需要处理的问题:
- 重复消费
- 乱序消费
有状态的消息和无状态的消息
- 有状态的消息: 例如订单,有明确的
CREATED -> PAID -> SHIPPED -> FINISHED状态转换 - 无状态的消息: 例如点赞,每一次点赞都是一种全新的状态
实现方式
有状态
有状态的消息实现步骤:
- Redis 去重,判断是否处理过(可选)
- 乱序消息直接 ACK 并重新入队
- 执行业务幂等 SQL
- 将消息已处理过的信息写入 Redis 并加上恰当的 TTL
无状态
无状态的消息实现步骤:
- 单独增加一张数据库表,例如: user_assets_message,用于持久化记录消息是否被处理
- Redis 去重,判断是否处理过(可选)
乱序消息直接 ACK 并重新入队(有状态的消息才会出现乱序的现象)- 查询数据库,当前消息是否已经被处理过(最后一步的 Redis 写入失败)
- 执行业务 SQL 以及消息是否被处理的 SQL,这两个 SQL 必须处于同一个事务中
- 将消息已经处理过的信息写入 Redis 并加上恰当的 TTL(可选)
思考
重复消费,乱序消费本身是一个小概率事件,是否有必要还加上 Redis 来去重? 大部分的业务都是不需要加上 Redis 来去重的,业务中出现如下的情况时,加上 Redis 去重才有意义:
- 重复执行的代价太高
如何设置重试以及死信队列?
Kafka 自身是不支持死信队列,但是 spring-kafka 提供了消费重试以及重试失败后将消息发送到指定队列的能力。
- 关闭自动提交 + 手动 ack
spring:
kafka:
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
listener:
ack-mode: manual_immediate
- 配置错误处理器: 重试 N 次,最终进入 DLT(Dead Letter Topic)
@Configuration
public class KafkaDltConfig {
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
// 把失败消息发布到 DLT 的“发布器”
DeadLetterPublishingRecoverer recoverer =
new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
// 重试策略:例如重试 3 次,每次间隔 1s(固定退避)
FixedBackOff backOff = new FixedBackOff(1000L, 3L);
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);
// 不可重试异常:直接进 DLT(避免白重试)
handler.addNotRetryableExceptions(
IllegalArgumentException.class,
ValidationException.class
);
// 可选:记录日志(默认也会打)
handler.setRetryListeners((record, ex, deliveryAttempt) -> {
// deliveryAttempt: 第几次投递(1 开始)
});
return handler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory,
DefaultErrorHandler errorHandler) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
// 你希望手动 ack 的话,确保这里是 MANUAL 或 MANUAL_IMMEDIATE(也可放 YAML)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
关键点: @KafkaListener 注解修饰的方法不要捕获异常,从而交给错误处理器去处理,如果 @KafkaListener 注解修饰的方法捕获了异常,导致调用方无法感知到异常,错误处理器就无法起作用。