From 11258ba8718ad05da2b109f2ae71335529c5261a Mon Sep 17 00:00:00 2001 From: Kirill Logachev Date: Fri, 27 Mar 2026 03:39:30 +0000 Subject: [PATCH 1/4] feat(bq jdbc): run getStatementType in parallel --- .../bigquery/jdbc/BigQueryConnection.java | 16 +++++++++ .../bigquery/jdbc/BigQueryJdbcUrlUtility.java | 8 +++++ .../bigquery/jdbc/BigQueryStatement.java | 34 +++++++++++-------- .../cloud/bigquery/jdbc/DataSource.java | 19 +++++++++++ .../bigquery/jdbc/BigQueryConnectionTest.java | 32 +++++++++++++++++ .../bigquery/jdbc/BigQueryStatementTest.java | 27 ++++++++++++--- 6 files changed, 118 insertions(+), 18 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 17471e252205..b2e73129bf0c 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -60,6 +60,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -138,6 +140,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Long connectionPoolSize; Long listenerPoolSize; String partnerToken; + private int queryTaskThreadCount; + private ExecutorService queryTaskExecutor; BigQueryConnection(String url) throws IOException { this(url, DataSource.fromUrl(url)); @@ -238,6 +242,10 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset(); this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope(); this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount(); + this.queryTaskThreadCount = ds.getQueryTaskThreadCount(); + this.queryTaskExecutor = + Executors.newFixedThreadPool( + this.queryTaskThreadCount, new BigQueryThreadFactory("BigQuery-query-task-")); this.requestReason = ds.getRequestReason(); this.connectionPoolSize = ds.getConnectionPoolSize(); this.listenerPoolSize = ds.getListenerPoolSize(); @@ -596,6 +604,10 @@ int getMetadataFetchThreadCount() { return this.metadataFetchThreadCount; } + public ExecutorService getQueryTaskExecutor() { + return this.queryTaskExecutor; + } + boolean isEnableWriteAPI() { return enableWriteAPI; } @@ -836,6 +848,10 @@ public void close() throws SQLException { statement.close(); } this.openStatements.clear(); + + if (this.queryTaskExecutor != null) { + this.queryTaskExecutor.shutdown(); + } } catch (ConcurrentModificationException ex) { throw new BigQueryJdbcException(ex); } catch (InterruptedException e) { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java index 5b89cf27eecf..11cb8c7f2345 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java @@ -142,6 +142,8 @@ protected boolean removeEldestEntry(Map.Entry> eldes Pattern.CASE_INSENSITIVE); static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount"; static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32; + static final String QUERY_TASK_THREAD_COUNT_PROPERTY_NAME = "QueryTaskThreadCount"; + static final int DEFAULT_QUERY_TASK_THREAD_COUNT_VALUE = 16; static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout"; static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L; static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout"; @@ -535,6 +537,12 @@ protected boolean removeEldestEntry(Map.Entry> eldes "The number of threads used to call a DatabaseMetaData method.") .setDefaultValue(String.valueOf(DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE)) .build(), + BigQueryConnectionProperty.newBuilder() + .setName(QUERY_TASK_THREAD_COUNT_PROPERTY_NAME) + .setDescription( + "The number of background threads used for executing queries parallel tasks.") + .setDefaultValue(String.valueOf(DEFAULT_QUERY_TASK_THREAD_COUNT_VALUE)) + .build(), BigQueryConnectionProperty.newBuilder() .setName(ENABLE_WRITE_API_PROPERTY_NAME) .setDescription( diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index ca579d1d0c1b..d829aedc4d7a 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -72,8 +72,8 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadFactory; import java.util.logging.Level; @@ -88,9 +88,6 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { // TODO (obada): Update this after benchmarking - private static final int MAX_PROCESS_QUERY_THREADS_CNT = 50; - protected static ExecutorService queryTaskExecutor = - Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT); private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; private static final String DEFAULT_TABLE_NAME = "temp_table_"; @@ -594,15 +591,20 @@ void runQuery(String query, QueryJobConfiguration jobConfiguration) try { resetStatementFields(); + + final QueryJobConfiguration finalJobConfiguration = jobConfiguration; + Future statementTypeFuture = + connection.getQueryTaskExecutor().submit(() -> getStatementType(finalJobConfiguration)); + ExecuteResult executeResult = executeJob(jobConfiguration); - StatementType statementType = - executeResult.job == null - ? getStatementType(jobConfiguration) - : ((QueryStatistics) executeResult.job.getStatistics()).getStatementType(); + + StatementType statementType = statementTypeFuture.get(); SqlType queryType = getQueryType(jobConfiguration, statementType); handleQueryResult(query, executeResult.tableResult, queryType); } catch (InterruptedException ex) { throw new BigQueryJdbcRuntimeException(ex); + } catch (ExecutionException e) { + throw new BigQueryJdbcException(e.getCause()); } catch (BigQueryException ex) { if (ex.getMessage().contains("Syntax error")) { throw new BigQueryJdbcSqlSyntaxErrorException(ex); @@ -829,7 +831,8 @@ Thread populateArrowBufferedQueue( com.google.api.gax.rpc.ServerStream stream = bqReadClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() + || connection.getQueryTaskExecutor().isShutdown()) { break; } @@ -1042,7 +1045,8 @@ Thread runNextPageTaskAsync( try { while (currentPageToken != null) { // do not process further pages and shutdown - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() + || connection.getQueryTaskExecutor().isShutdown()) { LOG.warning( "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); break; @@ -1073,7 +1077,8 @@ Thread runNextPageTaskAsync( // completes Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); } - // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not + // We cannot do connection.getQueryTaskExecutor().shutdownNow() here as populate buffer + // method may not // have finished processing the records and even that will be interrupted }; @@ -1117,7 +1122,7 @@ Thread parseAndPopulateRpcDataAsync( } if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() + || connection.getQueryTaskExecutor().isShutdown() || fieldValueLists == null) { // do not process further pages and shutdown (outerloop) break; @@ -1127,7 +1132,8 @@ Thread parseAndPopulateRpcDataAsync( long results = 0; for (FieldValueList fieldValueList : fieldValueLists) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + if (Thread.currentThread().isInterrupted() + || connection.getQueryTaskExecutor().isShutdown()) { // do not process further pages and shutdown (inner loop) break; } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java index 681595f8b05c..b37bbb454daa 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java @@ -84,6 +84,7 @@ public class DataSource implements javax.sql.DataSource { private Boolean filterTablesOnDefaultDataset; private Integer requestGoogleDriveScope; private Integer metadataFetchThreadCount; + private Integer queryTaskThreadCount; private String sslTrustStorePath; private String sslTrustStorePassword; private Map labels; @@ -240,6 +241,9 @@ public class DataSource implements javax.sql.DataSource { .put( BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME, (ds, val) -> ds.setMetadataFetchThreadCount(Integer.parseInt(val))) + .put( + BigQueryJdbcUrlUtility.QUERY_TASK_THREAD_COUNT_PROPERTY_NAME, + (ds, val) -> ds.setQueryTaskThreadCount(Integer.parseInt(val))) .put( BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME, DataSource::setSSLTrustStorePath) @@ -546,6 +550,11 @@ private Properties createProperties() { BigQueryJdbcUrlUtility.METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME, String.valueOf(this.metadataFetchThreadCount)); } + if (this.queryTaskThreadCount != null) { + connectionProperties.setProperty( + BigQueryJdbcUrlUtility.QUERY_TASK_THREAD_COUNT_PROPERTY_NAME, + String.valueOf(this.queryTaskThreadCount)); + } if (this.sslTrustStorePath != null) { connectionProperties.setProperty( BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME, @@ -1024,6 +1033,16 @@ public void setMetadataFetchThreadCount(Integer metadataFetchThreadCount) { this.metadataFetchThreadCount = metadataFetchThreadCount; } + public Integer getQueryTaskThreadCount() { + return queryTaskThreadCount != null + ? queryTaskThreadCount + : BigQueryJdbcUrlUtility.DEFAULT_QUERY_TASK_THREAD_COUNT_VALUE; + } + + public void setQueryTaskThreadCount(Integer queryTaskThreadCount) { + this.queryTaskThreadCount = queryTaskThreadCount; + } + public String getSSLTrustStorePath() { return sslTrustStorePath; } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java index 4430ba4cf315..f271eba3ba9c 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java @@ -363,6 +363,38 @@ public void testMetaDataFetchThreadCountProperty() throws SQLException, IOExcept } } + @Test + public void testQueryTaskThreadCountProperty() throws SQLException, IOException { + // Test Case 1: Should use the default value when the property is not provided. + String urlDefault = + "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" + + "OAuthType=2;ProjectId=MyBigQueryProject;" + + "OAuthAccessToken=redactedToken;OAuthClientId=redactedToken;" + + "OAuthClientSecret=redactedToken;"; + try (BigQueryConnection connectionDefault = new BigQueryConnection(urlDefault)) { + assertEquals( + 4, + ((java.util.concurrent.ThreadPoolExecutor) connectionDefault.getQueryTaskExecutor()) + .getCorePoolSize(), + "Should use the default value of 4 when the property is not provided"); + } + + // Test Case 2: Should use the custom value when a valid integer is provided. + String urlCustom = + "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" + + "OAuthType=2;ProjectId=MyBigQueryProject;" + + "OAuthAccessToken=redactedToken;OAuthClientId=redactedToken;" + + "OAuthClientSecret=redactedToken;" + + "QueryTaskThreadCount=16;"; + try (BigQueryConnection connectionCustom = new BigQueryConnection(urlCustom)) { + assertEquals( + 16, + ((java.util.concurrent.ThreadPoolExecutor) connectionCustom.getQueryTaskExecutor()) + .getCorePoolSize(), + "Should use the custom value when a valid integer is provided"); + } + } + @Test public void testBigQueryReadClientKeepAliveSettings() throws SQLException, IOException { String url = diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java index f3971bd71150..079ea1c7d5bd 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java @@ -60,11 +60,14 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -84,6 +87,8 @@ public class BigQueryStatementTest { private BigQueryStatement bigQueryStatement; + private ExecutorService queryTaskExecutor; + private final String query = "select * from test"; private final String jobIdVal = UUID.randomUUID().toString(); @@ -126,6 +131,7 @@ private Job getJobMock( @BeforeEach public void setUp() throws IOException, SQLException { + queryTaskExecutor = Executors.newFixedThreadPool(1); bigQueryConnection = mock(BigQueryConnection.class); rpcFactoryMock = mock(BigQueryRpcFactory.class); bigquery = mock(BigQuery.class); @@ -133,6 +139,8 @@ public void setUp() throws IOException, SQLException { storageReadClient = mock(BigQueryReadClient.class); jobId = JobId.newBuilder().setJob(jobIdVal).build(); + doReturn(queryTaskExecutor).when(bigQueryConnection).getQueryTaskExecutor(); + doReturn(bigquery).when(bigQueryConnection).getBigQuery(); doReturn(10L).when(bigQueryConnection).getJobTimeoutInSeconds(); doReturn(10L).when(bigQueryConnection).getMaxBytesBilled(); @@ -148,7 +156,13 @@ public void setUp() throws IOException, SQLException { .setSerializedSchema(serializeSchema(vectorSchemaRoot.getSchema())) .build(); // bigQueryConnection.addOpenStatements(bigQueryStatement); + } + @AfterEach + public void tearDown() { + if (queryTaskExecutor != null) { + queryTaskExecutor.shutdown(); + } } private VectorSchemaRoot getTestVectorSchemaRoot() { @@ -303,8 +317,13 @@ public void setQueryTimeoutTest() throws Exception { ArgumentCaptor captor = ArgumentCaptor.forClass(JobInfo.class); bigQueryStatementSpy.runQuery(query, jobConfiguration); - verify(bigquery).create(captor.capture()); - QueryJobConfiguration jobConfig = captor.getValue().getConfiguration(); + verify(bigquery, Mockito.times(2)).create(captor.capture()); + QueryJobConfiguration jobConfig = + captor.getAllValues().stream() + .map(jobInfo -> (QueryJobConfiguration) jobInfo.getConfiguration()) + .filter(config -> config.dryRun() == null || !config.dryRun()) + .findFirst() + .get(); assertEquals(3000L, jobConfig.getJobTimeoutMs().longValue()); } @@ -401,10 +420,10 @@ public void testJoblessQuery() throws SQLException, InterruptedException { jobfulStatementSpy.executeQuery("SELECT 1"); - verify(bigquery).create(any(JobInfo.class)); + verify(bigquery, Mockito.times(2)).create(any(JobInfo.class)); assertTrue( jobfulCaptor.getAllValues().stream() - .noneMatch( + .anyMatch( jobInfo -> Boolean.TRUE.equals( ((QueryJobConfiguration) jobInfo.getConfiguration()).dryRun()))); From 003d7f900fa25b1c884185507c58b2b03c1d15f8 Mon Sep 17 00:00:00 2001 From: Kirill Logachev Date: Fri, 27 Mar 2026 03:44:15 +0000 Subject: [PATCH 2/4] lint --- .../google/cloud/bigquery/jdbc/BigQueryStatement.java | 2 -- .../cloud/bigquery/jdbc/BigQueryConnectionTest.java | 9 ++++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index d829aedc4d7a..06f1d0ea7479 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -86,8 +86,6 @@ * @see ResultSet */ public class BigQueryStatement extends BigQueryNoOpsStatement { - - // TODO (obada): Update this after benchmarking private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; private static final String DEFAULT_TABLE_NAME = "temp_table_"; diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java index f271eba3ba9c..7475bfe7cdb1 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryConnectionTest.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.sql.SQLException; import java.util.Properties; +import java.util.concurrent.ThreadPoolExecutor; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -373,9 +374,8 @@ public void testQueryTaskThreadCountProperty() throws SQLException, IOException + "OAuthClientSecret=redactedToken;"; try (BigQueryConnection connectionDefault = new BigQueryConnection(urlDefault)) { assertEquals( - 4, - ((java.util.concurrent.ThreadPoolExecutor) connectionDefault.getQueryTaskExecutor()) - .getCorePoolSize(), + 16, + ((ThreadPoolExecutor) connectionDefault.getQueryTaskExecutor()).getCorePoolSize(), "Should use the default value of 4 when the property is not provided"); } @@ -389,8 +389,7 @@ public void testQueryTaskThreadCountProperty() throws SQLException, IOException try (BigQueryConnection connectionCustom = new BigQueryConnection(urlCustom)) { assertEquals( 16, - ((java.util.concurrent.ThreadPoolExecutor) connectionCustom.getQueryTaskExecutor()) - .getCorePoolSize(), + ((ThreadPoolExecutor) connectionCustom.getQueryTaskExecutor()).getCorePoolSize(), "Should use the custom value when a valid integer is provided"); } } From ec0c417e6424e26fe739be713312866e8cff8d7c Mon Sep 17 00:00:00 2001 From: Kirill Logachev Date: Wed, 6 May 2026 23:04:35 +0000 Subject: [PATCH 3/4] fix merge --- .../bigquery/jdbc/BigQueryConnection.java | 69 +++---------------- 1 file changed, 8 insertions(+), 61 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 9d766e122e2d..382defc9e5cb 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -142,6 +142,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Long connectionPoolSize; Long listenerPoolSize; String partnerToken; + DatabaseMetaData databaseMetaData; + Boolean reqGoogleDriveScope; + private boolean isReadOnlyTokenUsed = false; private int queryTaskThreadCount; private ExecutorService queryTaskExecutor; @@ -269,68 +272,12 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.headerProvider = createHeaderProvider(); this.bigQuery = getBigQueryConnection(); + + this.queryTaskThreadCount = ds.getQueryTaskThreadCount(); + this.queryTaskExecutor = + Executors.newFixedThreadPool( + this.queryTaskThreadCount, new BigQueryThreadFactory("BigQuery-query-task-")); } - this.location = ds.getLocation(); - this.enableHighThroughputAPI = ds.getEnableHighThroughputAPI(); - this.highThroughputMinTableSize = ds.getHighThroughputMinTableSize(); - this.highThroughputActivationRatio = ds.getHighThroughputActivationRatio(); - this.useQueryCache = ds.getUseQueryCache(); - this.useStatelessQueryMode = ds.getUseStatelessQueryMode(); - - this.queryDialect = ds.getQueryDialect(); - this.allowLargeResults = ds.getAllowLargeResults(); - this.destinationTable = ds.getDestinationTable(); - this.destinationDataset = ds.getDestinationDataset(); - this.destinationDatasetExpirationTime = ds.getDestinationDatasetExpirationTime(); - this.kmsKeyName = ds.getKmsKeyName(); - Map proxyProperties = - BigQueryJdbcProxyUtility.parseProxyProperties(ds, this.connectionClassName); - - this.sslTrustStorePath = ds.getSSLTrustStorePath(); - this.sslTrustStorePassword = ds.getSSLTrustStorePassword(); - this.httpConnectTimeout = ds.getHttpConnectTimeout(); - this.httpReadTimeout = ds.getHttpReadTimeout(); - - this.httpTransportOptions = - BigQueryJdbcProxyUtility.getHttpTransportOptions( - proxyProperties, - this.sslTrustStorePath, - this.sslTrustStorePassword, - this.httpConnectTimeout, - this.httpReadTimeout, - this.connectionClassName); - this.transportChannelProvider = - BigQueryJdbcProxyUtility.getTransportChannelProvider( - proxyProperties, - this.sslTrustStorePath, - this.sslTrustStorePassword, - this.connectionClassName); - this.enableSession = ds.getEnableSession(); - this.unsupportedHTAPIFallback = ds.getUnsupportedHTAPIFallback(); - this.maxResults = ds.getMaxResults(); - Map queryPropertiesMap = ds.getQueryProperties(); - this.sessionInfoConnectionProperty = getSessionPropertyFromQueryProperties(queryPropertiesMap); - this.queryProperties = convertMapToConnectionPropertiesList(queryPropertiesMap); - this.enableWriteAPI = ds.getEnableWriteAPI(); - this.writeAPIActivationRowCount = ds.getSwaActivationRowCount(); - this.writeAPIAppendRowCount = ds.getSwaAppendRowCount(); - - this.additionalProjects = ds.getAdditionalProjects(); - - this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset(); - this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope(); - this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount(); - this.queryTaskThreadCount = ds.getQueryTaskThreadCount(); - this.queryTaskExecutor = - Executors.newFixedThreadPool( - this.queryTaskThreadCount, new BigQueryThreadFactory("BigQuery-query-task-")); - this.requestReason = ds.getRequestReason(); - this.connectionPoolSize = ds.getConnectionPoolSize(); - this.listenerPoolSize = ds.getListenerPoolSize(); - this.partnerToken = ds.getPartnerToken(); - - this.headerProvider = createHeaderProvider(); - this.bigQuery = getBigQueryConnection(); } String getLibraryVersion(Class libraryClass) { From 25462a8d9f23116135ee5dcbdce8bf29b30bcf38 Mon Sep 17 00:00:00 2001 From: Kirill Logachev Date: Wed, 6 May 2026 23:23:16 +0000 Subject: [PATCH 4/4] fix tests --- .../bigquery/jdbc/BigQueryStatementTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java index 384c52a1f18b..daeee2a134ea 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java @@ -229,6 +229,7 @@ public void testExecSlowQueryPath() throws SQLException, InterruptedException { Job job = getJobMock(tableResult, queryJobConfiguration, StatementType.SELECT); doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any()); + doReturn(job).when(bigquery).create(any(JobInfo.class)); doReturn(jobIdWrapper) .when(bigQueryStatementSpy) @@ -314,19 +315,16 @@ public void setQueryTimeoutTest() throws Exception { Job job = getJobMock(result, jobConfiguration, StatementType.SELECT); doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any()); + doReturn(job).when(bigquery).create(any(JobInfo.class)); doReturn(jsonResultSet).when(bigQueryStatementSpy).processJsonResultSet(result); - ArgumentCaptor captor = + ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(QueryJobConfiguration.class); bigQueryStatementSpy.runQuery(query, jobConfiguration); - verify(bigquery, Mockito.times(2)).create(captor.capture()); - QueryJobConfiguration jobConfig = - captor.getAllValues().stream() - .map(jobInfo -> (QueryJobConfiguration) jobInfo.getConfiguration()) - .filter(config -> config.dryRun() == null || !config.dryRun()) - .findFirst() - .get(); + verify(bigquery, Mockito.times(1)).create(any(JobInfo.class)); + verify(bigquery, Mockito.times(1)).queryWithTimeout(queryCaptor.capture(), any(), any()); + QueryJobConfiguration jobConfig = queryCaptor.getValue(); assertEquals(3000L, jobConfig.getJobTimeoutMs().longValue()); } @@ -418,20 +416,22 @@ public void testJoblessQuery() throws SQLException, InterruptedException { doReturn(jobMock) .when(bigquery) .queryWithTimeout(any(QueryJobConfiguration.class), any(), any()); + doReturn(jobMock).when(bigquery).create(any(JobInfo.class)); doReturn(mock(BigQueryJsonResultSet.class)) .when(jobfulStatementSpy) .processJsonResultSet(tableResultJobfulMock); jobfulStatementSpy.executeQuery("SELECT 1"); - verify(bigquery, Mockito.times(2)).create(any(JobInfo.class)); + ArgumentCaptor jobfulCaptor = ArgumentCaptor.forClass(JobInfo.class); + verify(bigquery, Mockito.times(1)).create(jobfulCaptor.capture()); assertTrue( jobfulCaptor.getAllValues().stream() .anyMatch( jobInfo -> Boolean.TRUE.equals( ((QueryJobConfiguration) jobInfo.getConfiguration()).dryRun()))); - verify(bigquery, Mockito.never()) + verify(bigquery, Mockito.times(1)) .queryWithTimeout(any(QueryJobConfiguration.class), any(), any()); }