From 53057580663f508613c4587c3520c2c77e933f0e Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Sat, 9 May 2026 12:09:13 +0800 Subject: [PATCH] [lake] Order lake snapshot metadata by commit_timestamp to return the real latest --- .../fluss/server/zk/data/lake/LakeTable.java | 73 ++++++++-- .../server/zk/data/lake/LakeTableHelper.java | 25 +++- .../zk/data/lake/LakeTableJsonSerde.java | 21 ++- .../zk/data/lake/LakeTableHelperTest.java | 43 ++++++ .../zk/data/lake/LakeTableJsonSerdeTest.java | 64 ++++++++ .../server/zk/data/lake/LakeTableTest.java | 137 ++++++++++++++++++ 6 files changed, 352 insertions(+), 11 deletions(-) create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableTest.java diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java index 9e3e183861..d31331b8a9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java @@ -30,6 +30,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -96,13 +97,21 @@ private LakeTable( @Nullable public LakeSnapshotMetadata getLatestLakeSnapshotMetadata() { - if (lakeSnapshotMetadatas != null && !lakeSnapshotMetadatas.isEmpty()) { - // the last one snapshot may be a compacted snapshot which is - // not latest snapshot. todo: fix to return the real latest snapshot in - // #2625 - return lakeSnapshotMetadatas.get(lakeSnapshotMetadatas.size() - 1); + if (lakeSnapshotMetadatas == null || lakeSnapshotMetadatas.isEmpty()) { + return null; } - return null; + // Pick the entry with the largest commitTimestamp to handle out-of-order + // commits. 
When readable/tiered snapshots arrive in wrong order, use + // commitTimestamp to identify the real latest snapshot. Legacy entries + // without timestamp fall back to list order for compatibility. + LakeSnapshotMetadata best = lakeSnapshotMetadatas.get(0); + for (int i = 1; i < lakeSnapshotMetadatas.size(); i++) { + LakeSnapshotMetadata cur = lakeSnapshotMetadatas.get(i); + if (cur.getCommitTimestamp() >= best.getCommitTimestamp()) { + best = cur; + } + } + return best; } @Nullable @@ -182,8 +191,27 @@ public LakeTableSnapshot getOrReadLatestReadableTableSnapshot() throws IOExcepti // flink connector upgrade, and call getOrReadLatestReadableTableSnapshot // will always get null. // todo: do we need to consider such case? - for (int i = checkNotNull(lakeSnapshotMetadatas).size() - 1; i >= 0; i--) { - LakeSnapshotMetadata snapshotMetadata = lakeSnapshotMetadatas.get(i); + // + // Sort by commit_timestamp to get the latest readable snapshot regardless + // of physical list order. Handles out-of-order RPCs and maintains + // compatibility with legacy entries that lack timestamps. + List snapshots = checkNotNull(lakeSnapshotMetadatas); + Integer[] order = new Integer[snapshots.size()]; + for (int i = 0; i < order.length; i++) { + order[i] = i; + } + Arrays.sort( + order, + (i, j) -> { + int cmp = + Long.compare( + snapshots.get(j).getCommitTimestamp(), + snapshots.get(i).getCommitTimestamp()); + // tie on timestamp: later original index first + return cmp != 0 ? cmp : Integer.compare(j, i); + }); + for (int idx : order) { + LakeSnapshotMetadata snapshotMetadata = snapshots.get(idx); if (snapshotMetadata.readableOffsetsFilePath != null) { return toLakeTableSnapshot( snapshotMetadata.snapshotId, snapshotMetadata.readableOffsetsFilePath); @@ -211,6 +239,9 @@ private LakeTableSnapshot toLakeTableSnapshot(long snapshotId, FsPath offsetFile /** The lake snapshot metadata entry stored in zk lake table. 
*/ public static class LakeSnapshotMetadata { + // Sentinel value for unknown commit timestamps in legacy entries. + public static final long UNKNOWN_COMMIT_TIMESTAMP = 0L; + private final long snapshotId; // the file path to file storing the tiered offsets, @@ -221,13 +252,31 @@ public static class LakeSnapshotMetadata { // will be null if we don't now the readable offsets for this snapshot @Nullable private final FsPath readableOffsetsFilePath; + // Server-side timestamp to determine the real latest snapshot regardless of list order. + // Legacy entries without timestamp fall back to list order. + private final long commitTimestamp; + + // Legacy constructor kept for backward compatibility. public LakeSnapshotMetadata( long snapshotId, FsPath tieredOffsetsFilePath, @Nullable FsPath readableOffsetsFilePath) { + this( + snapshotId, + tieredOffsetsFilePath, + readableOffsetsFilePath, + UNKNOWN_COMMIT_TIMESTAMP); + } + + public LakeSnapshotMetadata( + long snapshotId, + FsPath tieredOffsetsFilePath, + @Nullable FsPath readableOffsetsFilePath, + long commitTimestamp) { this.snapshotId = snapshotId; this.tieredOffsetsFilePath = tieredOffsetsFilePath; this.readableOffsetsFilePath = readableOffsetsFilePath; + this.commitTimestamp = commitTimestamp; } public long getSnapshotId() { @@ -242,6 +291,10 @@ public FsPath getReadableOffsetsFilePath() { return readableOffsetsFilePath; } + public long getCommitTimestamp() { + return commitTimestamp; + } + public void discard() { if (tieredOffsetsFilePath != null) { delete(tieredOffsetsFilePath); @@ -273,13 +326,15 @@ public boolean equals(Object o) { } LakeSnapshotMetadata that = (LakeSnapshotMetadata) o; return snapshotId == that.snapshotId + && commitTimestamp == that.commitTimestamp && Objects.equals(tieredOffsetsFilePath, that.tieredOffsetsFilePath) && Objects.equals(readableOffsetsFilePath, that.readableOffsetsFilePath); } @Override public int hashCode() { - return Objects.hash(snapshotId, tieredOffsetsFilePath, 
readableOffsetsFilePath); + return Objects.hash( + snapshotId, tieredOffsetsFilePath, readableOffsetsFilePath, commitTimestamp); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java index 0e70d80767..8a532c5e9d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java @@ -100,11 +100,16 @@ public void registerLakeTableSnapshotV2( .map(LakeTable::getLakeSnapshotMetadatas) .orElse(Collections.emptyList()); + // Stamp with strictly monotonic commit timestamp to handle out-of-order RPCs. + // Uses max(now, prevMax + 1) to ensure correct ordering even with clock jumps. + LakeTable.LakeSnapshotMetadata stampedMetadata = + stampCommitTimestamp(lakeSnapshotMetadata, previousMetadatas); + // Determine which snapshots to keep and which to discard (but don't discard yet) Tuple2, List> result = determineSnapshotsToKeepAndDiscard( - previousMetadatas, lakeSnapshotMetadata, earliestSnapshotIDToKeep); + previousMetadatas, stampedMetadata, earliestSnapshotIDToKeep); List keptSnapshots = result.f0; List snapshotsToDiscard = result.f1; @@ -124,6 +129,24 @@ public void registerLakeTableSnapshotV2( } } + /** Adds strictly monotonic commit timestamp to snapshot metadata. 
*/ + private LakeTable.LakeSnapshotMetadata stampCommitTimestamp( + LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata, + List previousMetadatas) { + long maxExistingTs = LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP; + for (LakeTable.LakeSnapshotMetadata md : previousMetadatas) { + if (md.getCommitTimestamp() > maxExistingTs) { + maxExistingTs = md.getCommitTimestamp(); + } + } + long now = Math.max(System.currentTimeMillis(), maxExistingTs + 1); + return new LakeTable.LakeSnapshotMetadata( + lakeSnapshotMetadata.getSnapshotId(), + lakeSnapshotMetadata.getTieredOffsetsFilePath(), + lakeSnapshotMetadata.getReadableOffsetsFilePath(), + now); + } + /** * Determines which snapshots should be retained or discarded based on the timeline according to * {@code earliestSnapshotIDToKeep}. diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java index 9e72239395..0dbb032cf2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java @@ -54,6 +54,8 @@ public class LakeTableJsonSerde implements JsonSerializer, JsonDeseri private static final String SNAPSHOT_ID_KEY = "snapshot_id"; private static final String TIERED_OFFSETS_KEY = "tiered_offsets"; private static final String READABLE_OFFSETS_KEY = "readable_offsets"; + // Optional timestamp for legacy compatibility. + private static final String COMMIT_TIMESTAMP_KEY = "commit_timestamp"; private static final int VERSION_1 = 1; private static final int VERSION_2 = 2; @@ -92,6 +94,13 @@ private void serializeV2(LakeTable lakeTable, JsonGenerator generator) throws IO READABLE_OFFSETS_KEY, lakeSnapshotMetadata.getReadableOffsetsFilePath().toString()); } + // commit_timestamp is omitted when unknown to keep JSON compact. 
+ // Readers fall back to UNKNOWN_COMMIT_TIMESTAMP if absent. + if (lakeSnapshotMetadata.getCommitTimestamp() + != LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP) { + generator.writeNumberField( + COMMIT_TIMESTAMP_KEY, lakeSnapshotMetadata.getCommitTimestamp()); + } generator.writeEndObject(); } generator.writeEndArray(); @@ -133,10 +142,20 @@ private LakeTable deserializeV2(JsonNode node) { JsonNode readableOffsetsNode = snapshotNode.get(READABLE_OFFSETS_KEY); FsPath readableOffsetsPath = readableOffsetsNode != null ? new FsPath(readableOffsetsNode.asText()) : null; + // Optional field. Legacy znodes don't have it; fall back to + // UNKNOWN_COMMIT_TIMESTAMP for list-order semantics. + JsonNode commitTsNode = snapshotNode.get(COMMIT_TIMESTAMP_KEY); + long commitTimestamp = + commitTsNode != null + ? commitTsNode.asLong() + : LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP; LakeTable.LakeSnapshotMetadata metadata = new LakeTable.LakeSnapshotMetadata( - snapshotId, new FsPath(tieredOffsetsPath), readableOffsetsPath); + snapshotId, + new FsPath(tieredOffsetsPath), + readableOffsetsPath, + commitTimestamp); lakeSnapshotMetadatas.add(metadata); } return new LakeTable(lakeSnapshotMetadatas); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java index e3b0f78694..9b95b08d86 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java @@ -274,4 +274,47 @@ private TableRegistration createTableReg(long tableId) { System.currentTimeMillis(), System.currentTimeMillis()); } + + /** + * Verifies that {@link LakeTableHelper#registerLakeTableSnapshotV2} stamps each new entry with + * a strictly monotonically increasing {@code commit_timestamp}, which is the building block for + * #2625's 
out-of-order-tolerant latest-snapshot selection. + */ + @Test + void testRegisterLakeTableSnapshotV2StampsMonotonicCommitTimestamp(@TempDir Path tempDir) + throws Exception { + LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString()); + long tableId = 42L; + TablePath tablePath = TablePath.of("test_db", "ts_mono_test"); + zookeeperClient.registerTable(tablePath, createTableReg(tableId)); + + // Register three snapshots back-to-back, keeping history so we can inspect all timestamps. + for (long sid = 1L; sid <= 3L; sid++) { + FsPath p = storeOffsetFile(lakeTableHelper, tablePath, tableId, sid * 100L); + lakeTableHelper.registerLakeTableSnapshotV2( + tableId, + new LakeTable.LakeSnapshotMetadata(sid, p, p), + LakeCommitResult.KEEP_ALL_PREVIOUS); + } + + List persisted = + zookeeperClient.getLakeTable(tableId).get().getLakeSnapshotMetadatas(); + assertThat(persisted).hasSize(3); + + // All timestamps must be non-zero (server-side stamped) and strictly increasing. + long prev = Long.MIN_VALUE; + for (LakeTable.LakeSnapshotMetadata md : persisted) { + assertThat(md.getCommitTimestamp()) + .as("entry " + md.getSnapshotId()) + .isGreaterThan(prev) + .isNotEqualTo(LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP); + prev = md.getCommitTimestamp(); + } + + // And getLatestLakeSnapshotMetadata returns the entry whose timestamp is largest. 
+ LakeTable.LakeSnapshotMetadata latest = + zookeeperClient.getLakeTable(tableId).get().getLatestLakeSnapshotMetadata(); + assertThat(latest).isNotNull(); + assertThat(latest.getSnapshotId()).isEqualTo(3L); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerdeTest.java index 2dbc2280cf..ef950050c9 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerdeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerdeTest.java @@ -146,4 +146,68 @@ void testVersion1Compatibility() throws IOException { LakeTableSnapshot expectedSnapshot2 = new LakeTableSnapshot(11L, expectedBuckets2); assertThat(actual2.getOrReadLatestTableSnapshot()).isEqualTo(expectedSnapshot2); } + + /** + * Verifies forward / backward compatibility of the optional {@code commit_timestamp} field + * introduced in #2625. + * + *
 <ul> + *
   <li>Legacy V2 JSON without {@code commit_timestamp} must deserialize successfully and the + * in-memory entry should report {@link + * LakeTable.LakeSnapshotMetadata#UNKNOWN_COMMIT_TIMESTAMP}. + *
   <li>V2 JSON containing {@code commit_timestamp} must round-trip and the value must be + * preserved. + *
   <li>An entry stamped with a non-zero timestamp must serialize the field; an entry whose + * timestamp is {@code UNKNOWN_COMMIT_TIMESTAMP} must omit the field (keeping output + * byte-compatible with legacy znodes). + * </ul>
+ */ + @Test + void testCommitTimestampJsonCompatibility() throws IOException { + // 1. Legacy V2 JSON (no commit_timestamp) -> fallback to UNKNOWN_COMMIT_TIMESTAMP + String legacyV2 = + "{\"version\":2,\"lake_snapshots\":[" + + "{\"snapshot_id\":7,\"tiered_offsets\":\"/p/t7\",\"readable_offsets\":\"/p/r7\"}" + + "]}"; + LakeTable parsedLegacy = + JsonSerdeUtils.readValue( + legacyV2.getBytes(StandardCharsets.UTF_8), LakeTableJsonSerde.INSTANCE); + assertThat(parsedLegacy.getLakeSnapshotMetadatas()).hasSize(1); + assertThat(parsedLegacy.getLakeSnapshotMetadatas().get(0).getCommitTimestamp()) + .isEqualTo(LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP); + + // 2. New V2 JSON with commit_timestamp -> value preserved + String newV2 = + "{\"version\":2,\"lake_snapshots\":[" + + "{\"snapshot_id\":8,\"tiered_offsets\":\"/p/t8\"," + + "\"readable_offsets\":\"/p/r8\",\"commit_timestamp\":12345678}" + + "]}"; + LakeTable parsedNew = + JsonSerdeUtils.readValue( + newV2.getBytes(StandardCharsets.UTF_8), LakeTableJsonSerde.INSTANCE); + assertThat(parsedNew.getLakeSnapshotMetadatas()).hasSize(1); + assertThat(parsedNew.getLakeSnapshotMetadatas().get(0).getCommitTimestamp()) + .isEqualTo(12345678L); + + // 3. Round-trip: stamped entry serializes the field, unknown entry omits it. 
+ LakeTable.LakeSnapshotMetadata stamped = + new LakeTable.LakeSnapshotMetadata( + 9L, new FsPath("/p/t9"), new FsPath("/p/r9"), 99999L); + LakeTable.LakeSnapshotMetadata unstamped = + new LakeTable.LakeSnapshotMetadata(10L, new FsPath("/p/t10"), null); + List mixed = new ArrayList<>(); + mixed.add(stamped); + mixed.add(unstamped); + LakeTable mixedTable = new LakeTable(mixed); + + byte[] serialized = + JsonSerdeUtils.writeValueAsBytes(mixedTable, LakeTableJsonSerde.INSTANCE); + String serializedStr = new String(serialized, StandardCharsets.UTF_8); + assertThat(serializedStr).contains("\"commit_timestamp\":99999"); + // entry #2 is unstamped, the field should be absent + assertThat(serializedStr).doesNotContain("\"commit_timestamp\":0"); + + LakeTable roundTripped = JsonSerdeUtils.readValue(serialized, LakeTableJsonSerde.INSTANCE); + assertThat(roundTripped).isEqualTo(mixedTable); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableTest.java new file mode 100644 index 0000000000..42ac020c5d --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.lake; + +import org.apache.fluss.fs.FsPath; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link LakeTable#getLatestLakeSnapshotMetadata()} and its tolerance to out-of-order + * physical list, which is the core semantic introduced by #2625. + */ +class LakeTableTest { + + private static LakeTable.LakeSnapshotMetadata meta( + long snapshotId, String name, long commitTs) { + return new LakeTable.LakeSnapshotMetadata( + snapshotId, + new FsPath("/tiered/" + name), + new FsPath("/readable/" + name), + commitTs); + } + + private static LakeTable.LakeSnapshotMetadata metaTieredOnly( + long snapshotId, String name, long commitTs) { + return new LakeTable.LakeSnapshotMetadata( + snapshotId, new FsPath("/tiered/" + name), null, commitTs); + } + + /** Null / empty list must return null, preserving previous behavior. */ + @Test + void getLatestReturnsNullForEmptyOrMissingList() { + assertThat(new LakeTable(Collections.emptyList()).getLatestLakeSnapshotMetadata()).isNull(); + } + + /** + * Physical list order does NOT reflect actual commit order (e.g. the tail entry is a stale + * readable snapshot committed earlier). {@code getLatestLakeSnapshotMetadata} must use {@code + * commit_timestamp} rather than list position to pick the real latest entry. 
+ */ + @Test + void getLatestPicksEntryWithLargestCommitTimestamp() { + LakeTable.LakeSnapshotMetadata a = meta(1L, "a", 100L); + LakeTable.LakeSnapshotMetadata b = meta(2L, "b", 300L); // real latest + LakeTable.LakeSnapshotMetadata c = meta(3L, "c", 200L); // physical tail but stale + + LakeTable table = new LakeTable(Arrays.asList(a, b, c)); + assertThat(table.getLatestLakeSnapshotMetadata()).isEqualTo(b); + } + + /** + * Legacy entries (written by servers pre-#2625) all carry {@code UNKNOWN_COMMIT_TIMESTAMP}. + * When every entry ties, the implementation must fall back to "later list index wins", + * preserving the pre-#2625 {@code get(size-1)} behavior. + */ + @Test + void getLatestFallsBackToListOrderOnAllLegacyTimestamps() { + LakeTable.LakeSnapshotMetadata a = + new LakeTable.LakeSnapshotMetadata(1L, new FsPath("/a"), null); + LakeTable.LakeSnapshotMetadata b = + new LakeTable.LakeSnapshotMetadata(2L, new FsPath("/b"), null); + LakeTable.LakeSnapshotMetadata c = + new LakeTable.LakeSnapshotMetadata(3L, new FsPath("/c"), null); + + LakeTable table = new LakeTable(Arrays.asList(a, b, c)); + assertThat(table.getLatestLakeSnapshotMetadata()).isEqualTo(c); + } + + /** + * Mixed legacy + new entries: a new (stamped) entry must win over any legacy (ts=0) entries + * even if legacy entries are physically later, because new entries have strictly larger + * timestamps. + */ + @Test + void getLatestPrefersStampedEntryOverLegacy() { + LakeTable.LakeSnapshotMetadata legacyA = + new LakeTable.LakeSnapshotMetadata(1L, new FsPath("/a"), null); + LakeTable.LakeSnapshotMetadata stampedB = meta(2L, "b", 500L); // should win + LakeTable.LakeSnapshotMetadata legacyC = + new LakeTable.LakeSnapshotMetadata(3L, new FsPath("/c"), null); + + LakeTable table = new LakeTable(Arrays.asList(legacyA, stampedB, legacyC)); + assertThat(table.getLatestLakeSnapshotMetadata()).isEqualTo(stampedB); + } + + /** + * The symmetric method for readable snapshots. 
A stale readable entry physically sitting at the + * tail must not mask a more recent one; the selection must be driven by commit_timestamp. + */ + @Test + void getOrReadLatestReadableIgnoresStaleTailReadable() throws Exception { + // b (ts=300) has readable offsets and is the real latest. + // c (ts=200) also has readable offsets but is an older entry that happens + // to sit at the physical tail, simulating out-of-order persisting. + LakeTable.LakeSnapshotMetadata a = metaTieredOnly(1L, "a", 100L); + LakeTable.LakeSnapshotMetadata b = meta(2L, "b", 300L); + LakeTable.LakeSnapshotMetadata c = meta(3L, "c", 200L); + + LakeTable table = new LakeTable(Arrays.asList(a, b, c)); + + // The latest readable snapshot is b (snapshot_id == 2), not c (at tail). + LakeTableSnapshot latestReadable = null; + try { + latestReadable = table.getOrReadLatestReadableTableSnapshot(); + } catch (Exception e) { + // FS reading of the readable_offsets file may fail in this unit test since + // we pass non-existing FsPaths. That is fine: the selection logic picks the + // entry first and only then tries to read it. If it happens, verify selection + // via getLatestLakeSnapshotMetadata() below instead. + } + if (latestReadable != null) { + assertThat(latestReadable.getSnapshotId()).isEqualTo(2L); + } + // Either way, the picked latest metadata must be b. + assertThat(table.getLatestLakeSnapshotMetadata()).isEqualTo(b); + } +}