Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,37 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

/**
* Row cursor over an {@link ArrowStreamReader} that drives the {@link DataCloudResultSet}.
*
* <p>The cursor owns the supplied {@link BufferAllocator} alongside the reader: closing the
* cursor closes the reader (which releases ArrowBuf accounting) and then the allocator (which
* returns its budget). This is the single place that guarantees root-allocator hygiene for the
* driver; callers of {@link DataCloudResultSet#of} hand ownership over and do not close the
* allocator themselves.
*/
@Slf4j
class ArrowStreamReaderCursor implements AutoCloseable {

private static final int INIT_ROW_NUMBER = -1;

private final ArrowStreamReader reader;
private final BufferAllocator allocator;
private final ZoneId sessionZone;

@lombok.Getter
private int rowsSeen = 0;

private final AtomicInteger currentIndex = new AtomicInteger(INIT_ROW_NUMBER);

ArrowStreamReaderCursor(ArrowStreamReader reader, ZoneId sessionZone) {
ArrowStreamReaderCursor(ArrowStreamReader reader, BufferAllocator allocator, ZoneId sessionZone) {
this.reader = reader;
this.allocator = allocator;
this.sessionZone = sessionZone;
}

Expand Down Expand Up @@ -91,6 +103,12 @@ public boolean next() {
@SneakyThrows
@Override
public void close() {
reader.close();
// Close the reader first: it releases the buffers accounted against the allocator, so the
// allocator's closing budget check passes. Reversing the order trips a leak detector.
try {
reader.close();
} finally {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change: `try { reader.close(); } finally { allocator.close(); }` with no addSuppressed. The javadoc above says reader-first ordering is load-bearing because the allocator's leak detector trips when buffers are still outstanding — meaning the most diagnostically interesting failure mode is exactly the one where reader.close() throws and allocator.close() then throws on top, and the second exception masks the first via Java's standard try/finally semantics.

Same pattern in DataCloudResultSet.of's failure cleanup at lines 96–103: arrowStream.getReader().close() is wrapped with addSuppressed, but the immediately-following arrowStream.getAllocator().close() is bare; if it throws (leak detector), it replaces the original construction SQLException rather than riding along as a suppressed exception.

Throwable primary = null;
try { reader.close(); } catch (Throwable t) { primary = t; }
try { allocator.close(); } catch (Throwable t) {
    if (primary != null) primary.addSuppressed(t); else primary = t;
}
if (primary != null) throw primary;

Generated by the review-pr-tavern skill — a human did not write this comment.

allocator.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.sql.Statement;
import java.sql.Struct;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -220,7 +221,7 @@ public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long
QueryResultArrowStream.OUTPUT_FORMAT);
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
iterator, connectionProperties.isIncludeCustomerDetailInReason(), queryId, null);
return StreamingResultSet.of(arrowStream, queryId);
return DataCloudResultSet.of(arrowStream, queryId, ZoneId.systemDefault());
} catch (StatusRuntimeException ex) {
throw QueryExceptionHandler.createException(
connectionProperties.isIncludeCustomerDetailInReason(), null, queryId, ex);
Expand Down Expand Up @@ -263,7 +264,7 @@ public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId, l
QueryResultArrowStream.OUTPUT_FORMAT);
val arrowStream = SQLExceptionQueryResultIterator.createSqlExceptionArrowStreamReader(
iterator, connectionProperties.isIncludeCustomerDetailInReason(), queryId, null);
return StreamingResultSet.of(arrowStream, queryId);
return DataCloudResultSet.of(arrowStream, queryId, ZoneId.systemDefault());
} catch (StatusRuntimeException ex) {
throw QueryExceptionHandler.createException(
connectionProperties.isIncludeCustomerDetailInReason(), null, queryId, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import com.google.common.collect.ImmutableList;
import com.salesforce.datacloud.jdbc.config.DriverVersion;
import com.salesforce.datacloud.jdbc.core.metadata.DataCloudResultSetMetaData;
import com.salesforce.datacloud.jdbc.core.metadata.MetadataResultSets;
import com.salesforce.datacloud.jdbc.core.types.HyperTypes;
import com.salesforce.datacloud.jdbc.util.JdbcURL;
import com.salesforce.datacloud.jdbc.util.ThrowingJdbcSupplier;
Expand Down Expand Up @@ -706,39 +706,39 @@ public ResultSet getColumns(String catalog, String schemaPattern, String tableNa
@Override
public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern)
throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern)
throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable)
throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
Expand All @@ -750,19 +750,18 @@ public ResultSet getCrossReference(
String foreignSchema,
String foreignTable)
throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
public ResultSet getTypeInfo() throws SQLException {
return DataCloudMetadataResultSet.of(
new DataCloudResultSetMetaData(MetadataSchemas.TYPE_INFO), HyperTypes.typeInfoRows());
return MetadataResultSets.ofRawRows(MetadataSchemas.TYPE_INFO, HyperTypes.typeInfoRows());
}

@Override
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate)
throws SQLException {
return DataCloudMetadataResultSet.empty();
return MetadataResultSets.emptyNoColumns();
}

@Override
Expand Down

This file was deleted.

Loading
Loading