SNOW-3449105 add Reset for ArrowStreamBatch method #1782
All contributors have signed the CLA ✍️ ✅
Pull request overview
This PR adds a Reset() method to ArrowStreamBatch so callers can discard a cached/partially-read batch stream and force a re-download on the next GetStream() call, enabling retry behavior for mid-stream failures (e.g., TCP resets during Arrow IPC reads).
Changes:
- Added `ArrowStreamBatch.Reset()` to close and clear the cached stream reader.
- Added unit tests covering `Reset()` behavior and `GetStream()` retry/redownload scenarios.
- Added an integration test that reads a batch, resets it, and re-reads to validate content stability.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| arrow_stream.go | Introduces ArrowStreamBatch.Reset() and documents retry usage with GetStream(). |
| arrow_test.go | Adds unit tests for Reset(), redownload behavior, gzip handling, and streamWrapReader close semantics. |
| chunk_test.go | Adds an end-to-end test that resets and re-reads an Arrow stream batch from Snowflake. |
Comments suppressed due to low confidence (1)
arrow_stream.go:95
`Reset()` can leave `rr` nil on batches that are not downloadable (e.g., inline-only results where `scd.ChunkMetas` is empty). A subsequent `GetStream()` will call `downloadChunkStreamHelper()` and index into `ChunkMetas[asb.idx]`, which can panic (index out of range) or return the wrong content for the inline batch. Consider guarding `GetStream()` (and/or `Reset()`) with a bounds check for `asb.idx` against `len(asb.scd.ChunkMetas)` and returning a regular error instead of panicking, or explicitly marking inline batches so `Reset()` can behave as a safe no-op for them.
// Reset closes any existing stream and clears the cached reader, allowing
// GetStream to re-download the chunk on the next call. This enables callers
// to retry after a mid-stream failure (e.g. TCP RST) without re-executing
// the entire query.
func (asb *ArrowStreamBatch) Reset() error {
	if asb.rr != nil {
		err := asb.rr.Close()
		asb.rr = nil
		return err
	}
	return nil
}

// GetStream downloads the chunk (if not already cached) and returns a
// stream of bytes. The content may be Arrow IPC or JSON (row fragments)
// depending on the current QueryResultFormat. Close should be called
// on the returned stream when done to ensure no leaked memory.
//
// If a previous stream failed mid-read, call Reset() first to clear the
// cached reader and allow re-download.
func (asb *ArrowStreamBatch) GetStream(ctx context.Context) (io.ReadCloser, error) {
	if asb.rr == nil {
		if err := asb.downloadChunkStreamHelper(ctx); err != nil {
			return nil, err
		}
	}
	return asb.rr, nil
}
I have read the CLA Document and I hereby sign the CLA
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
recheck
@copilot apply changes based on the comments in this thread
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.
Hey @davidhcoe, thanks again for this contribution. Apologies, but due to Piotr's absence we need to arrange for someone else from the dev team to continue working with you on this; we're hoping to do so this week. Thank you for bearing with us!
sfc-gh-boler
left a comment
David, this is great. I primarily have a few nitpicks, which failed to send last time during the GitHub outage.
The major thing I'd like to discuss before we merge this is the potential leak of the wrapped reader. See the Copilot comment I reopened.
@davidhcoe all good, but the linter complained. Could you make this one-line fix in arrow_test.go:711? Run ci/gofix.sh locally (or apply it manually) and commit:
…1792)

## Description

Adds `ArrowStreamBatch.Reset()` so callers can retry a chunk download after a mid-stream failure (e.g. a TCP RST from cloud storage) without re-executing the whole query.

Addresses #1781: customers using ADBC → goSnowflake from Power BI hit `arrow/ipc: could not read message body: ... An existing connection was forcibly closed by the remote host.` mid-stream during long Arrow reads. The TCP reset happens on the chunk download connection to S3/Azure Blob, so `ClientSessionKeepAlive` doesn't help: it keeps the Snowflake control-plane session alive, not the chunk transfer. Without a way to clear the cached reader, the only recovery path was re-executing the query.

`Reset()` closes the in-flight reader and nils out the cached `rr`, so the next `GetStream()` triggers a fresh chunk download (or, for inline `RowSetBase64` batches, restores the inline reader from `inlineData`). The mid-read failure becomes a recoverable retry instead of a query-level redo.

### Checklist

- [ ] Added proper logging (if possible)
- [x] Created tests which fail without the change (if possible)
- [x] Extended the README / documentation, if necessary

---

Mirror of #1782 by @davidhcoe.

Co-authored-by: davidhcoe <13318837+davidhcoe@users.noreply.github.com>
#1792 is now merged. I'll close this.
Description
Proposes the Reset method to help address #1781
Checklist