From e27bc6b0b255e6a5e870801d6c60a530091731cf Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 25 May 2026 14:16:35 +0700 Subject: [PATCH] Spark: Bound runaway serializable-isolation and concurrent-refresh tests Co-Authored-By: Claude Opus 4.7 (1M context) --- .../extensions/SparkRowLevelOperationsTestBase.java | 5 +++++ .../spark/extensions/TestCopyOnWriteDelete.java | 7 ++++--- .../spark/extensions/TestCopyOnWriteMerge.java | 7 ++++--- .../spark/extensions/TestCopyOnWriteUpdate.java | 7 ++++--- .../apache/iceberg/spark/extensions/TestDelete.java | 11 ++++++----- .../apache/iceberg/spark/extensions/TestMerge.java | 11 ++++++----- .../apache/iceberg/spark/extensions/TestUpdate.java | 11 ++++++----- .../extensions/SparkRowLevelOperationsTestBase.java | 5 +++++ .../spark/extensions/TestCopyOnWriteDelete.java | 7 ++++--- .../spark/extensions/TestCopyOnWriteMerge.java | 7 ++++--- .../spark/extensions/TestCopyOnWriteUpdate.java | 7 ++++--- .../apache/iceberg/spark/extensions/TestDelete.java | 11 ++++++----- .../apache/iceberg/spark/extensions/TestMerge.java | 11 ++++++----- .../apache/iceberg/spark/extensions/TestUpdate.java | 11 ++++++----- .../extensions/SparkRowLevelOperationsTestBase.java | 5 +++++ .../apache/iceberg/spark/extensions/TestDelete.java | 11 ++++++----- .../apache/iceberg/spark/extensions/TestMerge.java | 11 ++++++----- .../apache/iceberg/spark/extensions/TestUpdate.java | 11 ++++++----- 18 files changed, 93 insertions(+), 63 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java index 72988ae0ed9e..352bf9692110 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java @@ -82,6 +82,11 @@ @ExtendWith(ParameterizedTestExtension.class) public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase { + // Bound the concurrency tests below so they fail fast instead of running until CI or + // disk limits are hit when the expected conflict never occurs. + protected static final int MAX_OPERATIONS = 20; + protected static final int OPERATION_TIMEOUT_MINUTES = 5; + @Parameter(index = 3) protected FileFormat fileFormat; diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java index d39dff060c9a..33862059adc2 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java @@ -101,7 +101,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -122,7 +122,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -148,13 +148,14 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception }); try { - assertThatThrownBy(deleteFuture::get) + assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(IllegalStateException.class) .hasMessageContaining("the table has been concurrently modified"); } finally { shouldAppend.set(false); + deleteFuture.cancel(true); appendFuture.cancel(true); } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java index 394dbbda1a3d..3e19de1f0556 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java @@ -90,7 +90,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -116,7 +116,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -137,13 +137,14 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception }); try { - assertThatThrownBy(mergeFuture::get) + assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(IllegalStateException.class) .hasMessageContaining("the table has been concurrently modified"); } finally { shouldAppend.set(false); + mergeFuture.cancel(true); appendFuture.cancel(true); } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java index b547218acbd4..6d144a0ec235 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java @@ -88,7 +88,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -109,7 +109,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -135,13 +135,14 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception }); try { - assertThatThrownBy(updateFuture::get) + assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(IllegalStateException.class) .hasMessageContaining("the table has been concurrently modified"); } finally { shouldAppend.set(false); + updateFuture.cancel(true); appendFuture.cancel(true); } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 79d6bea12f67..e79fdfba6928 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -1078,7 +1078,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1102,7 +1102,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1128,13 +1128,14 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte }); try { - assertThatThrownBy(deleteFuture::get) + assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + deleteFuture.cancel(true); appendFuture.cancel(true); } @@ -1180,7 +1181,7 @@ public synchronized void testDeleteWithSnapshotIsolation() Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1204,7 +1205,7 @@ public synchronized void testDeleteWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index cac853dae746..eea0b9fd1e08 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -1542,7 +1542,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1571,7 +1571,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1596,13 +1596,14 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted }); try { - assertThatThrownBy(mergeFuture::get) + assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + mergeFuture.cancel(true); appendFuture.cancel(true); } @@ -1648,7 +1649,7 @@ public synchronized void testMergeWithSnapshotIsolation() Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1677,7 +1678,7 @@ public synchronized void testMergeWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 20cd0f239179..87a587081e85 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -648,7 +648,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -672,7 +672,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -698,13 +698,14 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte }); try { - assertThatThrownBy(updateFuture::get) + assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + updateFuture.cancel(true); appendFuture.cancel(true); } @@ -741,7 +742,7 @@ public synchronized void testUpdateWithSnapshotIsolation() Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -765,7 +766,7 @@ public synchronized void testUpdateWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java index 72988ae0ed9e..352bf9692110 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java @@ -82,6 +82,11 @@ @ExtendWith(ParameterizedTestExtension.class) public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase { + // Bound the concurrency tests below so they fail fast instead of running until CI or + // disk limits are hit when the expected conflict never occurs. + protected static final int MAX_OPERATIONS = 20; + protected static final int OPERATION_TIMEOUT_MINUTES = 5; + @Parameter(index = 3) protected FileFormat fileFormat; diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java index d39dff060c9a..33862059adc2 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java @@ -101,7 +101,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -122,7 +122,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -148,13 +148,14 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception }); try { - assertThatThrownBy(deleteFuture::get) + assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(IllegalStateException.class) .hasMessageContaining("the table has been concurrently modified"); } finally { shouldAppend.set(false); + deleteFuture.cancel(true); appendFuture.cancel(true); } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java index 394dbbda1a3d..3e19de1f0556 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java @@ -90,7 +90,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -116,7 +116,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -137,13 +137,14 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception }); try { - assertThatThrownBy(mergeFuture::get) + assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(IllegalStateException.class) .hasMessageContaining("the table has been concurrently modified"); } finally { shouldAppend.set(false); + mergeFuture.cancel(true); appendFuture.cancel(true); } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java index b547218acbd4..6d144a0ec235 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java @@ -88,7 +88,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -109,7 +109,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -135,13 +135,14 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception }); try { - assertThatThrownBy(updateFuture::get) + assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(IllegalStateException.class) .hasMessageContaining("the table has been concurrently modified"); } finally { shouldAppend.set(false); + updateFuture.cancel(true); appendFuture.cancel(true); } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 79d6bea12f67..e79fdfba6928 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -1078,7 +1078,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1102,7 +1102,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1128,13 +1128,14 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte }); try { - assertThatThrownBy(deleteFuture::get) + assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + deleteFuture.cancel(true); appendFuture.cancel(true); } @@ -1180,7 +1181,7 @@ public synchronized void testDeleteWithSnapshotIsolation() Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1204,7 +1205,7 @@ public synchronized void testDeleteWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index 1d9a3de9fa23..193c01d1ecc7 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -1563,7 +1563,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1592,7 +1592,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1617,13 +1617,14 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted }); try { - assertThatThrownBy(mergeFuture::get) + assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + mergeFuture.cancel(true); appendFuture.cancel(true); } @@ -1669,7 +1670,7 @@ public synchronized void testMergeWithSnapshotIsolation() Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1698,7 +1699,7 @@ public synchronized void testMergeWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 27c64bb28ec5..9db621277eea 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -648,7 +648,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -672,7 +672,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -698,13 +698,14 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte }); try { - assertThatThrownBy(updateFuture::get) + assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + updateFuture.cancel(true); appendFuture.cancel(true); } @@ -741,7 +742,7 @@ public synchronized void testUpdateWithSnapshotIsolation() Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -765,7 +766,7 @@ public synchronized void testUpdateWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java index 72988ae0ed9e..352bf9692110 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java @@ -82,6 +82,11 @@ @ExtendWith(ParameterizedTestExtension.class) public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase { + // Bound the concurrency tests below so they fail fast instead of running until CI or + // disk limits are hit when the expected conflict never occurs. + protected static final int MAX_OPERATIONS = 20; + protected static final int OPERATION_TIMEOUT_MINUTES = 5; + @Parameter(index = 3) protected FileFormat fileFormat; diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 9e9d751691be..09b30aa7820b 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -1078,7 +1078,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1102,7 +1102,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1128,13 +1128,14 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte }); try { - assertThatThrownBy(deleteFuture::get) + assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + deleteFuture.cancel(true); appendFuture.cancel(true); } @@ -1180,7 +1181,7 @@ public synchronized void testDeleteWithSnapshotIsolation() Future deleteFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1204,7 +1205,7 @@ public synchronized void testDeleteWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index 3f428963c412..9ad59d964646 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -1563,7 +1563,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1592,7 +1592,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1617,13 +1617,14 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted }); try { - assertThatThrownBy(mergeFuture::get) + assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + mergeFuture.cancel(true); appendFuture.cancel(true); } @@ -1669,7 +1670,7 @@ public synchronized void testMergeWithSnapshotIsolation() Future mergeFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -1698,7 +1699,7 @@ public synchronized void testMergeWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index d32908aeb0ed..10fe81a12e0d 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -648,7 +648,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -672,7 +672,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -698,13 +698,14 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte }); try { - assertThatThrownBy(updateFuture::get) + assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES)) .isInstanceOf(ExecutionException.class) .cause() .isInstanceOf(ValidationException.class) .hasMessageContaining("Found conflicting files that can contain"); } finally { shouldAppend.set(false); + updateFuture.cancel(true); appendFuture.cancel(true); } @@ -741,7 +742,7 @@ public synchronized void testUpdateWithSnapshotIsolation() Future updateFuture = executorService.submit( () -> { - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS) @@ -765,7 +766,7 @@ public synchronized void testUpdateWithSnapshotIsolation() record.set(0, 1); // id record.set(1, "hr"); // dep - for (int numOperations = 0; numOperations < 20; numOperations++) { + for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) { int currentNumOperations = numOperations; Awaitility.await() .pollInterval(10, TimeUnit.MILLISECONDS)