Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ New features:
Bug fixes:

- Fixed stale OCSP cache `.lck` directory permanently blocking cache writes (and forcing an online OCSP validation if OCSP is enabled, as is by default) by using `os.RemoveAll` instead of `os.Remove` for stale lock recovery (snowflakedb/gosnowflake#1793).
- Fixed `QueryArrowStream` chunk reads so canceling the query context now interrupts stalled Arrow stream downloads and reports the cancellation instead of hanging on the HTTP body read (snowflakedb/gosnowflake#1789).
- Fixed `baseName` silently dropping files whose name ends with a dot (e.g. `myfile.txt.`), which caused PUT uploads to discard such files without error (snowflakedb/gosnowflake#1788).
- Improved error message when `Host` is incorrectly configured with a URL scheme (e.g. `https://myorg-myaccount.snowflakecomputing.com`); previously this produced a cryptic `260004: failed to parse a port number` error (snowflakedb/gosnowflake#1784).
- Fixed minicore build on OpenBSD by skipping the `-ldl` linker flag, since libdl is not a separate library on OpenBSD (`dlopen`/`dlsym` are provided by libc) (snowflakedb/gosnowflake#1791).
Expand Down
116 changes: 115 additions & 1 deletion arrow_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"maps"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/apache/arrow-go/v18/arrow/ipc"
Expand Down Expand Up @@ -76,7 +78,7 @@ func (asb *ArrowStreamBatch) GetStream(ctx context.Context) (io.ReadCloser, erro
return nil, err
}
}
return asb.rr, nil
return newCancelableStream(ctx, asb.rr), nil
}

// streamWrapReader wraps an io.Reader so that Close closes the underlying body.
Expand All @@ -94,6 +96,118 @@ func (w *streamWrapReader) Close() error {
return w.wrapped.Close()
}

// interrupt closes the raw response body directly. This bypasses gzip.Reader
// cleanup because the gzip layer may still be blocked waiting for more bytes,
// while closing the transport body is what unblocks the in-flight read.
// It returns whatever error the underlying Close reports.
func (w *streamWrapReader) interrupt() error {
	return w.wrapped.Close()
}

// interruptibleReader lets cancelableStream bypass wrapper cleanup and close
// the raw transport body that actually unblocks a stalled read.
// Implementations must be safe to call while a Read is in flight on another
// goroutine, since the watcher invokes interrupt concurrently with Read.
type interruptibleReader interface {
	interrupt() error
}

// cancelableStream watches the stream context so cancellation can interrupt a
// stalled read, normalizes terminal transport errors caused by that interrupt
// to ctx.Err(), and still preserves true successful completion and EOF.
type cancelableStream struct {
	// ctx is the stream's context; its cancellation aborts blocked reads.
	ctx context.Context

	// inner is the wrapped stream every Read/Close is delegated to.
	inner io.ReadCloser
	// interrupt unblocks an in-flight Read. It defaults to inner.Close and is
	// replaced by the raw-body interrupt when inner implements
	// interruptibleReader (see newCancelableStream).
	interrupt func() error

	// done is closed (exactly once, via stopOnce) to tell the watcher
	// goroutine to exit.
	done chan struct{}
	// stopOnce guards the close of done against concurrent callers.
	stopOnce sync.Once
	// interruptOnce ensures the watcher fires interrupt at most once.
	interruptOnce sync.Once
	// completed records that the stream genuinely reached end-of-data, so a
	// later context cancellation is not misreported as the read outcome.
	completed atomic.Bool
}

// newCancelableStream wraps inner so that canceling ctx interrupts a blocked
// Read instead of letting it hang. When inner implements interruptibleReader,
// the raw transport body is closed directly (bypassing wrapper-level cleanup);
// otherwise inner.Close is used as the interrupt. If ctx can never be canceled
// (Done() returns nil) no watcher goroutine is started at all.
func newCancelableStream(ctx context.Context, inner io.ReadCloser) io.ReadCloser {
	out := &cancelableStream{
		ctx:       ctx,
		inner:     inner,
		interrupt: inner.Close,
		done:      make(chan struct{}),
	}
	// Prefer the raw-body interrupt: closing the transport body is what
	// actually unblocks a stalled in-flight read (see streamWrapReader.interrupt).
	if interruptible, ok := inner.(interruptibleReader); ok {
		out.interrupt = interruptible.interrupt
	}
	// A nil Done channel (e.g. context.Background) blocks forever in select,
	// so skip spawning a watcher that could never fire or exit via ctx.
	if ctx.Done() == nil {
		return out
	}

	// Watcher goroutine: exits either when ctx is canceled or when the stream
	// terminates (done closed via stopWatching), so it cannot leak.
	go func() {
		select {
		case <-ctx.Done():
			// Nothing to unblock if the stream already stopped or delivered
			// its full payload; interrupting then would be pointless work.
			if out.watchingStopped() || out.completed.Load() {
				return
			}
			// Fire the interrupt at most once; the resulting transport error
			// is rewritten to ctx.Err() by Read/canceledReadErr.
			out.interruptOnce.Do(func() {
				_ = out.interrupt()
			})
		case <-out.done:
		}
	}()
	return out
}

// stopWatching tells the watcher goroutine (if one was started) to exit by
// closing the done channel. It is idempotent: stopOnce makes repeated calls
// from Read and Close safe.
func (r *cancelableStream) stopWatching() {
	r.stopOnce.Do(func() { close(r.done) })
}

// watchingStopped reports whether stopWatching has already run, i.e. whether
// the done channel is closed. The select with a default clause makes this a
// non-blocking probe.
func (r *cancelableStream) watchingStopped() bool {
	select {
	case <-r.done:
	default:
		return false
	}
	return true
}

// Read delegates to the inner stream and, on a terminal error, rewrites
// errors attributable to a context-cancellation interrupt into ctx.Err().
// Successful reads and genuine end-of-stream EOFs pass through unchanged.
func (r *cancelableStream) Read(p []byte) (int, error) {
	n, err := r.inner.Read(p)
	if err == nil {
		return n, nil
	}
	// Every non-nil error is terminal for this wrapper, so the watcher
	// goroutine is no longer needed once this read returns.
	defer r.stopWatching()

	// Preserve the successful final read where io.Reader returns payload and EOF
	// together. Only terminal reads with no delivered bytes are candidates for
	// cancellation rewriting.
	if err == io.EOF && n > 0 {
		r.completed.Store(true)
		return n, err
	}

	// If the context was canceled, report the cancellation rather than
	// whatever transport error the interrupt produced.
	if normalized := r.canceledReadErr(); normalized != nil {
		return n, normalized
	}

	// A bare EOF with no pending cancellation is a clean end of stream.
	if err == io.EOF {
		r.completed.Store(true)
	}
	return n, err
}

// canceledReadErr returns the context's cancellation error when the stream
// was cut short by cancellation, or nil when the stream either completed
// normally or the context is still live.
func (r *cancelableStream) canceledReadErr() error {
	// A stream that already delivered its full payload keeps its EOF even if
	// the caller cancels afterwards.
	if r.completed.Load() {
		return nil
	}
	// A canceled context wins over terminal transport errors even if the watcher
	// goroutine has not yet unblocked the read before the terminal error arrives.
	// (ctx.Err() is nil while the context is still live, yielding nil here.)
	return r.ctx.Err()
}

// Close signals the watcher goroutine to exit and then closes the wrapped
// stream, returning whatever error that close reports.
func (r *cancelableStream) Close() error {
	// Equivalent to stopWatching(): shut the watcher down first so it does
	// not race to interrupt a stream that is being torn down deliberately.
	r.stopOnce.Do(func() { close(r.done) })
	return r.inner.Close()
}

func (asb *ArrowStreamBatch) downloadChunkStreamHelper(ctx context.Context) error {
headers := make(map[string]string)
if len(asb.scd.ChunkHeader) > 0 {
Expand Down
Loading
Loading