From 8457ee1b446959973d8aea88c350433e20f49bc2 Mon Sep 17 00:00:00 2001 From: duankaixuan <1417048384@qq.com> Date: Tue, 12 May 2026 22:49:42 +0800 Subject: [PATCH] [test] Fix unstable test BatchScannerITCase.testKvSnapshotLease --- .../scanner/batch/BatchScannerITCase.java | 67 +++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java index 67e1616eb6..53c0835a2c 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java @@ -47,6 +47,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -209,54 +210,65 @@ public void testKvSnapshotLease() throws Exception { assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty(); - // test register kv snapshot lease for snapshot 0. - Map consumeBuckets = new HashMap<>(); + // test register kv snapshot lease for the first round of snapshots. + Map consumeBuckets1 = new HashMap<>(); KvSnapshots kvSnapshots = admin.getLatestKvSnapshots(tablePath).get(); for (int bucketId : kvSnapshots.getBucketIds()) { TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId); - consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong()); + consumeBuckets1.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong()); } KvSnapshotLease kvSnapshotLease1 = admin.createKvSnapshotLease(kvSnapshotLeaseId1, Duration.ofDays(1).toMillis()); - kvSnapshotLease1.acquireSnapshots(consumeBuckets).get(); + kvSnapshotLease1.acquireSnapshots(consumeBuckets1).get(); checkKvSnapshotLeaseEquals( - metadataManager, kvSnapshotLeaseId1, tableId, new Long[] {0L, 0L, 0L}); + metadataManager, + kvSnapshotLeaseId1, + tableId, + buildExpectedBucketSnapshots(consumeBuckets1)); expectedRowByBuckets = putRows(tableId, tablePath, 10); // wait snapshot2 finish FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet()); - // test register kv snapshot lease for snapshot 1. - consumeBuckets = new HashMap<>(); + // test register kv snapshot lease for the second round of snapshots. + Map consumeBuckets2 = new HashMap<>(); kvSnapshots = admin.getLatestKvSnapshots(tablePath).get(); for (int bucketId : kvSnapshots.getBucketIds()) { TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId); - consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong()); + consumeBuckets2.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong()); } KvSnapshotLease kvSnapshotLease2 = admin.createKvSnapshotLease(kvSnapshotLeaseId2, Duration.ofDays(1).toMillis()); - kvSnapshotLease2.acquireSnapshots(consumeBuckets).get(); + kvSnapshotLease2.acquireSnapshots(consumeBuckets2).get(); checkKvSnapshotLeaseEquals( - metadataManager, kvSnapshotLeaseId2, tableId, new Long[] {1L, 1L, 1L}); - // check even snapshot1 is generated, snapshot0 also retained as lease exists. + metadataManager, + kvSnapshotLeaseId2, + tableId, + buildExpectedBucketSnapshots(consumeBuckets2)); + // check even the second round snapshot is generated, the first round snapshot is also + // retained as lease exists. for (TableBucket tb : expectedRowByBuckets.keySet()) { - assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isTrue(); - assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isTrue(); + long firstSnapshotId = consumeBuckets1.get(tb); + long secondSnapshotId = consumeBuckets2.get(tb); + assertThat(zkClient.getTableBucketSnapshot(tb, firstSnapshotId).isPresent()).isTrue(); + assertThat(zkClient.getTableBucketSnapshot(tb, secondSnapshotId).isPresent()).isTrue(); } expectedRowByBuckets = putRows(tableId, tablePath, 10); // wait snapshot3 finish FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet()); - // release lease1. + // release lease1 for bucket 0. kvSnapshotLease1.releaseSnapshots(Collections.singleton(new TableBucket(tableId, 0))).get(); + Long[] expectedAfterReleaseBucket0 = buildExpectedBucketSnapshots(consumeBuckets1); + expectedAfterReleaseBucket0[0] = -1L; checkKvSnapshotLeaseEquals( - metadataManager, kvSnapshotLeaseId1, tableId, new Long[] {-1L, 0L, 0L}); + metadataManager, kvSnapshotLeaseId1, tableId, expectedAfterReleaseBucket0); // release lease2. - kvSnapshotLease2.releaseSnapshots(consumeBuckets.keySet()).get(); + kvSnapshotLease2.releaseSnapshots(consumeBuckets2.keySet()).get(); assertThat(zkClient.getKvSnapshotLeasesList()).doesNotContain(kvSnapshotLeaseId2); // release all kv snapshot lease of lease1 @@ -264,13 +276,15 @@ public void testKvSnapshotLease() throws Exception { assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty(); expectedRowByBuckets = putRows(tableId, tablePath, 10); - // wait snapshot2 finish + // wait next snapshot finish FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet()); - // as all leases are dropped, and new snapshot is generated, all old snapshot are + // as all leases are dropped, and new snapshot is generated, all old snapshots are // cleared. for (TableBucket tb : expectedRowByBuckets.keySet()) { - assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isFalse(); - assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isFalse(); + long firstSnapshotId = consumeBuckets1.get(tb); + long secondSnapshotId = consumeBuckets2.get(tb); + assertThat(zkClient.getTableBucketSnapshot(tb, firstSnapshotId).isPresent()).isFalse(); + assertThat(zkClient.getTableBucketSnapshot(tb, secondSnapshotId).isPresent()).isFalse(); } // drop no exist lease, no exception. @@ -355,6 +369,19 @@ private static int getBucketId(InternalRow row) { return function.bucketing(key, DEFAULT_BUCKET_NUM); } + /** + * Build the expected bucket snapshots array from the consumeBuckets map. The array index is the + * bucket id, and the value is the snapshot id. Buckets not present in the map are set to -1L. + */ + private Long[] buildExpectedBucketSnapshots(Map consumeBuckets) { + Long[] expected = new Long[DEFAULT_BUCKET_NUM]; + Arrays.fill(expected, -1L); + for (Map.Entry entry : consumeBuckets.entrySet()) { + expected[entry.getKey().getBucket()] = entry.getValue(); + } + return expected; + } + private void checkKvSnapshotLeaseEquals( KvSnapshotLeaseMetadataManager metadataManager, String leaseId,