Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java-spanner/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public ListenableAsyncResultSet readAsync(
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows);
}

@Override
Expand All @@ -607,7 +607,7 @@ public ListenableAsyncResultSet readUsingIndexAsync(
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
readInternal(table, checkNotNull(index), keys, columns, options),
() -> readInternal(table, checkNotNull(index), keys, columns, options),
bufferRows);
}

Expand Down Expand Up @@ -659,8 +659,9 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
() ->
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
bufferRows);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,21 @@ private enum State {

AsyncResultSetImpl(
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
super(delegate);
this(
executorProvider,
Suppliers.memoize(Preconditions.checkNotNull(delegate)),
bufferSize,
true);
}

private AsyncResultSetImpl(
ExecutorProvider executorProvider,
Supplier<ResultSet> memoizedDelegate,
int bufferSize,
boolean dummy) {
super(memoizedDelegate);
this.executorProvider = Preconditions.checkNotNull(executorProvider);
this.delegateResultSet = Preconditions.checkNotNull(delegate);
this.delegateResultSet = memoizedDelegate;
this.service = MoreExecutors.listeningDecorator(executorProvider.getExecutor());
this.buffer = new LinkedBlockingDeque<>(bufferSize);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class AsyncReadOnlyTransactionTest extends AbstractAsyncTransactionTest {

@Test
public void asyncReadOnlyTransactionIsNonBlocking() throws Exception {
// Warm up session pool to avoid CreateSession blocking when server is frozen.
try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) {
while (resultSet.next()) {}
}
mockSpanner.reset();

try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) {
mockSpanner.freeze();
// Call executeQueryAsync. It should not block even though mock server is
// frozen!
AsyncResultSet rs = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);

// Verify that no requests have been sent yet.
assertTrue(mockSpanner.getRequestTypes().isEmpty());

// Now register a callback to start the stream.
final CountDownLatch latch = new CountDownLatch(1);
rs.setCallback(
executor,
resultSet -> {
try {
AsyncResultSet.CursorState state;
while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {
// consume
}
if (state == AsyncResultSet.CursorState.DONE) {
latch.countDown();
}
return AsyncResultSet.CallbackResponse.CONTINUE;
} catch (Throwable t) {
latch.countDown();
return AsyncResultSet.CallbackResponse.DONE;
}
});

// Unfreeze the mock server so the background thread can proceed.
mockSpanner.unfreeze();

// Wait for the callback to complete.
assertTrue("Timeout waiting for callback", latch.await(10, TimeUnit.SECONDS));

// Verify that requests were sent on the background thread.
// It should contain one BeginTransaction and one ExecuteSql.
assertThat(mockSpanner.getRequestTypes())
.containsExactly(BeginTransactionRequest.class, ExecuteSqlRequest.class);
}
}

@Test
public void testMultipleQueriesOnlyCallsBeginTransactionOnce() throws Exception {
// Warm up session pool to avoid CreateSession blocking when server is frozen.
try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) {
while (resultSet.next()) {}
}
mockSpanner.reset();

try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) {
mockSpanner.freeze();
// Call executeQueryAsync twice.
AsyncResultSet rs1 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);
AsyncResultSet rs2 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT);

// Verify that no requests have been sent yet.
assertTrue(mockSpanner.getRequestTypes().isEmpty());

// Unfreeze the mock server.
mockSpanner.unfreeze();

// Now register callbacks to start the streams.
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);

rs1.setCallback(
executor,
resultSet -> {
try {
AsyncResultSet.CursorState state;
while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {}
if (state == AsyncResultSet.CursorState.DONE) {
latch1.countDown();
}
return AsyncResultSet.CallbackResponse.CONTINUE;
} catch (Throwable t) {
latch1.countDown();
return AsyncResultSet.CallbackResponse.DONE;
}
});

rs2.setCallback(
executor,
resultSet -> {
try {
AsyncResultSet.CursorState state;
while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {}
if (state == AsyncResultSet.CursorState.DONE) {
latch2.countDown();
}
return AsyncResultSet.CallbackResponse.CONTINUE;
} catch (Throwable t) {
latch2.countDown();
return AsyncResultSet.CallbackResponse.DONE;
}
});

// Wait for both callbacks to complete.
assertTrue("Timeout waiting for callback 1", latch1.await(10, TimeUnit.SECONDS));
assertTrue("Timeout waiting for callback 2", latch2.await(10, TimeUnit.SECONDS));

// Verify that requests were sent.
// It should contain one BeginTransaction and two ExecuteSql.
assertThat(mockSpanner.getRequestTypes())
.containsExactly(
BeginTransactionRequest.class, ExecuteSqlRequest.class, ExecuteSqlRequest.class);
}
}
}
Loading