diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 6ba10e8049f6..467e9eeeffa7 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -35,6 +35,7 @@
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
 import java.math.RoundingMode;
+import java.time.Clock;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -122,6 +123,7 @@ public void accept(String file) {
   private SnapshotAncestryValidator snapshotAncestryValidator =
       SnapshotAncestryValidator.NON_VALIDATING;
 
+  private Clock clock = Clock.systemUTC();
   private ExecutorService workerPool;
   private String targetBranch = SnapshotRef.MAIN_BRANCH;
   private CommitMetrics commitMetrics;
@@ -337,7 +339,7 @@ public Snapshot apply() {
         sequenceNumber,
         snapshotId(),
         parentSnapshotId,
-        System.currentTimeMillis(),
+        snapshotTimestampMillis(parentSnapshot),
         operation(),
         summary(base),
         base.currentSchemaId(),
@@ -662,6 +664,41 @@ protected ManifestReader<DeleteFile> newDeleteManifestReader(ManifestFile manife
     return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById());
   }
 
+  /**
+   * Replaces the clock used to generate snapshot timestamps.
+   *
+   * <p>Intended for tests that need deterministic or drifted wall-clock values.
+   *
+   * @param newClock the clock to read when producing snapshot timestamps
+   */
+  @VisibleForTesting
+  void setClock(Clock newClock) {
+    this.clock = newClock;
+  }
+
+  /**
+   * Generates the snapshot timestamp in milliseconds.
+   *
+   * <p>For format version 4 and above, when the target branch has a parent snapshot, the result is
+   * the larger of the current clock time and the parent's timestamp plus one millisecond, so the
+   * new snapshot's timestamp is always strictly greater than its parent's even if the clock runs
+   * behind. For older format versions, or when there is no parent snapshot, this returns the
+   * current clock time unchanged.
+   *
+   * @param parentSnapshot the parent snapshot on the target branch, or null if there is no parent
+   * @return the snapshot timestamp in milliseconds
+   */
+  private long snapshotTimestampMillis(Snapshot parentSnapshot) {
+    long now = clock.millis();
+    if (base.formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS
+        && parentSnapshot != null) {
+      // never go backwards: advance one millisecond past the parent when the clock is behind it
+      return Math.max(now, parentSnapshot.timestampMillis() + 1);
+    }
+
+    return now;
+  }
+
   protected long snapshotId() {
     if (snapshotId == null) {
       synchronized (this) {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 43a67dd2bef2..0b29cae3ffd1 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
   static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
   static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
   static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
+  static final int MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS = 4;
   static final int INITIAL_SPEC_ID = 0;
   static final int INITIAL_SORT_ORDER_ID = 1;
   static final int INITIAL_SCHEMA_ID = 0;
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index dd97738759f4..f4b1b223b2b3 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -22,10 +22,14 @@
 import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -243,4 +247,89 @@ public void testManifestCompressionFromTableProperty() throws IOException {
     ManifestFile manifest = table.currentSnapshot().dataManifests(table.io()).get(0);
     assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("snappy");
   }
+
+  @TestTemplate
+  public void testSnapshotTimestampsAreMonotonicallyIncreasing() {
+    assumeThat(formatVersion)
+        .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot first = table.currentSnapshot();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    Snapshot second = table.currentSnapshot();
+    assertThat(second.timestampMillis())
+        .as("V4 snapshot timestamps must be strictly increasing")
+        .isGreaterThan(first.timestampMillis());
+
+    table.newFastAppend().appendFile(FILE_C).commit();
+    Snapshot third = table.currentSnapshot();
+    assertThat(third.timestampMillis())
+        .as("V4 snapshot timestamps must be strictly increasing")
+        .isGreaterThan(second.timestampMillis());
+  }
+
+  @TestTemplate
+  public void testV4LamportClockFastForwardsDriftedClock() {
+    assumeThat(formatVersion)
+        .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long firstTs = table.currentSnapshot().timestampMillis();
+
+    // simulate clock drift: wall clock reports a time far in the past
+    long driftedTime = firstTs - 10_000;
+    Clock driftedClock = Clock.fixed(Instant.ofEpochMilli(driftedTime), ZoneOffset.UTC);
+    AppendFiles append = table.newFastAppend().appendFile(FILE_B);
+    ((SnapshotProducer<?>) append).setClock(driftedClock);
+    append.commit();
+
+    long secondTs = table.currentSnapshot().timestampMillis();
+    assertThat(secondTs)
+        .as(
+            "Lamport clock should fast-forward past the drifted wall clock to the last snapshot timestamp + 1 ms")
+        .isEqualTo(firstTs + 1);
+  }
+
+  @TestTemplate
+  public void testV4MonotonicityIsScopedToTargetBranch() {
+    assumeThat(formatVersion)
+        .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);
+
+    // Establish a base snapshot on main.
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long mainParentTs = table.currentSnapshot().timestampMillis();
+
+    // Create a branch pointing at the current main head.
+    String branchName = "test-branch";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // Commit to the branch with a clock far in the future so the branch head's
+    // timestamp-ms is much higher than main's head timestamp-ms.
+    long branchFutureTs = mainParentTs + 1_000_000L;
+    Clock branchClock = Clock.fixed(Instant.ofEpochMilli(branchFutureTs), ZoneOffset.UTC);
+    AppendFiles branchAppend = table.newFastAppend().appendFile(FILE_B).toBranch(branchName);
+    ((SnapshotProducer<?>) branchAppend).setClock(branchClock);
+    branchAppend.commit();
+    long branchTs = table.snapshot(branchName).timestampMillis();
+    assertThat(branchTs)
+        .as("Sanity: branch commit should adopt the simulated far-future wall-clock value")
+        .isEqualTo(branchFutureTs);
+
+    // Commit to main with a clock just slightly after mainParentTs but far below branchTs.
+    // The spec requires monotonicity "on the same branch", so main's new timestamp must be
+    // constrained only by main's previous head, not by the unrelated future timestamp on
+    // the other branch.
+    long mainNewTs = mainParentTs + 5;
+    Clock mainClock = Clock.fixed(Instant.ofEpochMilli(mainNewTs), ZoneOffset.UTC);
+    AppendFiles mainAppend = table.newFastAppend().appendFile(FILE_C);
+    ((SnapshotProducer<?>) mainAppend).setClock(mainClock);
+    mainAppend.commit();
+
+    long actualMainTs = table.currentSnapshot().timestampMillis();
+    assertThat(actualMainTs)
+        .as("Main's monotonicity must be relative to main's parent, not the branch head")
+        .isEqualTo(mainNewTs)
+        .isLessThan(branchTs);
+  }
 }