diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
index 8ce9816fd1ac..27f5d8b3efeb 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
+++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
@@ -340,6 +340,11 @@
 <artifactId>junit-platform-suite-engine</artifactId>
 <scope>test</scope>
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java
index e6469cba9f69..60938102778d 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java
@@ -42,6 +42,7 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
+import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.StatusCode;
@@ -1731,32 +1732,52 @@ private ResultSet getTablesImpl(
"getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s",
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types));
+ final Schema resultSchema = defineGetTablesSchema();
+ final BlockingQueue queue =
+ new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ Thread fetcherThread =
+ runGetTablesTaskAsync(
+ effectiveCatalog, effectiveSchemaPattern, tableNamePattern, types, resultSchema, queue);
+
+ BigQueryJsonResultSet resultSet =
+ BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
+
+ LOG.info("Started background thread for getTables");
+ return resultSet;
+ }
+
+ @VisibleForTesting
+ Thread runGetTablesTaskAsync(
+ String effectiveCatalog,
+ String effectiveSchemaPattern,
+ String tableNamePattern,
+ String[] types,
+ Schema resultSchema,
+ BlockingQueue queue)
+ throws SQLException {
+
final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern);
final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern);
final Set<String> requestedTypes =
(types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types));
- final Schema resultSchema = defineGetTablesSchema();
final FieldList resultSchemaFields = resultSchema.getFields();
-
- final BlockingQueue queue =
- new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
final List collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = effectiveCatalog;
final String schemaParam = effectiveSchemaPattern;
-
- Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable tableFetcher =
() -> {
Span backgroundSpan =
- tracer
+ this.connection
+ .getTracer()
.spanBuilder("BigQueryDatabaseMetaData.getTables.background")
.setNoParent()
.addLink(parentSpanContext)
.startSpan();
- try (Scope backgroundScope = backgroundSpan.makeCurrent()) {
+ try (Scope scope = backgroundSpan.makeCurrent()) {
ExecutorService apiExecutor = null;
ExecutorService tableProcessorExecutor = null;
final FieldList localResultSchemaFields = resultSchemaFields;
@@ -1898,12 +1919,8 @@ private ResultSet getTablesImpl(
Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher);
Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog);
- BigQueryJsonResultSet resultSet =
- BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
-
fetcherThread.start();
- LOG.info("Started background thread for getTables");
- return resultSet;
+ return fetcherThread;
}
Schema defineGetTablesSchema() {
@@ -2127,24 +2144,51 @@ private ResultSet getColumnsImpl(
+ " columnNamePattern: %s",
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern);
+ final Schema resultSchema = defineGetColumnsSchema();
+ final BlockingQueue queue =
+ new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ Thread fetcherThread =
+ runGetColumnsTaskAsync(
+ effectiveCatalog,
+ effectiveSchemaPattern,
+ tableNamePattern,
+ columnNamePattern,
+ resultSchema,
+ queue);
+
+ BigQueryJsonResultSet resultSet =
+ BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
+
+ LOG.info("Started background thread for getColumns");
+ return resultSet;
+ }
+
+ @VisibleForTesting
+ Thread runGetColumnsTaskAsync(
+ String effectiveCatalog,
+ String effectiveSchemaPattern,
+ String tableNamePattern,
+ String columnNamePattern,
+ Schema resultSchema,
+ BlockingQueue queue)
+ throws SQLException {
+
Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern);
Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern);
Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern);
- final Schema resultSchema = defineGetColumnsSchema();
final FieldList resultSchemaFields = resultSchema.getFields();
- final BlockingQueue queue =
- new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
final List collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = effectiveCatalog;
final String schemaParam = effectiveSchemaPattern;
- Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable columnFetcher =
() -> {
Span backgroundSpan =
- tracer
+ this.connection
+ .getTracer()
.spanBuilder("BigQueryDatabaseMetaData.getColumns.background")
.setNoParent()
.addLink(parentSpanContext)
@@ -2252,12 +2296,8 @@ private ResultSet getColumnsImpl(
Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher);
Thread fetcherThread =
new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog);
- BigQueryJsonResultSet resultSet =
- BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
-
fetcherThread.start();
- LOG.info("Started background thread for getColumns");
- return resultSet;
+ return fetcherThread;
}
private void processTableColumns(
@@ -2324,7 +2364,7 @@ private void processTableColumns(
}
}
- private Schema defineGetColumnsSchema() {
+ Schema defineGetColumnsSchema() {
List<Field> fields = new ArrayList<>(24);
fields.add(
Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING)
@@ -3690,27 +3730,44 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ
LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern);
- final Pattern schemaRegex = compileSqlLikePattern(schemaPattern);
final Schema resultSchema = defineGetSchemasSchema();
- final FieldList resultSchemaFields = resultSchema.getFields();
-
final BlockingQueue queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ Thread fetcherThread = runGetSchemasTaskAsync(catalog, schemaPattern, resultSchema, queue);
+
+ BigQueryJsonResultSet resultSet =
+ BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
+
+ LOG.info("Started background thread for getSchemas");
+ return resultSet;
+ }
+
+ @VisibleForTesting
+ Thread runGetSchemasTaskAsync(
+ String catalog,
+ String schemaPattern,
+ Schema resultSchema,
+ BlockingQueue queue)
+ throws SQLException {
+
+ final Pattern schemaRegex = compileSqlLikePattern(schemaPattern);
+ final FieldList resultSchemaFields = resultSchema.getFields();
final List collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = catalog;
- Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable schemaFetcher =
() -> {
Span backgroundSpan =
- tracer
+ this.connection
+ .getTracer()
.spanBuilder("BigQueryDatabaseMetaData.getSchemas.background")
.setNoParent()
.addLink(parentSpanContext)
.startSpan();
- try (Scope backgroundScope = backgroundSpan.makeCurrent()) {
+ try (Scope scope = backgroundSpan.makeCurrent()) {
final FieldList localResultSchemaFields = resultSchemaFields;
List projectsToScanList = new ArrayList<>();
@@ -3791,12 +3848,8 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ
Runnable wrappedFetcher = Context.current().wrap(schemaFetcher);
Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog);
- BigQueryJsonResultSet resultSet =
- BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
-
fetcherThread.start();
- LOG.info("Started background thread for getSchemas");
- return resultSet;
+ return fetcherThread;
}
Schema defineGetSchemasSchema() {
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
index a87ff6f3a5cb..011b385620d5 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
@@ -1356,7 +1356,9 @@ public int[] executeBatch() throws SQLException {
"BigQueryStatement.executeBatch",
(span) -> {
span.setAttribute("db.statement.count", this.batchQueries.size());
- span.setAttribute(AttributeKey.stringArrayKey("db.batch.statements"), this.batchQueries);
+ span.setAttribute(
+ AttributeKey.stringArrayKey("db.batch.statements"),
+ new ArrayList<>(this.batchQueries));
StringBuilder sb = new StringBuilder();
for (String query : this.batchQueries) {
@@ -1561,7 +1563,6 @@ private void fetchNextPages(
BlockingQueue<Tuple<TableResult, Boolean>> rpcResponseQueue,
BlockingQueue<BigQueryFieldValueListWrapper> bigQueryFieldValueListWrapperBlockingQueue,
TableResult result) {
- Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
String currentPageToken = firstPageToken;
TableResult currentResults = result;
@@ -1571,6 +1572,7 @@ private void fetchNextPages(
}
try {
+ Tracer tracer = this.connection.getTracer();
while (currentPageToken != null) {
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
LOG.warning("%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java
index 81fe6d4d13cb..c3fec1b4b5cc 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java
@@ -30,6 +30,12 @@
import com.google.api.gax.paging.Page;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.BigQuery.RoutineListOption;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.io.InputStream;
import java.sql.DatabaseMetaData;
@@ -39,16 +45,29 @@
import java.sql.Statement;
import java.sql.Types;
import java.util.*;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BigQueryDatabaseMetaDataTest {
+ @RegisterExtension
+ public static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();
+
private BigQueryConnection bigQueryConnection;
private BigQueryDatabaseMetaData dbMetadata;
private BigQuery bigqueryClient;
@@ -62,6 +81,20 @@ public void setUp() throws SQLException {
when(bigQueryConnection.getConnectionUrl()).thenReturn("jdbc:bigquery://test-project");
when(bigQueryConnection.getBigQuery()).thenReturn(bigqueryClient);
when(bigQueryConnection.createStatement()).thenReturn(mockStatement);
+ when(bigQueryConnection.getTracer())
+ .thenReturn(
+ otelTesting
+ .getOpenTelemetry()
+ .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME));
+
+    Page<Dataset> datasetPageMock = mock(Page.class);
+ when(bigqueryClient.listDatasets(anyString(), any())).thenReturn(datasetPageMock);
+
+    Page<Table> tablePageMock = mock(Page.class);
+ when(bigqueryClient.listTables(any(DatasetId.class), any())).thenReturn(tablePageMock);
+
+ Table mockTable = mock(Table.class);
+ when(bigqueryClient.getTable(any(TableId.class))).thenReturn(mockTable);
dbMetadata = new BigQueryDatabaseMetaData(bigQueryConnection);
}
@@ -3206,4 +3239,96 @@ public void testSupportsResultSetConcurrency() throws SQLException {
public void testGetSQLStateType() throws SQLException {
assertEquals(DatabaseMetaData.sqlStateSQL, dbMetadata.getSQLStateType());
}
+
+ @ParameterizedTest
+ @MethodSource("metadataOperationProvider")
+ public void testMetadataOperation_generatesSpan(
+ MetadataOperation operation, String expectedSpanName) throws Exception {
+ operation.run();
+
+ SpanData span =
+ OpenTelemetryTestUtility.findSpanByName(otelTesting.getSpans(), expectedSpanName);
+ OpenTelemetryTestUtility.assertSpanStatus(span, StatusCode.UNSET);
+ }
+
+ @FunctionalInterface
+ interface MetadataOperation {
+ void run() throws SQLException;
+ }
+
+  Stream<Arguments> metadataOperationProvider() {
+ return Stream.of(
+ Arguments.of(
+ (MetadataOperation) () -> dbMetadata.getCatalogs(),
+ "BigQueryDatabaseMetaData.getCatalogs"),
+ Arguments.of(
+ (MetadataOperation) () -> dbMetadata.getSchemas("catalog", "schema"),
+ "BigQueryDatabaseMetaData.getSchemas"),
+ Arguments.of(
+ (MetadataOperation)
+ () -> dbMetadata.getTables("catalog", "schema", "table", new String[] {"TABLE"}),
+ "BigQueryDatabaseMetaData.getTables"),
+ Arguments.of(
+ (MetadataOperation) () -> dbMetadata.getColumns("catalog", "schema", "table", "column"),
+ "BigQueryDatabaseMetaData.getColumns"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("asyncMetadataOperationProvider")
+ public void testAsyncMetadataOperation_createsDetachedLinkedSpan(
+ AsyncMetadataOperation operation, String expectedSpanName) throws Exception {
+ BlockingQueue queue = new LinkedBlockingQueue<>();
+
+ Tracer testTracer = otelTesting.getOpenTelemetry().getTracer("test");
+ Span parentSpan = testTracer.spanBuilder("parent-span").startSpan();
+
+ try (Scope scope = parentSpan.makeCurrent()) {
+ Thread workerThread = operation.run(queue);
+
+ Assertions.assertNotNull(workerThread, "Worker thread should not be null");
+ workerThread.join();
+
+ OpenTelemetryTestUtility.assertSpanLinkedToParent(
+ otelTesting.getSpans(), expectedSpanName, parentSpan);
+ } finally {
+ parentSpan.end();
+ }
+ }
+
+ @FunctionalInterface
+ interface AsyncMetadataOperation {
+ Thread run(BlockingQueue queue) throws Exception;
+ }
+
+  Stream<Arguments> asyncMetadataOperationProvider() {
+ return Stream.of(
+ Arguments.of(
+ (AsyncMetadataOperation)
+ (q) ->
+ dbMetadata.runGetTablesTaskAsync(
+ "catalog",
+ "schema",
+ "table",
+ new String[] {"TABLE"},
+ dbMetadata.defineGetTablesSchema(),
+ q),
+ "BigQueryDatabaseMetaData.getTables.background"),
+ Arguments.of(
+ (AsyncMetadataOperation)
+ (q) ->
+ dbMetadata.runGetColumnsTaskAsync(
+ "catalog",
+ "schema",
+ "table",
+ "column",
+ dbMetadata.defineGetColumnsSchema(),
+ q),
+ "BigQueryDatabaseMetaData.getColumns.background"),
+ Arguments.of(
+ (AsyncMetadataOperation)
+ (q) ->
+ dbMetadata.runGetSchemasTaskAsync(
+ "catalog", "schema", dbMetadata.defineGetSchemasSchema(), q),
+ "BigQueryDatabaseMetaData.getSchemas.background"));
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java
index d7e7a1a6a52e..cffde5d4daec 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java
@@ -27,8 +27,10 @@
import static org.mockito.Mockito.verify;
import com.google.cloud.ServiceOptions;
+import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
+import com.google.cloud.bigquery.BigQuery.TableDataListOption;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
@@ -52,28 +54,47 @@
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
-import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Stream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BigQueryStatementTest {
+ @RegisterExtension
+ public static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();
+
private BigQueryConnection bigQueryConnection;
private static final String PROJECT = "project";
@@ -125,10 +146,36 @@ private Job getJobMock(
return job;
}
+ private TableResult setupMockQueryResults(JobId jobId, StatementType type, Long affectedRows)
+ throws Exception {
+ doReturn(true).when(bigQueryConnection).getUseStatelessQueryMode();
+ TableResult tableResultMock = mock(TableResult.class);
+ doReturn(jobId).when(tableResultMock).getJobId();
+ doReturn(Schema.of()).when(tableResultMock).getSchema();
+ doReturn(tableResultMock)
+ .when(bigquery)
+ .queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
+
+ Job jobMock = getJobMock(tableResultMock, null, type);
+ if (affectedRows != null) {
+ JobStatistics.QueryStatistics stats = (JobStatistics.QueryStatistics) jobMock.getStatistics();
+ doReturn(affectedRows).when(stats).getNumDmlAffectedRows();
+ }
+ doReturn(jobMock).when(bigquery).getJob(any(JobId.class));
+ doReturn(jobMock).when(jobMock).waitFor();
+
+ Job dryRunJobMock = getJobMock(null, null, type);
+ doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class));
+ return tableResultMock;
+ }
+
@BeforeEach
public void setUp() throws IOException, SQLException {
bigQueryConnection = mock(BigQueryConnection.class);
- doReturn(OpenTelemetry.noop().getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))
+ doReturn(
+ otelTesting
+ .getOpenTelemetry()
+ .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))
.when(bigQueryConnection)
.getTracer();
rpcFactoryMock = mock(BigQueryRpcFactory.class);
@@ -453,21 +500,11 @@ public void testCloseCancelsJob() throws SQLException, InterruptedException {
}
@Test
- public void testCancelWithJoblessQuery() throws SQLException, InterruptedException {
- doReturn(true).when(bigQueryConnection).getUseStatelessQueryMode();
+ public void testCancelWithJoblessQuery() throws Exception {
+ TableResult tableResultMock = setupMockQueryResults(null, StatementType.SELECT, null);
BigQueryStatement joblessStatement = new BigQueryStatement(bigQueryConnection);
BigQueryStatement joblessStatementSpy = Mockito.spy(joblessStatement);
- TableResult tableResultMock = mock(TableResult.class);
- doReturn(null).when(tableResultMock).getJobId();
-
- doReturn(tableResultMock)
- .when(bigquery)
- .queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
-
- Job dryRunJobMock = getJobMock(null, null, StatementType.SELECT);
- doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class));
-
BigQueryJsonResultSet resultSetMock = mock(BigQueryJsonResultSet.class);
doReturn(resultSetMock).when(joblessStatementSpy).processJsonResultSet(tableResultMock);
@@ -484,4 +521,115 @@ public void testCancelWithJoblessQuery() throws SQLException, InterruptedExcepti
// And no backend cancellation was attempted
verify(bigquery, Mockito.never()).cancel(any(JobId.class));
}
+
+ @Test
+ public void testFetchNextPages_addsLinkToParent() throws Exception {
+ Tracer testTracer = otelTesting.getOpenTelemetry().getTracer("test");
+ Span parentSpan = testTracer.spanBuilder("parent-span").startSpan();
+
+ try (Scope scope = parentSpan.makeCurrent()) {
+
+      BlockingQueue<Tuple<TableResult, Boolean>> rpcResponseQueue = new LinkedBlockingDeque<>();
+      BlockingQueue<BigQueryFieldValueListWrapper> bigQueryFieldValueListWrapperBlockingQueue =
+          new LinkedBlockingDeque<>();
+ TableResult mockResult = mock(TableResult.class);
+ JobId mockJobId = JobId.of("job");
+
+ Job mockJob = mock(Job.class);
+ QueryJobConfiguration realConfig =
+ QueryJobConfiguration.newBuilder("SELECT 1")
+ .setDestinationTable(TableId.of("project", "dataset", "table"))
+ .build();
+ doReturn(mockJob).when(bigquery).getJob(any(JobId.class));
+ doReturn(realConfig).when(mockJob).getConfiguration();
+
+ TableResult mockNextResult = mock(TableResult.class);
+ doReturn(mockNextResult)
+ .when(bigquery)
+ .listTableData(
+ any(TableId.class), any(TableDataListOption.class), any(TableDataListOption.class));
+ doReturn(null).when(mockNextResult).getNextPageToken();
+
+ Thread workerThread =
+ bigQueryStatement.runNextPageTaskAsync(
+ mockResult,
+ "token",
+ mockJobId,
+ rpcResponseQueue,
+ bigQueryFieldValueListWrapperBlockingQueue);
+
+ Assertions.assertNotNull(workerThread, "Worker thread should not be null");
+ workerThread.join();
+
+ OpenTelemetryTestUtility.assertSpanLinkedToParent(
+ otelTesting.getSpans(), "BigQueryStatement.pagination", parentSpan);
+ } finally {
+ parentSpan.end();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("statementOperationProvider")
+ public void testExecuteOperation_generatesSpan(
+ StatementOperation operation,
+ String expectedSpanName,
+ StatementType type,
+      Map<AttributeKey<?>, Object> expectedAttributes)
+ throws Exception {
+ setupMockQueryResults(JobId.of("job"), type, 1L);
+ operation.run();
+
+ SpanData span =
+ OpenTelemetryTestUtility.findSpanByName(otelTesting.getSpans(), expectedSpanName);
+ OpenTelemetryTestUtility.assertSpanStatus(span, StatusCode.UNSET);
+
+ if (expectedAttributes != null) {
+      for (Map.Entry<AttributeKey<?>, Object> entry : expectedAttributes.entrySet()) {
+ OpenTelemetryTestUtility.assertSpanHasAttribute(
+ span, (AttributeKey