Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,37 @@ private void commitToTable(
// records for other partitions. Merge the updated topic partitions with the last committed
// offsets.
Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);

// Detect if the control topic was reset (e.g., after Kafka cluster recreation).
// If the current coordinator's observed control topic offsets are lower than the
// previously committed offsets stored in the snapshot, the control topic has likely
// been reset and the stored offsets are stale. In this case, skip deduplication
// to avoid silently dropping all data files and blocking metadata commits.
boolean controlTopicReset =
!committedOffsets.isEmpty()
&& committedOffsets.entrySet().stream()
.anyMatch(
e -> {
Long current = controlTopicOffsets.get(e.getKey());
return current != null && current < e.getValue();
});

if (controlTopicReset) {
LOG.warn(
"Coordinator {}: detected possible Kafka cluster recreation for table {}. "
+ "Control topic offsets {} are lower than previously committed offsets {}. "
+ "Skipping offset deduplication and resetting stored offset baseline.",
taskId,
tableIdentifier,
controlTopicOffsets,
committedOffsets);
}

// When a reset is detected, base the stored offsets only on the current control topic
// offsets so subsequent commits use the correct (new-cluster) baseline.
Map<Integer, Long> baseOffsets = controlTopicReset ? Map.of() : committedOffsets;
Map<Integer, Long> mergedOffsets =
Stream.of(committedOffsets, controlTopicOffsets)
Stream.of(baseOffsets, controlTopicOffsets)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
String offsetsJson = offsetsToJson(mergedOffsets);
Expand All @@ -239,6 +268,9 @@ private void commitToTable(
envelopeList.stream()
.filter(
envelope -> {
if (controlTopicReset) {
return true;
}
Long minOffset = committedOffsets.get(envelope.partition());
return minOffset == null || envelope.offset() >= minOffset;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,42 @@ public void testCoordinatorCommittedOffsetValidation() {
assertThat(table.snapshots()).hasSize(2);
assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
}

@Test
public void testCoordinatorCommittedOffsetResetAfterClusterRecreation() {
  // Scenario under test: the Kafka cluster was recreated. The latest snapshot still records
  // a control topic offset from the old cluster, while the new cluster's control topic starts
  // over at offset 0. Every incoming DataWritten event therefore carries an offset below the
  // recorded one; if they were all treated as "already committed", nothing would ever be
  // committed again.
  String staleOffsetsJson = "{\"0\":100}";
  String resetOffsetsJson = "{\"0\":3}";

  // Seed the table with one snapshot that carries the stale (old-cluster) offsets.
  table
      .newAppend()
      .appendFile(EventTestUtil.createDataFile())
      .set(OFFSETS_SNAPSHOT_PROP, staleOffsetsJson)
      .commit();
  table.refresh();

  assertThat(table.snapshots()).hasSize(1);
  assertThat(table.currentSnapshot().summary())
      .containsEntry(OFFSETS_SNAPSHOT_PROP, staleOffsetsJson);

  // Run one commit cycle. coordinatorTest adds DataWritten at consumer offset 1 and
  // DataComplete at offset 2, which makes controlTopicOffsets = {0:3}. Because 3 < 100 the
  // reset is detected, deduplication is skipped, and the data file is committed.
  OffsetDateTime commitTime = EventTestUtil.now();
  UUID commitId =
      coordinatorTest(
          ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), commitTime);
  table.refresh();

  assertThat(table.snapshots()).hasSize(2);

  // The new snapshot must carry the current (new-cluster) offsets as its baseline rather
  // than the stale old-cluster value of 100.
  assertThat(table.currentSnapshot().summary())
      .containsEntry(OFFSETS_SNAPSHOT_PROP, resetOffsetsJson);

  // Three records are expected in the producer history; the ones at index 1 and 2 are
  // checked as the commit-to-table and commit-complete events for this commit.
  assertThat(producer.history()).hasSize(3);
  assertCommitTable(1, commitId, commitTime);
  assertCommitComplete(2, commitId, commitTime);
}
}