Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.math.RoundingMode;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -122,6 +123,7 @@ public void accept(String file) {
private SnapshotAncestryValidator snapshotAncestryValidator =
SnapshotAncestryValidator.NON_VALIDATING;

// Time source read when generating snapshot timestamps; defaults to the system UTC clock and
// can be replaced through setClock (intended for tests).
private Clock clock = Clock.systemUTC();
private ExecutorService workerPool;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private CommitMetrics commitMetrics;
Expand Down Expand Up @@ -337,7 +339,7 @@ public Snapshot apply() {
sequenceNumber,
snapshotId(),
parentSnapshotId,
System.currentTimeMillis(),
snapshotTimestampMillis(parentSnapshot),
operation(),
summary(base),
base.currentSchemaId(),
Expand Down Expand Up @@ -662,6 +664,31 @@ protected ManifestReader<DeleteFile> newDeleteManifestReader(ManifestFile manife
return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById());
}

/**
 * Replaces the clock used to generate snapshot timestamps.
 *
 * <p>Intended for tests that need a deterministic or deliberately skewed wall clock.
 *
 * @param newClock the clock to read the current time from, must not be null
 * @throws IllegalArgumentException if {@code newClock} is null
 */
@VisibleForTesting
void setClock(Clock newClock) {
  // Fail fast here rather than with an opaque NPE when the timestamp is generated at commit time.
  if (newClock == null) {
    throw new IllegalArgumentException("Invalid clock: null");
  }

  this.clock = newClock;
}

/**
 * Computes the timestamp, in milliseconds, to assign to the snapshot being produced.
 *
 * <p>When the table's format version is at least {@link
 * TableMetadata#MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS} and a parent snapshot exists, the result
 * is the larger of the current clock reading and the parent's timestamp plus one millisecond, so
 * the new snapshot is always strictly newer than its parent. Otherwise the current clock reading
 * is returned unchanged.
 *
 * @param parentSnapshot the parent snapshot on the target branch, or null if there is no parent
 * @return the snapshot timestamp in milliseconds
 */
private long snapshotTimestampMillis(Snapshot parentSnapshot) {
  long wallClockMillis = clock.millis();

  boolean monotonicRequired =
      base.formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS;
  if (!monotonicRequired || parentSnapshot == null) {
    // older format versions, or no parent to order against: plain wall clock time
    return wallClockMillis;
  }

  // smallest timestamp that is still strictly after the parent
  long earliestAllowedMillis = parentSnapshot.timestampMillis() + 1;
  return earliestAllowedMillis > wallClockMillis ? earliestAllowedMillis : wallClockMillis;
}

protected long snapshotId() {
if (snapshotId == null) {
synchronized (this) {
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
// Lowest format version for which SnapshotProducer forces a new snapshot's timestamp to be
// strictly greater than its parent's timestamp.
static final int MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS = 4;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
Expand Down
89 changes: 89 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import static org.apache.iceberg.avro.AvroTestHelpers.readAvroCodec;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -243,4 +247,89 @@ public void testManifestCompressionFromTableProperty() throws IOException {
ManifestFile manifest = table.currentSnapshot().dataManifests(table.io()).get(0);
assertThat(readAvroCodec(new File(manifest.path()))).isEqualTo("snappy");
}

/**
 * Commits three consecutive fast appends using the default clock and checks that each new
 * snapshot's timestamp is strictly greater than the previous one.
 */
@TestTemplate
public void testSnapshotTimestampsAreMonotonicallyIncreasing() {
  // only applies to format versions that enforce monotonic snapshot timestamps
  assumeThat(formatVersion)
      .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);

  table.newFastAppend().appendFile(FILE_A).commit();
  long firstTs = table.currentSnapshot().timestampMillis();

  table.newFastAppend().appendFile(FILE_B).commit();
  long secondTs = table.currentSnapshot().timestampMillis();
  assertThat(secondTs)
      .as("V4 snapshot timestamps must be strictly increasing")
      .isGreaterThan(firstTs);

  table.newFastAppend().appendFile(FILE_C).commit();
  long thirdTs = table.currentSnapshot().timestampMillis();
  assertThat(thirdTs)
      .as("V4 snapshot timestamps must be strictly increasing")
      .isGreaterThan(secondTs);
}

/**
 * Commits a second snapshot with a fixed clock set 10 seconds before the first snapshot's
 * timestamp and checks that the resulting timestamp is exactly the parent's timestamp plus one
 * millisecond rather than the stale clock value.
 */
@TestTemplate
public void testV4LamportClockFastForwardsDriftedClock() {
  assumeThat(formatVersion)
      .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);

  table.newFastAppend().appendFile(FILE_A).commit();
  long parentTs = table.currentSnapshot().timestampMillis();

  // simulate clock drift: the injected wall clock is stuck well before the parent snapshot
  Instant staleInstant = Instant.ofEpochMilli(parentTs - 10_000);
  AppendFiles driftedAppend = table.newFastAppend().appendFile(FILE_B);
  SnapshotProducer<?> producer = (SnapshotProducer<?>) driftedAppend;
  producer.setClock(Clock.fixed(staleInstant, ZoneOffset.UTC));
  driftedAppend.commit();

  long childTs = table.currentSnapshot().timestampMillis();
  long expectedTs = parentTs + 1;
  assertThat(childTs)
      .as(
          "Lamport clock should fast-forward past the drifted wall clock to the last snapshot timestamp + 1 ms")
      .isEqualTo(expectedTs);
}

/**
 * Checks that the timestamp floor for a commit comes from the parent on the branch being
 * committed to: a far-future snapshot on a side branch must not push main's next timestamp
 * forward.
 */
@TestTemplate
public void testV4MonotonicityIsScopedToTargetBranch() {
  assumeThat(formatVersion)
      .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);

  // Base snapshot on main; both later commits use it as their parent.
  table.newFastAppend().appendFile(FILE_A).commit();
  long mainHeadTs = table.currentSnapshot().timestampMillis();

  // Side branch created from the current main head.
  String sideBranch = "test-branch";
  table.manageSnapshots().createBranch(sideBranch).commit();

  // Commit to the side branch with a clock fixed far in the future, so that the branch head
  // ends up with a much larger timestamp than main's head.
  long farFutureTs = mainHeadTs + 1_000_000L;
  AppendFiles sideAppend = table.newFastAppend().appendFile(FILE_B).toBranch(sideBranch);
  SnapshotProducer<?> sideProducer = (SnapshotProducer<?>) sideAppend;
  sideProducer.setClock(Clock.fixed(Instant.ofEpochMilli(farFutureTs), ZoneOffset.UTC));
  sideAppend.commit();

  long sideHeadTs = table.snapshot(sideBranch).timestampMillis();
  assertThat(sideHeadTs)
      .as("Sanity: branch commit should adopt the simulated far-future wall-clock value")
      .isEqualTo(farFutureTs);

  // Commit to main with a clock only 5 ms past main's head, far below the side branch head.
  // Monotonicity is required "on the same branch", so only main's previous head may constrain
  // the new timestamp; the unrelated future timestamp on the side branch must be ignored.
  long expectedMainTs = mainHeadTs + 5;
  AppendFiles mainAppend = table.newFastAppend().appendFile(FILE_C);
  SnapshotProducer<?> mainProducer = (SnapshotProducer<?>) mainAppend;
  mainProducer.setClock(Clock.fixed(Instant.ofEpochMilli(expectedMainTs), ZoneOffset.UTC));
  mainAppend.commit();

  long newMainTs = table.currentSnapshot().timestampMillis();
  assertThat(newMainTs)
      .as("Main's monotonicity must be relative to main's parent, not the branch head")
      .isEqualTo(expectedMainTs)
      .isLessThan(sideHeadTs);
}
}