Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
Expand Down Expand Up @@ -236,29 +237,31 @@ public boolean next() throws SQLException {
|| this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) {
/* Start of iteration or we have exhausted the current batch */
// Advance the cursor. Potentially blocking operation.
BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
if (batchWrapper.getException() != null) {
throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
}
if (batchWrapper.isLast()) {
/* Marks the end of the records */
if (this.vectorSchemaRoot != null) {
// IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
// the last batch
this.vectorSchemaRoot.clear();
try (Scope scope = makeOriginalContextCurrent()) {
BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
if (batchWrapper.getException() != null) {
throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
}
if (batchWrapper.isLast()) {
/* Marks the end of the records */
if (this.vectorSchemaRoot != null) {
// IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
// the last batch
this.vectorSchemaRoot.clear();
}
this.hasReachedEnd = true;
this.rowCount++;
return false;
}
this.hasReachedEnd = true;
// Valid batch, process it
ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
// Populates vectorSchemaRoot
this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
// Pointing to the first row in this fresh batch
this.currentBatchRowIndex = 0;
this.rowCount++;
return false;
return true;
}
// Valid batch, process it
ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
// Populates vectorSchemaRoot
this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
// Pointing to the first row in this fresh batch
this.currentBatchRowIndex = 0;
this.rowCount++;
return true;
}
// There are rows left in the current batch.
else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import com.google.cloud.bigquery.exception.BigQueryConversionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionNotFoundException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
Expand Down Expand Up @@ -58,6 +62,7 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet
protected boolean isClosed = false;
protected boolean wasNull = false;
protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE;
protected final SpanContext originalSpanContext;

protected BigQueryBaseResultSet(
BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) {
Expand All @@ -66,6 +71,11 @@ protected BigQueryBaseResultSet(
this.schema = schema;
this.schemaFieldList = schema != null ? schema.getFields() : null;
this.isNested = isNested;
this.originalSpanContext = Span.current().getSpanContext();
}

protected Scope makeOriginalContextCurrent() {
return Context.current().with(Span.wrap(this.originalSpanContext)).makeCurrent();
}

public QueryStatistics getQueryStatistics() {
Expand Down
Loading
Loading