Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
@ExtendWith(ParameterizedTestExtension.class)
public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase {

// Bound the concurrency tests below so they fail fast instead of running until CI or
// disk limits are hit when the expected conflict never occurs.
protected static final int MAX_OPERATIONS = 20;
protected static final int OPERATION_TIMEOUT_MINUTES = 5;

@Parameter(index = 3)
protected FileFormat fileFormat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
Future<?> deleteFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -122,7 +122,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -148,13 +148,14 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
});

try {
assertThatThrownBy(deleteFuture::get)
assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("the table has been concurrently modified");
} finally {
shouldAppend.set(false);
deleteFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
Future<?> mergeFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -116,7 +116,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -137,13 +137,14 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
});

try {
assertThatThrownBy(mergeFuture::get)
assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("the table has been concurrently modified");
} finally {
shouldAppend.set(false);
mergeFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
Future<?> updateFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -109,7 +109,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -135,13 +135,14 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
});

try {
assertThatThrownBy(updateFuture::get)
assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("the table has been concurrently modified");
} finally {
shouldAppend.set(false);
updateFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte
Future<?> deleteFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -1102,7 +1102,7 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -1128,13 +1128,14 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte
});

try {
assertThatThrownBy(deleteFuture::get)
assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
shouldAppend.set(false);
deleteFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down Expand Up @@ -1180,7 +1181,7 @@ public synchronized void testDeleteWithSnapshotIsolation()
Future<?> deleteFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < 20; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -1204,7 +1205,7 @@ public synchronized void testDeleteWithSnapshotIsolation()
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < 20; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted
Future<?> mergeFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -1571,7 +1571,7 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -1596,13 +1596,14 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted
});

try {
assertThatThrownBy(mergeFuture::get)
assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
shouldAppend.set(false);
mergeFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down Expand Up @@ -1648,7 +1649,7 @@ public synchronized void testMergeWithSnapshotIsolation()
Future<?> mergeFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < 20; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -1677,7 +1678,7 @@ public synchronized void testMergeWithSnapshotIsolation()
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < 20; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte
Future<?> updateFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -672,7 +672,7 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -698,13 +698,14 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte
});

try {
assertThatThrownBy(updateFuture::get)
assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Found conflicting files that can contain");
} finally {
shouldAppend.set(false);
updateFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down Expand Up @@ -741,7 +742,7 @@ public synchronized void testUpdateWithSnapshotIsolation()
Future<?> updateFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < 20; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -765,7 +766,7 @@ public synchronized void testUpdateWithSnapshotIsolation()
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < 20; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
@ExtendWith(ParameterizedTestExtension.class)
public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase {

// Bound the concurrency tests below so they fail fast instead of running until CI or
// disk limits are hit when the expected conflict never occurs.
protected static final int MAX_OPERATIONS = 20;
protected static final int OPERATION_TIMEOUT_MINUTES = 5;

@Parameter(index = 3)
protected FileFormat fileFormat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
Future<?> deleteFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -122,7 +122,7 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -148,13 +148,14 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception
});

try {
assertThatThrownBy(deleteFuture::get)
assertThatThrownBy(() -> deleteFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("the table has been concurrently modified");
} finally {
shouldAppend.set(false);
deleteFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
Future<?> mergeFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -116,7 +116,7 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -137,13 +137,14 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception
});

try {
assertThatThrownBy(mergeFuture::get)
assertThatThrownBy(() -> mergeFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("the table has been concurrently modified");
} finally {
shouldAppend.set(false);
mergeFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
Future<?> updateFuture =
executorService.submit(
() -> {
for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -109,7 +109,7 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
record.set(0, 1); // id
record.set(1, "hr"); // dep

for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
for (int numOperations = 0; numOperations < MAX_OPERATIONS; numOperations++) {
int currentNumOperations = numOperations;
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
Expand All @@ -135,13 +135,14 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception
});

try {
assertThatThrownBy(updateFuture::get)
assertThatThrownBy(() -> updateFuture.get(OPERATION_TIMEOUT_MINUTES, TimeUnit.MINUTES))
.isInstanceOf(ExecutionException.class)
.cause()
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("the table has been concurrently modified");
} finally {
shouldAppend.set(false);
updateFuture.cancel(true);
appendFuture.cancel(true);
}

Expand Down
Loading