From bac19e573077dc06217eb78d7f64f65bca0f856c Mon Sep 17 00:00:00 2001 From: Schiewatir Date: Fri, 15 May 2026 23:02:54 +0000 Subject: [PATCH 1/2] Kafka Connect: recover from metadata commit stall after cluster recreation After a Kafka cluster is recreated the control topic resets to offset 0, but the Iceberg snapshot still stores committed offsets from the old cluster (e.g. {0:100}). Every DataWritten event on the new cluster carries a low offset (< 100) and the deduplication filter in commitToTable() silently discards all of them, causing the coordinator to log "nothing to commit" indefinitely while parquet files accumulate without a snapshot. Detect this scenario by comparing the coordinator's observed control topic offsets against the stored committed offsets. When the current offsets are lower, the control topic was likely reset; skip deduplication and store the new (lower) baseline so subsequent commits behave correctly. Fixes #15293 --- .../iceberg/connect/channel/Coordinator.java | 34 +++++++++++++++- .../connect/channel/TestCoordinator.java | 39 +++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index c986f8afc2eb..16619c4c2245 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -229,8 +229,37 @@ private void commitToTable( // records for other partitions. Merge the updated topic partitions with the last committed // offsets. Map committedOffsets = lastCommittedOffsetsForTable(table, branch); + + // Detect if the control topic was reset (e.g., after Kafka cluster recreation). + // If the current coordinator's observed control topic offsets are lower than the + // previously committed offsets stored in the snapshot, the control topic has likely + // been reset and the stored offsets are stale. In this case, skip deduplication + // to avoid silently dropping all data files and blocking metadata commits. + boolean controlTopicReset = + !committedOffsets.isEmpty() + && committedOffsets.entrySet().stream() + .anyMatch( + e -> { + Long current = controlTopicOffsets.get(e.getKey()); + return current != null && current < e.getValue(); + }); + + if (controlTopicReset) { + LOG.warn( + "Coordinator {}: detected possible Kafka cluster recreation for table {}. " + + "Control topic offsets {} are lower than previously committed offsets {}. " + + "Skipping offset deduplication and resetting stored offset baseline.", + taskId, + tableIdentifier, + controlTopicOffsets, + committedOffsets); + } + + // When a reset is detected, base the stored offsets only on the current control topic + // offsets so subsequent commits use the correct (new-cluster) baseline. + Map baseOffsets = controlTopicReset ? Map.of() : committedOffsets; Map mergedOffsets = - Stream.of(committedOffsets, controlTopicOffsets) + Stream.of(baseOffsets, controlTopicOffsets) .flatMap(map -> map.entrySet().stream()) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max)); String offsetsJson = offsetsToJson(mergedOffsets); @@ -239,6 +268,9 @@ private void commitToTable( envelopeList.stream() .filter( envelope -> { + if (controlTopicReset) { + return true; + } Long minOffset = committedOffsets.get(envelope.partition()); return minOffset == null || envelope.offset() >= minOffset; }) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index ed370fcdad35..a36388259a39 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -298,4 +298,43 @@ public void testCoordinatorCommittedOffsetValidation() { assertThat(table.snapshots()).hasSize(2); assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); } + + @Test + public void testCoordinatorCommittedOffsetResetAfterClusterRecreation() { + // Simulate a Kafka cluster recreation scenario. The snapshot stores a high control + // topic offset from the old cluster ({0:100}). After recreation the new cluster's + // control topic starts at offset 0, so all incoming DataWritten events carry offsets + // below 100. Without the fix, every event would be filtered as "already committed" + // and the connector would log "nothing to commit" indefinitely. + + table + .newAppend() + .appendFile(EventTestUtil.createDataFile()) + .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":100}") + .commit(); + + table.refresh(); + assertThat(table.snapshots()).hasSize(1); + assertThat(table.currentSnapshot().summary()) + .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":100}"); + + // coordinatorTest adds DataWritten at consumer offset 1 and DataComplete at offset 2, + // making controlTopicOffsets = {0:3}. Since 3 < 100 the reset is detected and + // deduplication is skipped, so the data file is committed. + OffsetDateTime ts = EventTestUtil.now(); + UUID commitId = + coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), ts); + + table.refresh(); + assertThat(table.snapshots()).hasSize(2); + + // The snapshot offset baseline must be reset to the current (new-cluster) offsets, + // not the stale old-cluster value of 100. + assertThat(table.currentSnapshot().summary()) + .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3}"); + + assertThat(producer.history()).hasSize(3); + assertCommitTable(1, commitId, ts); + assertCommitComplete(2, commitId, ts); + } } From 6752aa406a127d8070945274f6fbd843f6af68da Mon Sep 17 00:00:00 2001 From: Schiewatir Date: Fri, 15 May 2026 23:20:24 +0000 Subject: [PATCH 2/2] Kafka Connect: fix spotless formatting in TestCoordinator --- .../org/apache/iceberg/connect/channel/TestCoordinator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index a36388259a39..55f44d75ad82 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -330,8 +330,7 @@ public void testCoordinatorCommittedOffsetResetAfterClusterRecreation() { // The snapshot offset baseline must be reset to the current (new-cluster) offsets, // not the stale old-cluster value of 100. - assertThat(table.currentSnapshot().summary()) - .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3}"); + assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3}"); assertThat(producer.history()).hasSize(3); assertCommitTable(1, commitId, ts);