From 000b58b5e49bf0224eb402d7b6d645e09582d6db Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Fri, 1 May 2026 17:25:16 -0400 Subject: [PATCH 1/8] propose Reset method --- arrow_stream.go | 16 +++ arrow_test.go | 318 +++++++++++++++++++++++++++++++++++++++++++++++- chunk_test.go | 39 ++++++ 3 files changed, 371 insertions(+), 2 deletions(-) diff --git a/arrow_stream.go b/arrow_stream.go index 2d84dc631..9787defed 100644 --- a/arrow_stream.go +++ b/arrow_stream.go @@ -66,10 +66,26 @@ type ArrowStreamBatch struct { // be in this stream of record batches. func (asb *ArrowStreamBatch) NumRows() int64 { return asb.numrows } +// Reset closes any existing stream and clears the cached reader, allowing +// GetStream to re-download the chunk on the next call. This enables callers +// to retry after a mid-stream failure (e.g. TCP RST) without re-executing +// the entire query. +func (asb *ArrowStreamBatch) Reset() error { + if asb.rr != nil { + err := asb.rr.Close() + asb.rr = nil + return err + } + return nil +} + // GetStream downloads the chunk (if not already cached) and returns a // stream of bytes. The content may be Arrow IPC or JSON (row fragments) // depending on the current QueryResultFormat. Close should be called // on the returned stream when done to ensure no leaked memory. +// +// If a previous stream failed mid-read, call Reset() first to clear the +// cached reader and allow re-download. func (asb *ArrowStreamBatch) GetStream(ctx context.Context) (io.ReadCloser, error) { if asb.rr == nil { if err := asb.downloadChunkStreamHelper(ctx); err != nil { diff --git a/arrow_test.go b/arrow_test.go index 087983e12..0451bd563 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -2,9 +2,14 @@ package gosnowflake import ( "bytes" + "compress/gzip" "context" + "database/sql/driver" + "errors" "fmt" + "io" "math/big" + "net/http" "reflect" "strings" "testing" @@ -12,8 +17,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/memory" ia "github.com/snowflakedb/gosnowflake/v2/internal/arrow" - - "database/sql/driver" + "github.com/snowflakedb/gosnowflake/v2/internal/query" ) func TestArrowBatchDataProvider(t *testing.T) { @@ -575,3 +579,313 @@ func TestArrowMemoryCleanedUp(t *testing.T) { assertFalseE(t, rows.Next()) }) } + +// errReadCloser is a ReadCloser whose Close returns a predetermined error. +type errReadCloser struct { + io.Reader + closeErr error + closed bool +} + +func (e *errReadCloser) Close() error { + e.closed = true + return e.closeErr +} + +func TestArrowStreamBatchResetNilReader(t *testing.T) { + // Reset on a batch with no reader should be a no-op and return nil. + batch := ArrowStreamBatch{} + err := batch.Reset() + assertNilF(t, err, "Reset on nil reader should return nil") +} + +func TestArrowStreamBatchResetClosesReader(t *testing.T) { + // Reset should close the underlying reader and nil it out. + rc := &errReadCloser{Reader: bytes.NewReader([]byte("data"))} + batch := ArrowStreamBatch{rr: rc} + + err := batch.Reset() + assertNilF(t, err, "Reset should not return error on successful close") + assertTrueF(t, rc.closed, "underlying reader should have been closed") + assertNilF(t, batch.rr, "rr should be nil after Reset") +} + +func TestArrowStreamBatchResetPropagatesCloseError(t *testing.T) { + // Reset should propagate the error from Close. + expected := errors.New("close failed") + rc := &errReadCloser{Reader: bytes.NewReader(nil), closeErr: expected} + batch := ArrowStreamBatch{rr: rc} + + err := batch.Reset() + assertTrueF(t, errors.Is(err, expected), "Reset should propagate close error") + // rr should still be nilled out even when Close returns an error. + assertNilF(t, batch.rr, "rr should be nil after Reset even on error") +} + +func TestArrowStreamBatchResetAllowsRedownload(t *testing.T) { + // After Reset, GetStream should re-invoke the download path. + // We simulate this by setting rr, resetting, then confirming rr is nil + // so GetStream would attempt a fresh download. + rc := &errReadCloser{Reader: bytes.NewReader([]byte("stale"))} + batch := ArrowStreamBatch{rr: rc} + + // Confirm GetStream returns the cached reader before Reset. + stream, err := batch.GetStream(context.Background()) + assertNilF(t, err) + assertTrueF(t, stream == rc, "GetStream should return cached reader") + + // Reset clears the cache. + err = batch.Reset() + assertNilF(t, err) + assertNilF(t, batch.rr, "rr should be nil after Reset") +} + +func TestArrowStreamBatchDoubleResetIsIdempotent(t *testing.T) { + // Calling Reset twice should not error the second time. + rc := &errReadCloser{Reader: bytes.NewReader([]byte("data"))} + batch := ArrowStreamBatch{rr: rc} + + assertNilF(t, batch.Reset(), "first Reset should succeed") + assertNilF(t, batch.Reset(), "second Reset should be a no-op") +} + +// newTestArrowStreamBatch creates an ArrowStreamBatch wired to a mock FuncGet +// for unit testing without a live Snowflake connection. +func newTestArrowStreamBatch(funcGet func(context.Context, *snowflakeConn, string, map[string]string, time.Duration) (*http.Response, error)) ArrowStreamBatch { + sc := &snowflakeConn{rest: &snowflakeRestful{RequestTimeout: 0}} + scd := &snowflakeArrowStreamChunkDownloader{ + sc: sc, + ChunkMetas: []query.ExecResponseChunk{{URL: "http://fake/chunk0"}}, + Qrmk: "testQrmk", + FuncGet: funcGet, + } + return ArrowStreamBatch{idx: 0, scd: scd} +} + +func TestArrowStreamBatchResetThenGetStreamRedownloads(t *testing.T) { + // After Reset, calling GetStream should invoke FuncGet again, + // proving the chunk is actually re-downloaded. + callCount := 0 + mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { + callCount++ + body := []byte(fmt.Sprintf("payload-%d", callCount)) + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(body)), + }, nil + } + + batch := newTestArrowStreamBatch(mockGet) + + // First download. + stream1, err := batch.GetStream(context.Background()) + assertNilF(t, err) + data1, err := io.ReadAll(stream1) + assertNilF(t, err) + assertEqualE(t, callCount, 1) + + // Reset clears cached reader. + assertNilF(t, batch.Reset()) + + // Second download — FuncGet is called again. + stream2, err := batch.GetStream(context.Background()) + assertNilF(t, err) + data2, err := io.ReadAll(stream2) + assertNilF(t, err) + assertEqualE(t, callCount, 2) + + // Content should differ, proving it was re-downloaded. + assertFalseE(t, bytes.Equal(data1, data2), "re-downloaded data should differ from original") +} + +func TestArrowStreamBatchResetAfterPartialRead(t *testing.T) { + // Simulates the primary use case: a mid-stream failure followed by + // Reset + full re-download. + fullPayload := []byte("complete-data-here") + mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(fullPayload)), + }, nil + } + + batch := newTestArrowStreamBatch(mockGet) + + // Read only a few bytes (simulate partial/failed read). + stream, err := batch.GetStream(context.Background()) + assertNilF(t, err) + partial := make([]byte, 5) + _, err = stream.Read(partial) + assertNilF(t, err) + + // Reset and re-download the full payload. + assertNilF(t, batch.Reset()) + stream2, err := batch.GetStream(context.Background()) + assertNilF(t, err) + full, err := io.ReadAll(stream2) + assertNilF(t, err) + assertEqualE(t, string(full), string(fullPayload)) +} + +func TestArrowStreamBatchGetStreamGzipped(t *testing.T) { + // Verify that gzip-compressed responses are transparently decompressed. + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("decompressed payload")) + assertNilF(t, err) + assertNilF(t, gw.Close()) + gzBytes := buf.Bytes() + + mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(gzBytes)), + }, nil + } + + batch := newTestArrowStreamBatch(mockGet) + stream, err := batch.GetStream(context.Background()) + assertNilF(t, err) + data, err := io.ReadAll(stream) + assertNilF(t, err) + assertEqualE(t, string(data), "decompressed payload") +} + +func TestArrowStreamBatchGetStreamNon200(t *testing.T) { + // Non-200 responses should surface as a SnowflakeError. + mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusBadGateway, + Body: io.NopCloser(bytes.NewReader([]byte("error"))), + }, nil + } + + batch := newTestArrowStreamBatch(mockGet) + _, err := batch.GetStream(context.Background()) + var sfErr *SnowflakeError + assertTrueF(t, errors.As(err, &sfErr), "should return SnowflakeError") + assertEqualE(t, sfErr.Number, ErrFailedToGetChunk) +} + +func TestArrowStreamBatchGetStreamFuncGetError(t *testing.T) { + // Network-level errors from FuncGet should propagate directly. + expected := errors.New("network failure") + mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { + return nil, expected + } + + batch := newTestArrowStreamBatch(mockGet) + _, err := batch.GetStream(context.Background()) + assertTrueF(t, errors.Is(err, expected), "should propagate FuncGet error") +} + +func TestArrowStreamBatchResetThenGetStreamAfterError(t *testing.T) { + // If the first GetStream fails, Reset should allow a successful retry. + calls := 0 + mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { + calls++ + if calls == 1 { + return nil, errors.New("transient error") + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte("ok"))), + }, nil + } + + batch := newTestArrowStreamBatch(mockGet) + + // First attempt fails. + _, err := batch.GetStream(context.Background()) + assertNotNilE(t, err) + + // Reset (rr is still nil since download failed, but Reset is safe). + assertNilF(t, batch.Reset()) + + // Retry succeeds. + stream, err := batch.GetStream(context.Background()) + assertNilF(t, err) + data, err := io.ReadAll(stream) + assertNilF(t, err) + assertEqualE(t, string(data), "ok") +} + +func TestStreamWrapReaderCloseClosesInnerAndWrapped(t *testing.T) { + // When inner Reader implements io.ReadCloser, both inner and wrapped + // should be closed. + inner := &errReadCloser{Reader: bytes.NewReader(nil)} + wrappedClosed := false + wrapped := &errReadCloser{ + Reader: bytes.NewReader(nil), + closed: false, + } + w := &streamWrapReader{Reader: inner, wrapped: wrapped} + err := w.Close() + assertNilF(t, err) + assertTrueF(t, inner.closed, "inner ReadCloser should be closed") + assertTrueF(t, wrapped.closed, "wrapped body should be closed") + _ = wrappedClosed +} + +func TestStreamWrapReaderCloseNonClosableInner(t *testing.T) { + // When inner Reader is not an io.ReadCloser, only wrapped is closed. + wrapped := &errReadCloser{Reader: bytes.NewReader(nil)} + w := &streamWrapReader{Reader: bytes.NewReader(nil), wrapped: wrapped} + err := w.Close() + assertNilF(t, err) + assertTrueF(t, wrapped.closed, "wrapped body should be closed") +} + +func TestStreamWrapReaderClosePropagatesInnerError(t *testing.T) { + // If the inner ReadCloser's Close fails, the error should propagate. + expected := errors.New("inner close fail") + inner := &errReadCloser{Reader: bytes.NewReader(nil), closeErr: expected} + wrapped := &errReadCloser{Reader: bytes.NewReader(nil)} + w := &streamWrapReader{Reader: inner, wrapped: wrapped} + err := w.Close() + assertTrueF(t, errors.Is(err, expected), "should propagate inner close error") + // wrapped should NOT have been closed because inner errored first. + assertFalseE(t, wrapped.closed, "wrapped should not close if inner errors") +} + +func TestArrowStreamBatchResetClosesStreamWrapReader(t *testing.T) { + // Reset should properly close a streamWrapReader, including the + // wrapped HTTP body. + wrappedClosed := false + body := &fakeResponseBody{ + body: []byte("data"), + onClose: func() { wrappedClosed = true }, + } + batch := ArrowStreamBatch{ + rr: &streamWrapReader{ + Reader: bytes.NewReader([]byte("inner")), + wrapped: body, + }, + } + assertNilF(t, batch.Reset()) + assertNilF(t, batch.rr, "rr should be nil after Reset") + assertTrueF(t, wrappedClosed, "wrapped HTTP body should be closed") +} + +func TestArrowStreamBatchResetClosesGzipStreamWrapReader(t *testing.T) { + // Verify Reset properly tears down a gzip + streamWrapReader stack. + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("hello")) + assertNilF(t, err) + assertNilF(t, gw.Close()) + + wrappedClosed := false + body := &fakeResponseBody{ + body: buf.Bytes(), + onClose: func() { wrappedClosed = true }, + } + gr, err := gzip.NewReader(bytes.NewReader(buf.Bytes())) + assertNilF(t, err) + + batch := ArrowStreamBatch{ + rr: &streamWrapReader{Reader: gr, wrapped: body}, + } + assertNilF(t, batch.Reset()) + assertTrueF(t, wrappedClosed, "wrapped HTTP body should be closed via gzip reader") +} diff --git a/chunk_test.go b/chunk_test.go index 6fb5b22d9..1a2c2e6c1 100644 --- a/chunk_test.go +++ b/chunk_test.go @@ -653,3 +653,42 @@ func TestQueryArrowStreamArrowResponseExposesFormat(t *testing.T) { assertTrueF(t, len(batches) > 0, "should have Arrow batches") }) } + +func TestQueryArrowStreamResetAndRereadBatch(t *testing.T) { + runSnowflakeConnTest(t, func(sct *SCTest) { + numrows := 50000 + query := fmt.Sprintf(selectRandomGenerator, numrows) + loader, err := sct.sc.QueryArrowStream(sct.sc.ctx, query) + assertNilF(t, err) + + batches, err := loader.GetBatches() + assertNilF(t, err) + assertTrueF(t, len(batches) > 0, "should have at least one batch") + + // Pick a batch that requires a download (skip index 0 which may be inline). + idx := 0 + if len(batches) > 1 { + idx = 1 + } + + // First read. + stream1, err := batches[idx].GetStream(sct.sc.ctx) + assertNilF(t, err) + data1, err := io.ReadAll(stream1) + assertNilF(t, err) + assertTrueF(t, len(data1) > 0, "first read should return data") + + // Reset the batch. + assertNilF(t, batches[idx].Reset()) + + // Re-read the same batch. + stream2, err := batches[idx].GetStream(sct.sc.ctx) + assertNilF(t, err) + data2, err := io.ReadAll(stream2) + assertNilF(t, err) + assertTrueF(t, len(data2) > 0, "re-read after Reset should return data") + + // Both reads should return the same content. + assertTrueF(t, bytes.Equal(data1, data2), "data should match after Reset + re-download") + }) +} From 4c9d8fa6bc759192e0ae5313c0fbe1ecaf16b7c0 Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Fri, 1 May 2026 17:25:17 -0400 Subject: [PATCH 2/8] Copilot PR feedback --- arrow_test.go | 4 ++-- chunk_test.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/arrow_test.go b/arrow_test.go index 0451bd563..759a6e6ed 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -690,6 +690,7 @@ func TestArrowStreamBatchResetThenGetStreamRedownloads(t *testing.T) { // Second download — FuncGet is called again. stream2, err := batch.GetStream(context.Background()) assertNilF(t, err) + defer stream2.Close() data2, err := io.ReadAll(stream2) assertNilF(t, err) assertEqualE(t, callCount, 2) @@ -722,6 +723,7 @@ func TestArrowStreamBatchResetAfterPartialRead(t *testing.T) { assertNilF(t, batch.Reset()) stream2, err := batch.GetStream(context.Background()) assertNilF(t, err) + defer stream2.Close() full, err := io.ReadAll(stream2) assertNilF(t, err) assertEqualE(t, string(full), string(fullPayload)) @@ -814,7 +816,6 @@ func TestStreamWrapReaderCloseClosesInnerAndWrapped(t *testing.T) { // When inner Reader implements io.ReadCloser, both inner and wrapped // should be closed. inner := &errReadCloser{Reader: bytes.NewReader(nil)} - wrappedClosed := false wrapped := &errReadCloser{ Reader: bytes.NewReader(nil), closed: false, @@ -824,7 +825,6 @@ func TestStreamWrapReaderCloseClosesInnerAndWrapped(t *testing.T) { assertNilF(t, err) assertTrueF(t, inner.closed, "inner ReadCloser should be closed") assertTrueF(t, wrapped.closed, "wrapped body should be closed") - _ = wrappedClosed } func TestStreamWrapReaderCloseNonClosableInner(t *testing.T) { diff --git a/chunk_test.go b/chunk_test.go index 1a2c2e6c1..c0a4718f8 100644 --- a/chunk_test.go +++ b/chunk_test.go @@ -684,6 +684,7 @@ func TestQueryArrowStreamResetAndRereadBatch(t *testing.T) { // Re-read the same batch. stream2, err := batches[idx].GetStream(sct.sc.ctx) assertNilF(t, err) + defer stream2.Close() data2, err := io.ReadAll(stream2) assertNilF(t, err) assertTrueF(t, len(data2) > 0, "re-read after Reset should return data") From 8f2e730ccc10d5c0b98d5e899fb5045306ba305f Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Fri, 1 May 2026 17:25:18 -0400 Subject: [PATCH 3/8] more Copilot feedback --- arrow_stream.go | 25 ++++++++++++++++--------- arrow_test.go | 10 ++++++---- chunk_test.go | 1 + 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/arrow_stream.go b/arrow_stream.go index 9787defed..45da7fa56 100644 --- a/arrow_stream.go +++ b/arrow_stream.go @@ -55,11 +55,12 @@ type QueryResultFormatProvider interface { // QueryResultFormat: Arrow IPC record batches (use ipc.NewReader) when // the format is "arrow", or JSON (row fragments) when it is "json". type ArrowStreamBatch struct { - idx int - numrows int64 - scd *snowflakeArrowStreamChunkDownloader - Loc *time.Location - rr io.ReadCloser + idx int + numrows int64 + scd *snowflakeArrowStreamChunkDownloader + Loc *time.Location + rr io.ReadCloser + inlineData []byte // non-nil for inline (RowSetBase64) batches } // NumRows returns the total number of rows that the metadata stated should @@ -74,7 +75,12 @@ func (asb *ArrowStreamBatch) Reset() error { if asb.rr != nil { err := asb.rr.Close() asb.rr = nil - return err + if err != nil { + return err + } + } + if asb.inlineData != nil { + asb.rr = io.NopCloser(bytes.NewReader(asb.inlineData)) } return nil } @@ -236,9 +242,10 @@ func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamB if len(rowSetBytes) > 0 { out = out[:chunkMetaLen+1] out[0] = ArrowStreamBatch{ - scd: scd, - Loc: loc, - rr: io.NopCloser(bytes.NewReader(rowSetBytes)), + scd: scd, + Loc: loc, + rr: io.NopCloser(bytes.NewReader(rowSetBytes)), + inlineData: rowSetBytes, } toFill = out[1:] } diff --git a/arrow_test.go b/arrow_test.go index 759a6e6ed..0028fe2a2 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -622,10 +622,11 @@ func TestArrowStreamBatchResetPropagatesCloseError(t *testing.T) { assertNilF(t, batch.rr, "rr should be nil after Reset even on error") } -func TestArrowStreamBatchResetAllowsRedownload(t *testing.T) { - // After Reset, GetStream should re-invoke the download path. - // We simulate this by setting rr, resetting, then confirming rr is nil - // so GetStream would attempt a fresh download. +func TestArrowStreamBatchResetClearsCachedReader(t *testing.T) { + // Reset should close and nil-out the cached reader so that a + // subsequent GetStream (tested separately with a full downloader + // in TestArrowStreamBatchResetThenGetStreamRedownloads) would + // attempt a fresh download. rc := &errReadCloser{Reader: bytes.NewReader([]byte("stale"))} batch := ArrowStreamBatch{rr: rc} @@ -680,6 +681,7 @@ func TestArrowStreamBatchResetThenGetStreamRedownloads(t *testing.T) { // First download. stream1, err := batch.GetStream(context.Background()) assertNilF(t, err) + defer stream1.Close() data1, err := io.ReadAll(stream1) assertNilF(t, err) assertEqualE(t, callCount, 1) diff --git a/chunk_test.go b/chunk_test.go index c0a4718f8..aadef74fd 100644 --- a/chunk_test.go +++ b/chunk_test.go @@ -674,6 +674,7 @@ func TestQueryArrowStreamResetAndRereadBatch(t *testing.T) { // First read. stream1, err := batches[idx].GetStream(sct.sc.ctx) assertNilF(t, err) + defer stream1.Close() data1, err := io.ReadAll(stream1) assertNilF(t, err) assertTrueF(t, len(data1) > 0, "first read should return data") From 105e07ea43a137e9ec8c2e34676b3ce24a35afa1 Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Fri, 1 May 2026 17:25:18 -0400 Subject: [PATCH 4/8] add changelog entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa190c8ee..5e1342e8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ New features: +- Added `ArrowStreamBatch.Reset()` method that closes any existing stream and clears the cached reader, allowing callers to retry `GetStream` after a mid-stream failure (e.g. TCP RST) without re-executing the entire query. Inline (RowSetBase64) batches are restored from cached bytes on reset. + Bug fixes: - Improved error message when `Host` is incorrectly configured with a URL scheme (e.g. `https://myorg-myaccount.snowflakecomputing.com`), previously this produced a cryptic `260004: failed to parse a port number` error (snowflakedb/gosnowflake#1784). From 61d3dd686b06d2dbab309ddd47d3daae483222cf Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Wed, 6 May 2026 13:59:40 -0400 Subject: [PATCH 5/8] PR feedback --- arrow_stream.go | 8 +++----- arrow_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/arrow_stream.go b/arrow_stream.go index 45da7fa56..7dcfef585 100644 --- a/arrow_stream.go +++ b/arrow_stream.go @@ -72,17 +72,15 @@ func (asb *ArrowStreamBatch) NumRows() int64 { return asb.numrows } // to retry after a mid-stream failure (e.g. TCP RST) without re-executing // the entire query. func (asb *ArrowStreamBatch) Reset() error { + var closeErr error if asb.rr != nil { - err := asb.rr.Close() + closeErr = asb.rr.Close() asb.rr = nil - if err != nil { - return err - } } if asb.inlineData != nil { asb.rr = io.NopCloser(bytes.NewReader(asb.inlineData)) } - return nil + return closeErr } // GetStream downloads the chunk (if not already cached) and returns a diff --git a/arrow_test.go b/arrow_test.go index 0028fe2a2..677b5b6b9 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -641,6 +641,45 @@ func TestArrowStreamBatchResetClearsCachedReader(t *testing.T) { assertNilF(t, batch.rr, "rr should be nil after Reset") } +func TestArrowStreamBatchResetRestoresInlineReaderOnCloseError(t *testing.T) { + // For inline (RowSetBase64) batches, Reset must restore rr from + // inlineData even when the underlying Close returns an error, so + // that the batch remains usable for retry. Otherwise GetStream + // would fall through to downloadChunkStreamHelper, which has no + // chunk URL for inline batches. + expected := errors.New("close failed") + rc := &errReadCloser{Reader: bytes.NewReader(nil), closeErr: expected} + inline := []byte("inline payload") + batch := ArrowStreamBatch{rr: rc, inlineData: inline} + + err := batch.Reset() + assertTrueF(t, errors.Is(err, expected), "Reset should propagate close error") + assertTrueF(t, rc.closed, "underlying reader should have been closed") + assertNotNilF(t, batch.rr, "rr should be restored from inlineData even after close error") + + // Verify the restored reader yields the inline payload. + got, readErr := io.ReadAll(batch.rr) + assertNilF(t, readErr) + assertEqualE(t, string(got), string(inline)) +} + +func TestArrowStreamBatchResetRestoresInlineReader(t *testing.T) { + // Reset on an inline batch should leave rr pointing at a fresh + // reader over inlineData (successful Close path). + rc := &errReadCloser{Reader: bytes.NewReader([]byte("consumed"))} + inline := []byte("inline payload") + batch := ArrowStreamBatch{rr: rc, inlineData: inline} + + err := batch.Reset() + assertNilF(t, err) + assertTrueF(t, rc.closed, "underlying reader should have been closed") + assertNotNilF(t, batch.rr, "rr should be restored from inlineData") + + got, readErr := io.ReadAll(batch.rr) + assertNilF(t, readErr) + assertEqualE(t, string(got), string(inline)) +} + func TestArrowStreamBatchDoubleResetIsIdempotent(t *testing.T) { // Calling Reset twice should not error the second time. rc := &errReadCloser{Reader: bytes.NewReader([]byte("data"))} From e484979050cc7dcd11ca41327b47fc66f8de0c77 Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Wed, 6 May 2026 14:08:58 -0400 Subject: [PATCH 6/8] more PR feedback --- arrow_stream.go | 8 +++++++ arrow_test.go | 56 +++++++++++++++++++++++++------------------------ 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/arrow_stream.go b/arrow_stream.go index 7dcfef585..52b262664 100644 --- a/arrow_stream.go +++ b/arrow_stream.go @@ -71,6 +71,14 @@ func (asb *ArrowStreamBatch) NumRows() int64 { return asb.numrows } // GetStream to re-download the chunk on the next call. This enables callers // to retry after a mid-stream failure (e.g. TCP RST) without re-executing // the entire query. +// +// For inline batches (those produced from RowSetBase64 in the initial +// query response), there is no remote chunk to re-download. In that +// case Reset rewinds the reader by re-wrapping the cached inline bytes +// so the batch can be read again from the beginning. This is done +// even when the underlying Close returns an error, so the batch is +// always left in a usable state for retry; the close error is still +// returned to the caller. func (asb *ArrowStreamBatch) Reset() error { var closeErr error if asb.rr != nil { diff --git a/arrow_test.go b/arrow_test.go index 677b5b6b9..65dbff25b 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -659,8 +659,8 @@ func TestArrowStreamBatchResetRestoresInlineReaderOnCloseError(t *testing.T) { // Verify the restored reader yields the inline payload. got, readErr := io.ReadAll(batch.rr) - assertNilF(t, readErr) - assertEqualE(t, string(got), string(inline)) + assertNilF(t, readErr, "reading restored inline reader should succeed") + assertEqualE(t, string(got), string(inline), "restored reader should yield inlineData") } func TestArrowStreamBatchResetRestoresInlineReader(t *testing.T) { @@ -671,13 +671,13 @@ func TestArrowStreamBatchResetRestoresInlineReader(t *testing.T) { batch := ArrowStreamBatch{rr: rc, inlineData: inline} err := batch.Reset() - assertNilF(t, err) + assertNilF(t, err, "Reset should not return error on successful close") assertTrueF(t, rc.closed, "underlying reader should have been closed") assertNotNilF(t, batch.rr, "rr should be restored from inlineData") got, readErr := io.ReadAll(batch.rr) - assertNilF(t, readErr) - assertEqualE(t, string(got), string(inline)) + assertNilF(t, readErr, "reading restored inline reader should succeed") + assertEqualE(t, string(got), string(inline), "restored reader should yield inlineData") } func TestArrowStreamBatchDoubleResetIsIdempotent(t *testing.T) { @@ -719,22 +719,22 @@ func TestArrowStreamBatchResetThenGetStreamRedownloads(t *testing.T) { // First download. stream1, err := batch.GetStream(context.Background()) - assertNilF(t, err) + assertNilF(t, err, "first GetStream should succeed") defer stream1.Close() data1, err := io.ReadAll(stream1) - assertNilF(t, err) - assertEqualE(t, callCount, 1) + assertNilF(t, err, "reading first stream should succeed") + assertEqualE(t, callCount, 1, "FuncGet should have been called once") // Reset clears cached reader. - assertNilF(t, batch.Reset()) + assertNilF(t, batch.Reset(), "Reset after first read should succeed") // Second download — FuncGet is called again. stream2, err := batch.GetStream(context.Background()) - assertNilF(t, err) + assertNilF(t, err, "second GetStream should succeed") defer stream2.Close() data2, err := io.ReadAll(stream2) - assertNilF(t, err) - assertEqualE(t, callCount, 2) + assertNilF(t, err, "reading second stream should succeed") + assertEqualE(t, callCount, 2, "FuncGet should have been called twice after Reset") // Content should differ, proving it was re-downloaded. assertFalseE(t, bytes.Equal(data1, data2), "re-downloaded data should differ from original") @@ -755,19 +755,19 @@ func TestArrowStreamBatchResetAfterPartialRead(t *testing.T) { // Read only a few bytes (simulate partial/failed read). stream, err := batch.GetStream(context.Background()) - assertNilF(t, err) + assertNilF(t, err, "initial GetStream should succeed") partial := make([]byte, 5) _, err = stream.Read(partial) - assertNilF(t, err) + assertNilF(t, err, "partial read should succeed") // Reset and re-download the full payload. - assertNilF(t, batch.Reset()) + assertNilF(t, batch.Reset(), "Reset after partial read should succeed") stream2, err := batch.GetStream(context.Background()) - assertNilF(t, err) + assertNilF(t, err, "GetStream after Reset should succeed") defer stream2.Close() full, err := io.ReadAll(stream2) - assertNilF(t, err) - assertEqualE(t, string(full), string(fullPayload)) + assertNilF(t, err, "reading full payload should succeed") + assertEqualE(t, string(full), string(fullPayload), "re-downloaded payload should match original") } func TestArrowStreamBatchGetStreamGzipped(t *testing.T) { @@ -788,10 +788,11 @@ func TestArrowStreamBatchGetStreamGzipped(t *testing.T) { batch := newTestArrowStreamBatch(mockGet) stream, err := batch.GetStream(context.Background()) - assertNilF(t, err) + assertNilF(t, err, "GetStream should succeed") + defer stream.Close() data, err := io.ReadAll(stream) - assertNilF(t, err) - assertEqualE(t, string(data), "decompressed payload") + assertNilF(t, err, "reading decompressed stream should succeed") + assertEqualE(t, string(data), "decompressed payload", "gzip-decoded payload should match") } func TestArrowStreamBatchGetStreamNon200(t *testing.T) { @@ -807,7 +808,7 @@ func TestArrowStreamBatchGetStreamNon200(t *testing.T) { _, err := batch.GetStream(context.Background()) var sfErr *SnowflakeError assertTrueF(t, errors.As(err, &sfErr), "should return SnowflakeError") - assertEqualE(t, sfErr.Number, ErrFailedToGetChunk) + assertEqualF(t, sfErr.Number, ErrFailedToGetChunk, "error number should be ErrFailedToGetChunk") } func TestArrowStreamBatchGetStreamFuncGetError(t *testing.T) { @@ -840,17 +841,18 @@ func TestArrowStreamBatchResetThenGetStreamAfterError(t *testing.T) { // First attempt fails. _, err := batch.GetStream(context.Background()) - assertNotNilE(t, err) + assertNotNilE(t, err, "first GetStream should fail with transient error") // Reset (rr is still nil since download failed, but Reset is safe). - assertNilF(t, batch.Reset()) + assertNilF(t, batch.Reset(), "Reset after failed download should succeed") // Retry succeeds. stream, err := batch.GetStream(context.Background()) - assertNilF(t, err) + assertNilF(t, err, "retry GetStream should succeed") + defer stream.Close() data, err := io.ReadAll(stream) - assertNilF(t, err) - assertEqualE(t, string(data), "ok") + assertNilF(t, err, "reading retried stream should succeed") + assertEqualE(t, string(data), "ok", "retried payload should match") } func TestStreamWrapReaderCloseClosesInnerAndWrapped(t *testing.T) { From 2a9afbc5570065837a417eb72cd9bfd80d1c78d5 Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Mon, 11 May 2026 14:47:53 -0400 Subject: [PATCH 7/8] more PR feedback --- arrow_test.go | 34 +++++++++++++++++----------------- chunk_test.go | 16 ++++++++-------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/arrow_test.go b/arrow_test.go index 65dbff25b..28b4f6f09 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -617,7 +617,7 @@ func TestArrowStreamBatchResetPropagatesCloseError(t *testing.T) { batch := ArrowStreamBatch{rr: rc} err := batch.Reset() - assertTrueF(t, errors.Is(err, expected), "Reset should propagate close error") + assertErrIsF(t, err, expected, "Reset should propagate close error") // rr should still be nilled out even when Close returns an error. assertNilF(t, batch.rr, "rr should be nil after Reset even on error") } @@ -632,12 +632,12 @@ func TestArrowStreamBatchResetClearsCachedReader(t *testing.T) { // Confirm GetStream returns the cached reader before Reset. stream, err := batch.GetStream(context.Background()) - assertNilF(t, err) - assertTrueF(t, stream == rc, "GetStream should return cached reader") + assertNilF(t, err, "GetStream on cached reader should succeed") + assertEqualF(t, stream, io.ReadCloser(rc), "GetStream should return cached reader") // Reset clears the cache. err = batch.Reset() - assertNilF(t, err) + assertNilF(t, err, "Reset should succeed") assertNilF(t, batch.rr, "rr should be nil after Reset") } @@ -653,7 +653,7 @@ func TestArrowStreamBatchResetRestoresInlineReaderOnCloseError(t *testing.T) { batch := ArrowStreamBatch{rr: rc, inlineData: inline} err := batch.Reset() - assertTrueF(t, errors.Is(err, expected), "Reset should propagate close error") + assertErrIsF(t, err, expected, "Reset should propagate close error") assertTrueF(t, rc.closed, "underlying reader should have been closed") assertNotNilF(t, batch.rr, "rr should be restored from inlineData even after close error") @@ -775,8 +775,8 @@ func TestArrowStreamBatchGetStreamGzipped(t *testing.T) { var buf bytes.Buffer gw := gzip.NewWriter(&buf) _, err := gw.Write([]byte("decompressed payload")) - assertNilF(t, err) - assertNilF(t, gw.Close()) + assertNilF(t, err, "gzip Write should succeed") + assertNilF(t, gw.Close(), "gzip Close should succeed") gzBytes := buf.Bytes() mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { @@ -807,7 +807,7 @@ func TestArrowStreamBatchGetStreamNon200(t *testing.T) { batch := newTestArrowStreamBatch(mockGet) _, err := batch.GetStream(context.Background()) var sfErr *SnowflakeError - assertTrueF(t, errors.As(err, &sfErr), "should return SnowflakeError") + assertErrorsAsF(t, err, &sfErr, "should return SnowflakeError") assertEqualF(t, sfErr.Number, ErrFailedToGetChunk, "error number should be ErrFailedToGetChunk") } @@ -820,7 +820,7 @@ func TestArrowStreamBatchGetStreamFuncGetError(t *testing.T) { batch := newTestArrowStreamBatch(mockGet) _, err := batch.GetStream(context.Background()) - assertTrueF(t, errors.Is(err, expected), "should propagate FuncGet error") + assertErrIsF(t, err, expected, "should propagate FuncGet error") } func TestArrowStreamBatchResetThenGetStreamAfterError(t *testing.T) { @@ -865,7 +865,7 @@ func TestStreamWrapReaderCloseClosesInnerAndWrapped(t *testing.T) { } w := &streamWrapReader{Reader: inner, wrapped: wrapped} err := w.Close() - assertNilF(t, err) + assertNilF(t, err, "Close should succeed when neither close errors") assertTrueF(t, inner.closed, "inner ReadCloser should be closed") assertTrueF(t, wrapped.closed, "wrapped body should be closed") } @@ -875,7 +875,7 @@ func TestStreamWrapReaderCloseNonClosableInner(t *testing.T) { wrapped := &errReadCloser{Reader: bytes.NewReader(nil)} w := &streamWrapReader{Reader: bytes.NewReader(nil), wrapped: wrapped} err := w.Close() - assertNilF(t, err) + assertNilF(t, err, "Close should succeed when only wrapped is closable") assertTrueF(t, wrapped.closed, "wrapped body should be closed") } @@ -886,7 +886,7 @@ func TestStreamWrapReaderClosePropagatesInnerError(t *testing.T) { wrapped := &errReadCloser{Reader: bytes.NewReader(nil)} w := &streamWrapReader{Reader: inner, wrapped: wrapped} err := w.Close() - assertTrueF(t, errors.Is(err, expected), "should propagate inner close error") + assertErrIsF(t, err, expected, "should propagate inner close error") // wrapped should NOT have been closed because inner errored first. assertFalseE(t, wrapped.closed, "wrapped should not close if inner errors") } @@ -905,7 +905,7 @@ func TestArrowStreamBatchResetClosesStreamWrapReader(t *testing.T) { wrapped: body, }, } - assertNilF(t, batch.Reset()) + assertNilF(t, batch.Reset(), "Reset should succeed") assertNilF(t, batch.rr, "rr should be nil after Reset") assertTrueF(t, wrappedClosed, "wrapped HTTP body should be closed") } @@ -915,8 +915,8 @@ func TestArrowStreamBatchResetClosesGzipStreamWrapReader(t *testing.T) { var buf bytes.Buffer gw := gzip.NewWriter(&buf) _, err := gw.Write([]byte("hello")) - assertNilF(t, err) - assertNilF(t, gw.Close()) + assertNilF(t, err, "gzip Write should succeed") + assertNilF(t, gw.Close(), "gzip Close should succeed") wrappedClosed := false body := &fakeResponseBody{ @@ -924,11 +924,11 @@ func TestArrowStreamBatchResetClosesGzipStreamWrapReader(t *testing.T) { onClose: func() { wrappedClosed = true }, } gr, err := gzip.NewReader(bytes.NewReader(buf.Bytes())) - assertNilF(t, err) + assertNilF(t, err, "gzip.NewReader should succeed") batch := ArrowStreamBatch{ rr: &streamWrapReader{Reader: gr, wrapped: body}, } - assertNilF(t, batch.Reset()) + assertNilF(t, batch.Reset(), "Reset should succeed") assertTrueF(t, wrappedClosed, "wrapped HTTP body should be closed via gzip reader") } diff --git a/chunk_test.go b/chunk_test.go index aadef74fd..095679645 100644 --- a/chunk_test.go +++ b/chunk_test.go @@ -659,10 +659,10 @@ func TestQueryArrowStreamResetAndRereadBatch(t *testing.T) { numrows := 50000 query := fmt.Sprintf(selectRandomGenerator, numrows) loader, err := sct.sc.QueryArrowStream(sct.sc.ctx, query) - assertNilF(t, err) + assertNilF(t, err, "QueryArrowStream should succeed") batches, err := loader.GetBatches() - assertNilF(t, err) + assertNilF(t, err, "GetBatches should succeed") assertTrueF(t, len(batches) > 0, "should have at least one batch") // Pick a batch that requires a download (skip index 0 which may be inline). @@ -673,24 +673,24 @@ func TestQueryArrowStreamResetAndRereadBatch(t *testing.T) { // First read. stream1, err := batches[idx].GetStream(sct.sc.ctx) - assertNilF(t, err) + assertNilF(t, err, "first GetStream should succeed") defer stream1.Close() data1, err := io.ReadAll(stream1) - assertNilF(t, err) + assertNilF(t, err, "reading first stream should succeed") assertTrueF(t, len(data1) > 0, "first read should return data") // Reset the batch. - assertNilF(t, batches[idx].Reset()) + assertNilF(t, batches[idx].Reset(), "Reset should succeed") // Re-read the same batch. stream2, err := batches[idx].GetStream(sct.sc.ctx) - assertNilF(t, err) + assertNilF(t, err, "second GetStream should succeed") defer stream2.Close() data2, err := io.ReadAll(stream2) - assertNilF(t, err) + assertNilF(t, err, "reading second stream should succeed") assertTrueF(t, len(data2) > 0, "re-read after Reset should return data") // Both reads should return the same content. - assertTrueF(t, bytes.Equal(data1, data2), "data should match after Reset + re-download") + assertBytesEqualE(t, data1, data2, "data should match after Reset + re-download") }) } From 6df51dffbaecff3ef40ab9896509d831088a6da0 Mon Sep 17 00:00:00 2001 From: davidhcoe <13318837+davidhcoe@users.noreply.github.com> Date: Tue, 12 May 2026 09:51:38 -0400 Subject: [PATCH 8/8] linter fix --- arrow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow_test.go b/arrow_test.go index 28b4f6f09..136439ee6 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -708,7 +708,7 @@ func TestArrowStreamBatchResetThenGetStreamRedownloads(t *testing.T) { callCount := 0 mockGet := func(_ context.Context, _ *snowflakeConn, _ string, _ map[string]string, _ time.Duration) (*http.Response, error) { callCount++ - body := []byte(fmt.Sprintf("payload-%d", callCount)) + body := fmt.Appendf(nil, "payload-%d", callCount) return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewReader(body)),