在Java中如何正确的使用Kafka

前言

在系统中引入消息中间件不可避免的需要处理如下的几个问题:

  1. 如何保证消息不丢失?
  2. 如果解决消息重复消费的问题?
  3. 如何保证消息的顺序消费?
  4. 消息消费失败了怎么办??

文章就上述几个问题,通过 Java + Kafka 来解释

Kafka 的三种消息投递语义

语义 含义 说明 实现方式
At most once 最多一次 有丢失消息的风险 生产者投递失败后不重试
At least once 至少一次 会导致消息重复 - 生产者投递失败后进行重试
- 消费者手动提交 ACK
Exactly once 只有一次 消息不丢失,消息不重复 Kafka 事务

kafka 处于哪种消息语义取决于生产者配置的 acks 参数,不同 acks 配置的含义:

  • acks=0: 生产者发送消息后,完全不等待 Broker 的确认,立即发送下一条消息。吞吐量高,但消息可能在 Broker 接收到之前丢失或者 Broker 只写入了 Leader Partition,将消息写入 Follower Partition 的时候 Leader Partition 所在的 Broker 挂掉,导致消息丢失。
  • acks=1: 消息只要写入 Leader Partition 就立即发送 ACK。如果 Broker 在将消息写入 Follower Partition 之前挂掉了,会导致消息丢失。
  • acks=all 或者 acks=-1: 消息成功写入到 Leader Partition 以及所有的 Follower Partition 之后,Broker 才会给 Producer 发送 ACK。这种情况下不会导致消息的丢失。

根据上述 acks 参数的配置:

  • acks=0 表示 At most once 语义
  • acks=1acks=all 表示的是 At least once 语义

spring-kafka 包中,acks 的默认值是 acks=1

如何保证消息不丢失?

消息的流转过程: Producer -> Broker -> Consumer

上述环节中的任意一个环节都有可能造成消息的丢失:

  • 在 Producer 和 Broker 之间,为了防止消息丢失,需要引入 ACK 确认机制
  • 在 Broker 和 Consumer 之间,为了防止消息丢失,需要关闭自动提交 offet 的功能,改成手动提交 offset
  • Broker 配置不当也可能导致消息的丢失,需要合理的配置: replication.factormin.insync.replicas 参数。
    • replication.factor 参数表示每个 partition 有多少副本
    • min.insync.replicas 参数表示集群中每个 partition 至少需要多少副本与 Leader 副本保持同步,集群才可以正常运行。如果处于同步状态的副本数低于 min.insync.replicas 参数,将消息发送到 Broker 时,会被拒绝写入。

看下在 Spring Boot 项目中如何配置 application.yaml 文件:

spring:
  kafka:
    consumer:
      enable-auto-commit: false # 关闭自动提交 Offset
      auto-offset-reset: earliest # 找不到 partition 的历史消费 offset 时,将 offset 重置到 partition 最早的位置开始消费
    listener:
      ack-mode: manual_immediate # 消费完毕后立即提交 ack
    producer:
      acks: all # Leader 和 Follower Partition 都成功写入后 Broker 再发送 ack
      properties:
        enable.idempotence: true # 开启幂等性,确保每个消息只会被存储一次。通过 producer id + sequence id 保证
        max.in.flight.requests.per.connection: 5 # 最多允许多少条消息等待 broker 的 ACK。设置成 1 表示消息顺序发送。
        retries: 2147483647 # 重试次数
        request.timeout.ms: 30000 # 发送请求后等待 Broker 响应的超时时间
        delivery.timeout.ms: 120000 # 消息从准备到最终被 Broker 接收并返回 ACK 确认的超时时间,其中包括了 request.timeout.ms
        linger.ms: 5 # 每个 batch(批次) 收集消息的时间,单位 ms。设置成 0 表示有消息立即发送。大于 0 表示每隔 n 毫秒发送一次消息。

如何解决消息重复消费?

什么情况下会导致重复消费?

  1. Producer 将同一条消息多次发送给 Broker,导致 Broker 中存在多条相同的消息
  2. Broker 将同一条消息多次发送给 Consumer

生产端和消费端都有可能导致消息重复消费,接下来看看如何解决

生产端

生产端导致重复消费的问题,可以让 Producer 开启幂等模式。开启幂等模式后,Producer 发送的消息会携带 Producer Id(PID) 以及自增的序列号,Broker 会根据 PID 以及序列号判断消息在 Broker 中是否存在,如果存在直接返回 ACK,如果不存在,则将消息成功写入 Broker 后,再发送 ACK。

spring:
  kafka:
    producer:
      acks: all # Leader 和 Follower Partition 都成功写入后 Broker 再发送 ack
      properties:
        enable.idempotence: true # 开启幂等性,确保每个消息只会被存储一次。通过 producer id + sequence id 保证
        max.in.flight.requests.per.connection: 5 # 最多允许多少条消息等待 broker 的 ACK。设置成 1 表示消息顺序发送。
        retries: 2147483647 # 重试次数
        request.timeout.ms: 30000 # 发送请求后等待 Broker 响应的超时时间
        delivery.timeout.ms: 120000 # 消息从准备到最终被 Broker 接收并返回 ACK 确认的超时时间,其中包括了 request.timeout.ms
        linger.ms: 5 # 每个 batch(批次) 收集消息的时间,单位 ms。设置成 0 表示有消息立即发送。大于 0 表示每隔 n 毫秒发送一次消息。

这种方式有个问题: 幂等只在 Producer 和 Broker 的同一次会话中有效。Producer ID 是每次建立会话时 Broker 随机分配的,Broker 不会验证消息的内容,只会根据 Producer ID + 序列号 判断消息是否重复。而 Producer 重启后,会被重新分配 PID,导致幂等只在 Producer 与 Broker 的同一次会话中有效,新建立的会话会重新开始计算。

消费端

Broker 多次投递同一条消息给 Consumer,Consumer 在消费时,需要幂等的消费消息,才能保证系统状态的正确性。

幂等消费常用的方案:

  1. 数据库唯一键约束去重
  2. Redis + TTL 去重
  3. 状态机幂等

数据库唯一键约束

设计思路:

  1. 设计一张消费表,每个消息中携带一个全局唯一的 ID,

  2. 消费时先 insert 记录

  3. 如果插入成功,表示第一次消费,接着执行业务逻辑

  4. 如果插入失败,表示已经消费过,直接返回 ACK 即可。

适用场景: 消息会落库/业务的最终状态在数据库中

@KafkaListener(topics = "order.created", groupId = "order-group")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String eventId = extractEventId(record); // 从 header/payload 提取

    // 1) 先做“去重标记”
    boolean first = consumedRepo.tryMarkConsumed(
        "order-group",
        record.topic(),
        record.partition(),
        record.offset(),
        eventId
    );

    if (!first) {
        // 重复消息:直接 ack,推进 offset(否则会无限重复拉到)
        ack.acknowledge();
        return;
    }

    try {
        // 2) 执行业务
        orderService.handleCreated(record.value());

        // 3) 业务成功后提交 offset
        ack.acknowledge();
    } catch (Exception e) {
        // 业务失败:不 ack,让 Spring Kafka 的重试/DLT 接管
        throw e;
    }
}

Redis + TTL

设计思路:

  1. 使用 SETNX 指令,将消息唯一 ID 作为 Key, Key 对应的 value 随意,建议设置成数字,节省内存空间

  2. 设置 TTL

  3. 如果 SETNX 指令返回 true,说明是第一次消费,继续执行业务逻辑

  4. 如果 SETNX 指令返回 false, 说明已经消费过了,直接返回 ACK

适用场景:

  • 业务结果不一定落 DB
  • 需要极高吞吐
  • 能接受 TTL 窗口内去重(通常足够)
@Service
public class RedisIdempotencyService {
    private final StringRedisTemplate redis;

    public RedisIdempotencyService(StringRedisTemplate redis) {
        this.redis = redis;
    }

    /** @return true=首次消费,false=重复 */
    public boolean tryAcquire(String key, Duration ttl) {
        Boolean ok = redis.opsForValue().setIfAbsent(key, "1", ttl);
        return Boolean.TRUE.equals(ok);
    }

    /** 可选:失败时释放(谨慎使用) */
    public void release(String key) {
        redis.delete(key);
    }
}

@KafkaListener(topics = "order.created", groupId = "order-group")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String eventId = extractEventId(record);
    String idemKey = "kafka:idem:order-group:" + eventId;

    boolean first = idem.tryAcquire(idemKey, Duration.ofDays(3));
    if (!first) {
        ack.acknowledge();
        return;
    }

    try {
        orderService.handleCreated(record.value());
        ack.acknowledge();
    } catch (Exception e) {
        // 这里是否 release?——通常不建议直接 release
        // 因为如果业务已部分成功但你没感知到,release 会导致重复执行更危险
        throw e;
    }
}

注意:

  • 不要轻易的释放 redis 中用于判断消息是否被处理过的 key,否则可能会出现重复消费问题
  • key 的 TTL 一定要合理设置。

状态机幂等

设计思路: 状态之间的转换关系是确定的,例如: CREATED -> PAID -> SHIPPED -> FINISHED,重复消费消息只会尝试做同样的状态转换,不会造成副作用。

适用场景: 订单、支付、物流、账户等状态流转型业务

状态转换的逻辑: 当前状态 + 输入事件 => 新状态

-- 只有当状态还是 CREATED 才能推进到 PAID
UPDATE orders
SET status = 'PAID', paid_time = NOW()
WHERE order_id = ? AND status = 'CREATED';

Java 代码

@KafkaListener(topics = "order.created", groupId = "order-group")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {

    int updated = jdbcTemplate.update(sql, orderId);
    if(updated == 0) {
	    // 可能已经处理过,也可能是乱序消息
	    
	    // 假设有 1 -> 2 -> 3 -> 4 这 4 种状态转换
	    // 记录的当前状态是 3
	    // 此时来了一个从 1 -> 2 的状态转换的消息
	    // 由于当前状态是 3,因此消息是处理过了,直接丢弃
	    // 处理过的消息也可以用 redis 快速去重
	    if(isPrevStateMessage(record)) {
		    ack.acknowledge();
		    return;
	    }
	    
	    // 假设有 1 -> 2 -> 3 -> 4 这 4 种状态转换
	    // 记录的当前状态是 1
	    // 此时来了一个从 2 -> 3 的状态转换消息,这个就是乱序消息
	    // 如果是乱序消息,将当前消息重新入队或者是发送到其他队列,并回复 ack
	    if(isOutOfOrderMessage(record)) {
		    kafkaTemplate.send(record);
		    ack.acknowledge();
		    return;
	    }
	    
	    // 如果是重复消息,直接回复 ACK
	    ack.acknowledge();
      return;
    }

		// 第一次消费,执行相关业务逻辑
    try {
        orderService.handleCreated(record.value());
        ack.acknowledge();
    } catch (Exception e) {
        throw e;
    }
	
	// ....
}

如何在消费端实现 Exactly Once 语义

消费端需要处理的问题:

  1. 重复消费
  2. 乱序消费

有状态的消息和无状态的消息

  • 有状态的消息: 例如订单,有明确的 CREATED -> PAID -> SHIPPED -> FINISHED 状态转换
  • 无状态的消息: 例如点赞,每一次点赞都是一种全新的状态

实现方式

有状态

有状态的消息实现步骤:

  1. Redis 去重,判断是否处理过(可选)
  2. 乱序消息直接 ACK 并重新入队
  3. 执行业务幂等 SQL
  4. 将消息已处理过的信息写入 Redis 并加上恰当的 TTL

无状态

无状态的消息实现步骤:

  1. 单独增加一张数据库表,例如: user_assets_message,用于持久化记录消息是否被处理
  2. Redis 去重,判断是否处理过(可选)
  3. 乱序消息直接 ACK 并重新入队(有状态的消息才会出现乱序的现象)
  4. 查询数据库,当前消息是否已经被处理过(最后一步的 Redis 写入失败)
  5. 执行业务 SQL 以及消息是否被处理的 SQL,这两个 SQL 必须处于同一个事务中
  6. 将消息已经处理过的信息写入 Redis 并加上恰当的 TTL(可选)

思考

重复消费,乱序消费本身是一个小概率事件,是否有必要还加上 Redis 来去重? 大部分的业务都是不需要加上 Redis 来去重的,业务中出现如下的情况时,加上 Redis 去重才有意义:

  • 重复执行的代价太高

如何设置重试以及死信队列?

Kafka 自身是不支持死信队列,但是 spring-kafka 提供了消费重试以及重试失败后将消息发送到指定队列的能力。

  1. 关闭自动提交 + 手动 ack
spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
    listener:
      ack-mode: manual_immediate
  1. 配置错误处理器: 重试 N 次,最终进入 DLT(Dead Letter Topic)
@Configuration
public class KafkaDltConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {

        // 把失败消息发布到 DLT 的“发布器”
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(template,
                        (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));

        // 重试策略:例如重试 3 次,每次间隔 1s(固定退避)
        FixedBackOff backOff = new FixedBackOff(1000L, 3L);

        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, backOff);

        // 不可重试异常:直接进 DLT(避免白重试)
        handler.addNotRetryableExceptions(
                IllegalArgumentException.class,
                ValidationException.class
        );

        // 可选:记录日志(默认也会打)
        handler.setRetryListeners((record, ex, deliveryAttempt) -> {
            // deliveryAttempt: 第几次投递(1 开始)
        });

        return handler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory,
            DefaultErrorHandler errorHandler) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);

        // 你希望手动 ack 的话,确保这里是 MANUAL 或 MANUAL_IMMEDIATE(也可放 YAML)
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }
}

关键点: @KafkaListener 注解修饰的方法不要捕获异常,从而交给错误处理器去处理,如果 @KafkaListener 注解修饰的方法捕获了异常,导致调用方无法感知到异常,错误处理器就无法起作用。

使用 Hugo 构建
主题 StackJimmy 设计