diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
index c986f8afc2eb..16619c4c2245 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java
@@ -229,8 +229,37 @@ private void commitToTable(
     // records for other partitions. Merge the updated topic partitions with the last committed
     // offsets.
     Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
+
+    // Detect if the control topic was reset (e.g., after Kafka cluster recreation).
+    // If the current coordinator's observed control topic offsets are lower than the
+    // previously committed offsets stored in the snapshot, the control topic has likely
+    // been reset and the stored offsets are stale. In this case, skip deduplication
+    // to avoid silently dropping all data files and blocking metadata commits.
+    boolean controlTopicReset =
+        !committedOffsets.isEmpty()
+            && committedOffsets.entrySet().stream()
+                .anyMatch(
+                    e -> {
+                      Long current = controlTopicOffsets.get(e.getKey());
+                      return current != null && current < e.getValue();
+                    });
+
+    if (controlTopicReset) {
+      LOG.warn(
+          "Coordinator {}: detected possible Kafka cluster recreation for table {}. "
+              + "Control topic offsets {} are lower than previously committed offsets {}. "
+              + "Skipping offset deduplication and resetting stored offset baseline.",
+          taskId,
+          tableIdentifier,
+          controlTopicOffsets,
+          committedOffsets);
+    }
+
+    // When a reset is detected, base the stored offsets only on the current control topic
+    // offsets so subsequent commits use the correct (new-cluster) baseline.
+    Map<Integer, Long> baseOffsets = controlTopicReset ? Map.of() : committedOffsets;
     Map<Integer, Long> mergedOffsets =
-        Stream.of(committedOffsets, controlTopicOffsets)
+        Stream.of(baseOffsets, controlTopicOffsets)
             .flatMap(map -> map.entrySet().stream())
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
     String offsetsJson = offsetsToJson(mergedOffsets);
@@ -239,6 +268,9 @@ private void commitToTable(
         envelopeList.stream()
             .filter(
                 envelope -> {
+                  if (controlTopicReset) {
+                    return true;
+                  }
                   Long minOffset = committedOffsets.get(envelope.partition());
                   return minOffset == null || envelope.offset() >= minOffset;
                 })
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
index ed370fcdad35..55f44d75ad82 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java
@@ -298,4 +298,42 @@ public void testCoordinatorCommittedOffsetValidation() {
     assertThat(table.snapshots()).hasSize(2);
     assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
   }
+
+  @Test
+  public void testCoordinatorCommittedOffsetResetAfterClusterRecreation() {
+    // Simulate a Kafka cluster recreation scenario. The snapshot stores a high control
+    // topic offset from the old cluster ({0:100}). After recreation the new cluster's
+    // control topic starts at offset 0, so all incoming DataWritten events carry offsets
+    // below 100. Without the fix, every event would be filtered as "already committed"
+    // and the connector would log "nothing to commit" indefinitely.
+
+    table
+        .newAppend()
+        .appendFile(EventTestUtil.createDataFile())
+        .set(OFFSETS_SNAPSHOT_PROP, "{\"0\":100}")
+        .commit();
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+    assertThat(table.currentSnapshot().summary())
+        .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":100}");
+
+    // coordinatorTest adds DataWritten at consumer offset 1 and DataComplete at offset 2,
+    // making controlTopicOffsets = {0:3}. Since 3 < 100 the reset is detected and
+    // deduplication is skipped, so the data file is committed.
+    OffsetDateTime ts = EventTestUtil.now();
+    UUID commitId =
+        coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), ts);
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(2);
+
+    // The snapshot offset baseline must be reset to the current (new-cluster) offsets,
+    // not the stale old-cluster value of 100.
+    assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3}");
+
+    assertThat(producer.history()).hasSize(3);
+    assertCommitTable(1, commitId, ts);
+    assertCommitComplete(2, commitId, ts);
+  }
 }