From f5d2acc0d4bda230176ea342531c1fad07899353 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Tue, 10 Mar 2026 13:32:54 -0700 Subject: [PATCH 1/2] Core: make sure snapshot timestamp is monotonically increasing for V4 tables. Made-with: Cursor Model: claude-4.6-opus-high-thinking Co-authored-by: Cursor --- .../org/apache/iceberg/SnapshotProducer.java | 29 +++++++++++- .../org/apache/iceberg/TableMetadata.java | 1 + .../apache/iceberg/TestSnapshotProducer.java | 47 +++++++++++++++++++ 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 6ba10e8049f6..467e9eeeffa7 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -35,6 +35,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import java.io.IOException; import java.math.RoundingMode; +import java.time.Clock; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -122,6 +123,7 @@ public void accept(String file) { private SnapshotAncestryValidator snapshotAncestryValidator = SnapshotAncestryValidator.NON_VALIDATING; + private Clock clock = Clock.systemUTC(); private ExecutorService workerPool; private String targetBranch = SnapshotRef.MAIN_BRANCH; private CommitMetrics commitMetrics; @@ -337,7 +339,7 @@ public Snapshot apply() { sequenceNumber, snapshotId(), parentSnapshotId, - System.currentTimeMillis(), + snapshotTimestampMillis(parentSnapshot), operation(), summary(base), base.currentSchemaId(), @@ -662,6 +664,31 @@ protected ManifestReader newDeleteManifestReader(ManifestFile manife return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById()); } + @VisibleForTesting + void setClock(Clock newClock) { + this.clock = newClock; + } + + /** + * Generates the snapshot timestamp in milliseconds. + * + *
<p>
For format version 4 and above, this implements the Lamport clock algorithm to guarantee + * monotonically increasing snapshot timestamps. For older format versions, this returns the + * current wall clock time. + * + * @param parentSnapshot the parent snapshot on the target branch, or null if there is no parent + * @return the snapshot timestamp in milliseconds + */ + private long snapshotTimestampMillis(Snapshot parentSnapshot) { + long now = clock.millis(); + if (base.formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS + && parentSnapshot != null) { + return Math.max(now, parentSnapshot.timestampMillis() + 1); + } + + return now; + } + protected long snapshotId() { if (snapshotId == null) { synchronized (this) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 43a67dd2bef2..0b29cae3ffd1 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -57,6 +57,7 @@ public class TableMetadata implements Serializable { static final int DEFAULT_TABLE_FORMAT_VERSION = 2; static final int SUPPORTED_TABLE_FORMAT_VERSION = 4; static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3; + static final int MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS = 4; static final int INITIAL_SPEC_ID = 0; static final int INITIAL_SORT_ORDER_ID = 1; static final int INITIAL_SCHEMA_ID = 0; diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java index dd97738759f4..89d77ae19e73 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java @@ -22,10 +22,14 @@ import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import 
static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; import java.nio.file.Paths; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nonnull; @@ -243,4 +247,47 @@ public void testManifestCompressionFromTableProperty() throws IOException { ManifestFile manifest = table.currentSnapshot().dataManifests(table.io()).get(0); assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("snappy"); } + + @TestTemplate + public void testSnapshotTimestampsAreMonotonicallyIncreasing() { + assumeThat(formatVersion) + .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS); + + table.newFastAppend().appendFile(FILE_A).commit(); + Snapshot first = table.currentSnapshot(); + + table.newFastAppend().appendFile(FILE_B).commit(); + Snapshot second = table.currentSnapshot(); + assertThat(second.timestampMillis()) + .as("V4 snapshot timestamps must be strictly increasing") + .isGreaterThan(first.timestampMillis()); + + table.newFastAppend().appendFile(FILE_C).commit(); + Snapshot third = table.currentSnapshot(); + assertThat(third.timestampMillis()) + .as("V4 snapshot timestamps must be strictly increasing") + .isGreaterThan(second.timestampMillis()); + } + + @TestTemplate + public void testV4LamportClockFastForwardsDriftedClock() { + assumeThat(formatVersion) + .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS); + + table.newFastAppend().appendFile(FILE_A).commit(); + long firstTs = table.currentSnapshot().timestampMillis(); + + // simulate clock drift: wall clock reports a time far in the past + long driftedTime = firstTs - 10_000; + Clock driftedClock = Clock.fixed(Instant.ofEpochMilli(driftedTime), ZoneOffset.UTC); + AppendFiles append = table.newFastAppend().appendFile(FILE_B); + ((SnapshotProducer) append).setClock(driftedClock); + append.commit(); + + long secondTs = 
table.currentSnapshot().timestampMillis(); + assertThat(secondTs) + .as( + "Lamport clock should fast-forward past the drifted wall clock to the last snapshot timestamp + 1 ms") + .isEqualTo(firstTs + 1); + } } From 8c58bd2851585ddffbc57808bae185fe0d4f1639 Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Mon, 11 May 2026 22:39:07 -0700 Subject: [PATCH 2/2] Core: test that V4 snapshot timestamp monotonicity is scoped to the target branch Pins that SnapshotProducer.snapshotTimestampMillis uses the parent on the target branch (via SnapshotUtil.latestSnapshot(base, targetBranch)) rather than the table's globally newest snapshot. The test commits to a separate branch with a clock pinned far in the future, then commits to main with a clock pinned just slightly past main's head. Main's new timestamp must equal the pinned wall-clock value (no fast-forward) and remain well below the sibling branch's head timestamp. Without this test, a regression that read base.currentSnapshot() (or any max-over-branches) for the parent would silently pass the existing single-branch monotonicity tests. 
Made-with: Cursor Model: claude-4.7-opus --- .../apache/iceberg/TestSnapshotProducer.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java index 89d77ae19e73..f4b1b223b2b3 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java @@ -290,4 +290,46 @@ public void testV4LamportClockFastForwardsDriftedClock() { "Lamport clock should fast-forward past the drifted wall clock to the last snapshot timestamp + 1 ms") .isEqualTo(firstTs + 1); } + + @TestTemplate + public void testV4MonotonicityIsScopedToTargetBranch() { + assumeThat(formatVersion) + .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS); + + // Establish a base snapshot on main. + table.newFastAppend().appendFile(FILE_A).commit(); + long mainParentTs = table.currentSnapshot().timestampMillis(); + + // Create a branch pointing at the current main head. + String branchName = "test-branch"; + table.manageSnapshots().createBranch(branchName).commit(); + + // Commit to the branch with a clock far in the future so the branch head's + // timestamp-ms is much higher than main's head timestamp-ms. + long branchFutureTs = mainParentTs + 1_000_000L; + Clock branchClock = Clock.fixed(Instant.ofEpochMilli(branchFutureTs), ZoneOffset.UTC); + AppendFiles branchAppend = table.newFastAppend().appendFile(FILE_B).toBranch(branchName); + ((SnapshotProducer) branchAppend).setClock(branchClock); + branchAppend.commit(); + long branchTs = table.snapshot(branchName).timestampMillis(); + assertThat(branchTs) + .as("Sanity: branch commit should adopt the simulated far-future wall-clock value") + .isEqualTo(branchFutureTs); + + // Commit to main with a clock just slightly after mainParentTs but far below branchTs. 
+ // The spec requires monotonicity "on the same branch", so main's new timestamp must be + // constrained only by main's previous head, not by the unrelated future timestamp on + // the other branch. + long mainNewTs = mainParentTs + 5; + Clock mainClock = Clock.fixed(Instant.ofEpochMilli(mainNewTs), ZoneOffset.UTC); + AppendFiles mainAppend = table.newFastAppend().appendFile(FILE_C); + ((SnapshotProducer) mainAppend).setClock(mainClock); + mainAppend.commit(); + + long actualMainTs = table.currentSnapshot().timestampMillis(); + assertThat(actualMainTs) + .as("Main's monotonicity must be relative to main's parent, not the branch head") + .isEqualTo(mainNewTs) + .isLessThan(branchTs); + } }