在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer
类中的 commitSync()
或 commitAsync()
方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。
以下是一个简单的手动 ack
的示例代码:
1. 配置 KafkaConsumer
和手动确认消费
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaManualAckConsumer {
public static void main(String[] args) {
// 配置消费者的基本属性
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 服务器地址
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组ID
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息key反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息value反序列化
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交,启用手动提交
// 创建 KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
// 拉取消息
var records = consumer.poll(1000); // 拉取数据,等待最多1000ms
// 处理每一条消息
records.forEach(record -> {
System.out.println("Consumed message: " + record.value());
// 处理完消息后手动提交偏移量
// commitSync: 确保消息成功提交
consumer.commitSync();
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.close();
}
}
}
2. 代码解析
- 配置消费者:
ENABLE_AUTO_COMMIT_CONFIG
设置为false
,禁用自动提交偏移量。这样就可以在处理完每条消息后手动提交。
- 消息消费与手动 ack:
poll(1000)
方法拉取最多 1000 毫秒内的消息。commitSync()
方法用于同步提交当前的偏移量,即消费到的消息的位移,这样可以确保 Kafka 消费者确认该消息已处理。
- 异常处理:
- 异常捕获块
catch
用于处理消费过程中可能出现的任何错误,确保程序不会崩溃。
- 异常捕获块
- 关闭消费者:
- 在
finally
块中调用consumer.close()
来关闭消费者连接。
- 在
3. 使用 commitAsync
提高性能(可选)
如果对性能要求更高,可以考虑使用 commitAsync()
方法,它不会阻塞当前线程,提交操作将在后台异步完成:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.out.println("Error committing offset: " + exception.getMessage());
} else {
System.out.println("Successfully committed offsets: " + offsets);
}
});
这样你可以不阻塞线程,提高消费性能,尤其是在高吞吐量的环境中。
如果你有其他的需求或者想更细致地控制消费的细节,随时告诉我!