Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

New features:

- Added `ArrowStreamBatch.Reset()` method that closes any existing stream and clears the cached reader, allowing callers to retry `GetStream` after a mid-stream failure (e.g. TCP RST) without re-executing the entire query. Inline (RowSetBase64) batches are restored from cached bytes on reset.

Bug fixes:

- Improved error message when `Host` is incorrectly configured with a URL scheme (e.g. `https://myorg-myaccount.snowflakecomputing.com`); previously this produced a cryptic `260004: failed to parse a port number` error (snowflakedb/gosnowflake#1784).
Expand Down
39 changes: 31 additions & 8 deletions arrow_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,43 @@ type QueryResultFormatProvider interface {
// QueryResultFormat: Arrow IPC record batches (use ipc.NewReader) when
// the format is "arrow", or JSON (row fragments) when it is "json".
type ArrowStreamBatch struct {
	// idx is the position of this batch within the chunk list.
	idx int
	// numrows is the row count the server metadata reported for this batch.
	numrows int64
	// scd is the downloader used to (re-)fetch the chunk on demand.
	scd *snowflakeArrowStreamChunkDownloader
	// Loc is the session timezone used when interpreting timestamp values.
	Loc *time.Location
	// rr is the cached stream for this batch; nil until downloaded (or
	// after Reset for downloadable chunks).
	rr io.ReadCloser
	// inlineData is non-nil for inline (RowSetBase64) batches; Reset uses
	// it to restore rr without a network round trip.
	inlineData []byte
}

// NumRows reports how many rows the server metadata stated this stream of
// record batches should contain.
func (asb *ArrowStreamBatch) NumRows() int64 {
	return asb.numrows
}

// Reset closes any existing stream and clears the cached reader, allowing
// GetStream to re-download the chunk on the next call. This enables callers
Comment thread
sfc-gh-boler marked this conversation as resolved.
// to retry after a mid-stream failure (e.g. TCP RST) without re-executing
// the entire query.
func (asb *ArrowStreamBatch) Reset() error {
if asb.rr != nil {
err := asb.rr.Close()
asb.rr = nil
if err != nil {
Comment thread
davidhcoe marked this conversation as resolved.
Outdated
return err
}
}
if asb.inlineData != nil {
asb.rr = io.NopCloser(bytes.NewReader(asb.inlineData))
}
return nil
Comment thread
davidhcoe marked this conversation as resolved.
Outdated
}

// GetStream downloads the chunk (if not already cached) and returns a
// stream of bytes. The content may be Arrow IPC or JSON (row fragments)
// depending on the current QueryResultFormat. Close should be called
// on the returned stream when done to ensure no leaked memory.
//
// If a previous stream failed mid-read, call Reset() first to clear the
// cached reader and allow re-download.
func (asb *ArrowStreamBatch) GetStream(ctx context.Context) (io.ReadCloser, error) {
if asb.rr == nil {
if err := asb.downloadChunkStreamHelper(ctx); err != nil {
Expand Down Expand Up @@ -220,9 +242,10 @@ func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamB
if len(rowSetBytes) > 0 {
out = out[:chunkMetaLen+1]
out[0] = ArrowStreamBatch{
scd: scd,
Loc: loc,
rr: io.NopCloser(bytes.NewReader(rowSetBytes)),
scd: scd,
Loc: loc,
rr: io.NopCloser(bytes.NewReader(rowSetBytes)),
inlineData: rowSetBytes,
}
toFill = out[1:]
}
Expand Down
Loading
Loading