Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
Expand Down Expand Up @@ -213,66 +216,68 @@ public void close() {
@Override
public boolean next() throws SQLException {
checkClosed();
if (this.isNested) {
if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) {
throw new IllegalStateException(
"currentNestedBatch/JsonStringArrayList can not be null working with the nested record");
}
if (this.nestedRowIndex < (this.toIndexExclusive - 1)) {
/* Check if there's a next record in the array which can be read */
this.nestedRowIndex++;
return true;
}
this.afterLast = true;
return false;
} else {
/* Non nested */
if (this.hasReachedEnd || this.isLast()) {
try (Scope scope = Context.current().with(Span.wrap(originalSpanContext)).makeCurrent()) {
Comment thread
keshavdandeva marked this conversation as resolved.
Outdated
if (this.isNested) {
if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) {
throw new IllegalStateException(
"currentNestedBatch/JsonStringArrayList can not be null working with the nested record");
}
if (this.nestedRowIndex < (this.toIndexExclusive - 1)) {
/* Check if there's a next record in the array which can be read */
this.nestedRowIndex++;
return true;
}
this.afterLast = true;
return false;
}
try {
if (this.currentBatchRowIndex == -1
|| this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) {
/* Start of iteration or we have exhausted the current batch */
// Advance the cursor. Potentially blocking operation.
BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
if (batchWrapper.getException() != null) {
throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
}
if (batchWrapper.isLast()) {
/* Marks the end of the records */
if (this.vectorSchemaRoot != null) {
// IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
// the last batch
this.vectorSchemaRoot.clear();
} else {
/* Non nested */
if (this.hasReachedEnd || this.isLast()) {
this.afterLast = true;
return false;
}
try {
if (this.currentBatchRowIndex == -1
|| this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) {
/* Start of iteration or we have exhausted the current batch */
// Advance the cursor. Potentially blocking operation.
BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
if (batchWrapper.getException() != null) {
throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
}
if (batchWrapper.isLast()) {
/* Marks the end of the records */
if (this.vectorSchemaRoot != null) {
// IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
// the last batch
this.vectorSchemaRoot.clear();
}
this.hasReachedEnd = true;
this.rowCount++;
return false;
}
this.hasReachedEnd = true;
// Valid batch, process it
ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
// Populates vectorSchemaRoot
this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
// Pointing to the first row in this fresh batch
this.currentBatchRowIndex = 0;
this.rowCount++;
return false;
return true;
}
// Valid batch, process it
ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
// Populates vectorSchemaRoot
this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
// Pointing to the first row in this fresh batch
this.currentBatchRowIndex = 0;
this.rowCount++;
return true;
}
// There are rows left in the current batch.
else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) {
this.currentBatchRowIndex++;
this.rowCount++;
return true;
// There are rows left in the current batch.
else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) {
this.currentBatchRowIndex++;
this.rowCount++;
return true;
}
} catch (InterruptedException | SQLException ex) {
throw new BigQueryJdbcException(
"Error occurred while advancing the cursor. This could happen when connection is closed while the next method is being called.",
ex);
}
} catch (InterruptedException | SQLException ex) {
throw new BigQueryJdbcException(
"Error occurred while advancing the cursor. This could happen when connection is closed while the next method is being called.",
ex);
}
return false;
}
return false;
}

private Object getObjectInternal(int columnIndex) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.cloud.bigquery.exception.BigQueryConversionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionNotFoundException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
Expand Down Expand Up @@ -58,6 +60,7 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet
protected boolean isClosed = false;
protected boolean wasNull = false;
protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE;
protected final SpanContext originalSpanContext;

protected BigQueryBaseResultSet(
BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) {
Expand All @@ -66,6 +69,7 @@ protected BigQueryBaseResultSet(
this.schema = schema;
this.schemaFieldList = schema != null ? schema.getFields() : null;
this.isNested = isNested;
this.originalSpanContext = Span.current().getSpanContext();
}

public QueryStatistics getQueryStatistics() {
Expand Down
Loading
Loading