Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions java-bigquery/google-cloud-bigquery-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@
<artifactId>junit-platform-suite-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.StatusCode;
Expand Down Expand Up @@ -1731,32 +1732,52 @@ private ResultSet getTablesImpl(
"getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s",
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types));

final Schema resultSchema = defineGetTablesSchema();
final BlockingQueue<BigQueryFieldValueListWrapper> queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);

Thread fetcherThread =
runGetTablesTaskAsync(
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, types, resultSchema, queue);
Comment thread
keshavdandeva marked this conversation as resolved.

BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});

LOG.info("Started background thread for getTables");
return resultSet;
}

@VisibleForTesting
Thread runGetTablesTaskAsync(
String effectiveCatalog,
String effectiveSchemaPattern,
String tableNamePattern,
String[] types,
Schema resultSchema,
BlockingQueue<BigQueryFieldValueListWrapper> queue)
throws SQLException {

final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern);
final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern);
final Set<String> requestedTypes =
(types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types));

final Schema resultSchema = defineGetTablesSchema();
final FieldList resultSchemaFields = resultSchema.getFields();

final BlockingQueue<BigQueryFieldValueListWrapper> queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = effectiveCatalog;
final String schemaParam = effectiveSchemaPattern;

Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable tableFetcher =
() -> {
Span backgroundSpan =
tracer
this.connection
.getTracer()
.spanBuilder("BigQueryDatabaseMetaData.getTables.background")
.setNoParent()
.addLink(parentSpanContext)
.startSpan();

try (Scope backgroundScope = backgroundSpan.makeCurrent()) {
try (Scope scope = backgroundSpan.makeCurrent()) {
ExecutorService apiExecutor = null;
ExecutorService tableProcessorExecutor = null;
final FieldList localResultSchemaFields = resultSchemaFields;
Expand Down Expand Up @@ -1898,12 +1919,8 @@ private ResultSet getTablesImpl(

Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher);
Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});

fetcherThread.start();
LOG.info("Started background thread for getTables");
return resultSet;
return fetcherThread;
}

Schema defineGetTablesSchema() {
Expand Down Expand Up @@ -2127,24 +2144,51 @@ private ResultSet getColumnsImpl(
+ " columnNamePattern: %s",
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern);

final Schema resultSchema = defineGetColumnsSchema();
final BlockingQueue<BigQueryFieldValueListWrapper> queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);

Thread fetcherThread =
runGetColumnsTaskAsync(
effectiveCatalog,
effectiveSchemaPattern,
tableNamePattern,
columnNamePattern,
resultSchema,
queue);

BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});

LOG.info("Started background thread for getColumns");
return resultSet;
}

@VisibleForTesting
Thread runGetColumnsTaskAsync(
String effectiveCatalog,
String effectiveSchemaPattern,
String tableNamePattern,
String columnNamePattern,
Schema resultSchema,
BlockingQueue<BigQueryFieldValueListWrapper> queue)
throws SQLException {

Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern);
Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern);
Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern);

final Schema resultSchema = defineGetColumnsSchema();
final FieldList resultSchemaFields = resultSchema.getFields();
final BlockingQueue<BigQueryFieldValueListWrapper> queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = effectiveCatalog;
final String schemaParam = effectiveSchemaPattern;

Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable columnFetcher =
() -> {
Span backgroundSpan =
tracer
this.connection
.getTracer()
.spanBuilder("BigQueryDatabaseMetaData.getColumns.background")
.setNoParent()
.addLink(parentSpanContext)
Expand Down Expand Up @@ -2252,12 +2296,8 @@ private ResultSet getColumnsImpl(
Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher);
Thread fetcherThread =
new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});

fetcherThread.start();
LOG.info("Started background thread for getColumns");
return resultSet;
return fetcherThread;
}

private void processTableColumns(
Expand Down Expand Up @@ -2324,7 +2364,7 @@ private void processTableColumns(
}
}

private Schema defineGetColumnsSchema() {
Schema defineGetColumnsSchema() {
List<Field> fields = new ArrayList<>(24);
fields.add(
Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING)
Expand Down Expand Up @@ -3690,27 +3730,44 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ

LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern);

final Pattern schemaRegex = compileSqlLikePattern(schemaPattern);
final Schema resultSchema = defineGetSchemasSchema();
final FieldList resultSchemaFields = resultSchema.getFields();

final BlockingQueue<BigQueryFieldValueListWrapper> queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);

Thread fetcherThread = runGetSchemasTaskAsync(catalog, schemaPattern, resultSchema, queue);

BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});

LOG.info("Started background thread for getSchemas");
return resultSet;
}

@VisibleForTesting
Thread runGetSchemasTaskAsync(
String catalog,
String schemaPattern,
Schema resultSchema,
BlockingQueue<BigQueryFieldValueListWrapper> queue)
throws SQLException {

final Pattern schemaRegex = compileSqlLikePattern(schemaPattern);
final FieldList resultSchemaFields = resultSchema.getFields();
final List<FieldValueList> collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = catalog;

Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable schemaFetcher =
() -> {
Span backgroundSpan =
tracer
this.connection
.getTracer()
.spanBuilder("BigQueryDatabaseMetaData.getSchemas.background")
.setNoParent()
.addLink(parentSpanContext)
.startSpan();

try (Scope backgroundScope = backgroundSpan.makeCurrent()) {
try (Scope scope = backgroundSpan.makeCurrent()) {
final FieldList localResultSchemaFields = resultSchemaFields;
List<String> projectsToScanList = new ArrayList<>();

Expand Down Expand Up @@ -3791,12 +3848,8 @@ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQ

Runnable wrappedFetcher = Context.current().wrap(schemaFetcher);
Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});

fetcherThread.start();
LOG.info("Started background thread for getSchemas");
return resultSet;
return fetcherThread;
}

Schema defineGetSchemasSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,9 @@ public int[] executeBatch() throws SQLException {
"BigQueryStatement.executeBatch",
(span) -> {
span.setAttribute("db.statement.count", this.batchQueries.size());
span.setAttribute(AttributeKey.stringArrayKey("db.batch.statements"), this.batchQueries);
span.setAttribute(
AttributeKey.stringArrayKey("db.batch.statements"),
new ArrayList<>(this.batchQueries));

StringBuilder sb = new StringBuilder();
for (String query : this.batchQueries) {
Expand Down Expand Up @@ -1561,7 +1563,6 @@ private void fetchNextPages(
BlockingQueue<Tuple<TableResult, Boolean>> rpcResponseQueue,
BlockingQueue<BigQueryFieldValueListWrapper> bigQueryFieldValueListWrapperBlockingQueue,
TableResult result) {
Tracer tracer = this.connection.getTracer();
SpanContext parentSpanContext = Span.current().getSpanContext();
Comment thread
keshavdandeva marked this conversation as resolved.
String currentPageToken = firstPageToken;
TableResult currentResults = result;
Expand All @@ -1571,6 +1572,7 @@ private void fetchNextPages(
}

try {
Tracer tracer = this.connection.getTracer();
while (currentPageToken != null) {
if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
LOG.warning("%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
Expand Down
Loading
Loading