Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 88 additions & 45 deletions pkg/proxy/engines/deltaproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func finalizeDPCResponse(
w http.ResponseWriter, r *http.Request, rsc *request.Resources,
rts timeseries.Timeseries, rh http.Header, sc int,
cacheStatus status.LookupStatus, ffStatus string, elapsed float64,
missRanges timeseries.ExtentList, uncachedValueCount int64,
missRanges, failed timeseries.ExtentList, uncachedValueCount int64,
key string, o *bo.Options, rlo *timeseries.RequestOptions,
modeler *timeseries.Modeler, wireBody []byte,
) {
Expand All @@ -153,7 +153,7 @@ func finalizeDPCResponse(
// Respond to the user. Using the response headers from a Delta Response,
// so as to not map conflict with cacheData on WriteCache
logDeltaRoutine(dpStatus)
recordDPCResult(r, cacheStatus, sc, r.URL.Path, ffStatus, elapsed, missRanges, rh)
recordDPCResult(r, cacheStatus, sc, r.URL.Path, ffStatus, elapsed, missRanges, failed, rh)

rsc.TS = rts
Respond(w, 0, rh, nil) // body and code are nil so this only sets appropriate headers; no writes
Expand Down Expand Up @@ -280,13 +280,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
v, sfErr, _ := dpcGroup.Do(sfKey, func() (any, error) {
isExecutor = true
// buildErrorResult constructs a dpcResult for error responses.
buildErrorResult := func(sc int, h http.Header, body []byte) *dpcResult {
buildErrorResult := func(sc int, h http.Header, body []byte, fext timeseries.ExtentList) *dpcResult {
return &dpcResult{
statusCode: sc,
headers: h,
body: body,
elapsed: float64(time.Since(now).Seconds()),
cacheStatus: status.LookupStatusProxyError,
statusCode: sc,
headers: h,
body: body,
elapsed: float64(time.Since(now).Seconds()),
cacheStatus: status.LookupStatusProxyError,
failedExtents: fext,
}
}

Expand All @@ -295,12 +296,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
var elapsed time.Duration
var cacheStatus status.LookupStatus
var missRanges, cvr timeseries.ExtentList
var failedExts timeseries.ExtentList
var severFault bool

doc, cacheStatus, _, err = QueryCache(ctx, cache, key, nil, modeler.CacheUnmarshaler)
if cacheStatus == status.LookupStatusKeyMiss && errors.Is(err, tc.ErrKNF) {
cts, doc, elapsed, err = fetchTimeseries(pr, trq, client, modeler)
if err != nil {
return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body), nil
cts, doc, elapsed, failedExts, severFault = fetchTimeseries(pr, trq, client, modeler)
if failedExts != nil && severFault {
return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body, failedExts), nil
}
} else {
if doc == nil {
Expand All @@ -312,9 +315,9 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
logger.Error("cache object unmarshaling failed",
logging.Pairs{"key": key, "backendName": client.Name(), "detail": err.Error()})
go cache.Remove(key)
cts, doc, elapsed, err = fetchTimeseries(pr, trq, client, modeler)
if err != nil {
return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body), nil
cts, doc, elapsed, failedExts, severFault = fetchTimeseries(pr, trq, client, modeler)
if failedExts != nil && severFault {
return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body, failedExts), nil
}
// entry was removed and data came from origin; don't inherit the pre-recovery status
cacheStatus = status.LookupStatusKeyMiss
Expand Down Expand Up @@ -375,12 +378,13 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
frsc.TimeRangeQuery = trq
var mts timeseries.List
var mresp *http.Response

fetchHeaders := http.Header(doc.Headers).Clone()
mts, _, mresp, ferr := fetchExtents(missRanges, frsc,
mts, _, mresp, failedExts, severFault = fetchExtents(missRanges, frsc,
fetchHeaders, client, pr, modeler.WireUnmarshalerReader, span)
if ferr != nil {
if failedExts != nil && severFault {
return buildErrorResult(mresp.StatusCode, mresp.Header.Clone(),
func() []byte { b, _ := io.ReadAll(mresp.Body); return b }()), nil
func() []byte { b, _ := io.ReadAll(mresp.Body); return b }(), failedExts), nil
}
doc.Headers = fetchHeaders
// Merge the new delta timeseries into the cached timeseries
Expand Down Expand Up @@ -478,6 +482,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
uncachedValueCount: uncachedValueCount,
cacheStatus: cacheStatus,
missRanges: missRanges,
failedExtents: failedExts,
}, nil
})

Expand All @@ -500,7 +505,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
if result.cacheStatus == status.LookupStatusProxyError {
rh := result.headers.Clone()
recordDPCResult(r, status.LookupStatusProxyError, result.statusCode,
r.URL.Path, "", result.elapsed, nil, rh)
r.URL.Path, "", result.elapsed, nil, result.failedExtents, rh)
Respond(w, result.statusCode, rh, bytes.NewReader(result.body))
return
}
Expand All @@ -524,14 +529,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
rts := result.rts.Clone()
finalizeDPCResponse(w, r, rsc, rts, rh, sc,
cacheStatus, result.ffStatus, result.elapsed, result.missRanges,
result.uncachedValueCount, key, o, rlo, modeler, nil)
result.failedExtents, result.uncachedValueCount, key, o, rlo, modeler, nil)
return
}

// normal path: serve the pre-marshaled wire bytes directly
finalizeDPCResponse(w, r, rsc, result.rts, rh, sc,
cacheStatus, result.ffStatus, result.elapsed, result.missRanges,
result.uncachedValueCount, key, o, rlo, modeler, result.wireBody)
result.failedExtents, result.uncachedValueCount, key, o, rlo, modeler, result.wireBody)
return
}

Expand All @@ -542,11 +547,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
cacheStatus = status.LookupStatusPurge
go cache.Remove(key)
var cts timeseries.Timeseries
cts, doc, elapsed, err = fetchTimeseries(pr, trq, client, modeler)
if err != nil {
var failedExts timeseries.ExtentList
var severFault bool

cts, doc, elapsed, failedExts, severFault = fetchTimeseries(pr, trq, client, modeler)
if failedExts != nil && severFault {
h := doc.SafeHeaderClone()
recordDPCResult(r, status.LookupStatusProxyError, doc.StatusCode,
r.URL.Path, "", elapsed.Seconds(), nil, h)
r.URL.Path, "", elapsed.Seconds(), nil, failedExts, h)
Respond(w, doc.StatusCode, h, bytes.NewReader(doc.Body))
return
}
Expand All @@ -562,7 +570,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
sc := doc.StatusCode

finalizeDPCResponse(w, r, rsc, rts, rh, sc,
cacheStatus, ffStatus, elapsed.Seconds(), missRanges, uncachedValueCount,
cacheStatus, ffStatus, elapsed.Seconds(), missRanges, failedExts, uncachedValueCount,
key, o, rlo, modeler, nil)
}

Expand All @@ -576,10 +584,12 @@ var dpcEncodingProfile = &profile.Profile{
SupportedHeaderVal: providers.AllSupportedWebProviders,
}

func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery,
client backends.TimeseriesBackend, modeler *timeseries.Modeler) (timeseries.Timeseries,
*HTTPDocument, time.Duration, error,
) {
func fetchTimeseries(
pr *proxyRequest,
trq *timeseries.TimeRangeQuery,
client backends.TimeseriesBackend,
modeler *timeseries.Modeler,
) (timeseries.Timeseries, *HTTPDocument, time.Duration, timeseries.ExtentList, bool) {
rsc := pr.rsc.Clone()
o := rsc.BackendOptions
pc := rsc.PathConfig
Expand All @@ -598,13 +608,13 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery,
pr.upstreamRequest = request.SetResources(pr.upstreamRequest.WithContext(ctx), rsc)

start := time.Now()
mts, _, resp, err := fetchExtents(timeseries.ExtentList{trq.Extent}.Splice(trq.Step,
mts, _, resp, failedExts, faultStatus := fetchExtents(timeseries.ExtentList{trq.Extent}.Splice(trq.Step,
o.MaxShardSizeTime, o.ShardStep, o.MaxShardSizePoints), rsc,
http.Header{}, client, pr, modeler.WireUnmarshalerReader, nil)

// elapsed measures only the time spent making origin requests
var elapsed time.Duration
if err == nil {
if failedExts == nil {
elapsed = time.Since(start)
}

Expand All @@ -617,7 +627,7 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery,
Headers: resp.Header,
}

if err != nil {
if failedExts != nil && faultStatus {
// Capture the upstream error body so collapsed singleflight waiters
// and negative-cache entries see the origin's error detail instead
// of an empty body.
Expand All @@ -639,7 +649,7 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery,
d.Body = b
}
}
return nil, d, time.Duration(0), err
return nil, d, time.Duration(0), failedExts, faultStatus
}

var ts timeseries.Timeseries
Expand All @@ -650,15 +660,19 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery,
ts.Merge(true, mts[1:]...)
}

return ts, d, elapsed, nil
return ts, d, elapsed, failedExts, false
}

func recordDPCResult(r *http.Request, cacheStatus status.LookupStatus,
httpStatus int, path, ffStatus string, elapsed float64,
needed timeseries.ExtentList, header http.Header,
// recordDPCResult records the outcome of a DeltaProxyCache request by
// delegating to recordResults with the engine name "DeltaProxyCache".
// needed is the list of extents that had to be fetched from the origin;
// failed is the list of extents whose origin fetch did not succeed
// (presumably surfaced to callers via response headers — confirm in
// recordResults). All other arguments are passed through unchanged.
func recordDPCResult(
	r *http.Request,
	cacheStatus status.LookupStatus,
	httpStatus int,
	path, ffStatus string,
	elapsed float64,
	needed, failed timeseries.ExtentList, header http.Header,
) {
	recordResults(r, "DeltaProxyCache", cacheStatus, httpStatus, path, ffStatus,
		elapsed, needed, failed, header)
}

func getDecoderReader(resp *http.Response) io.Reader {
Expand All @@ -675,16 +689,21 @@ func getDecoderReader(resp *http.Response) io.Reader {
}

// this will concurrently fetch provided requested extents
func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Header,
client backends.TimeseriesBackend, pr *proxyRequest, wur timeseries.UnmarshalerReaderFunc,
func fetchExtents(
el timeseries.ExtentList,
rsc *request.Resources,
h http.Header,
client backends.TimeseriesBackend,
pr *proxyRequest,
wur timeseries.UnmarshalerReaderFunc,
span trace.Span,
) (timeseries.List, int64, *http.Response, error) {
) (timeseries.List, int64, *http.Response, timeseries.ExtentList, bool) {
var uncachedValueCount atomic.Int64
var appendLock, respLock sync.Mutex
errs := make([]error, len(el))

// the list of time series created from the responses
mts := make(timeseries.List, len(el))
errTs := make(timeseries.ExtentList, len(el))
// the meta-response aggregating all upstream responses
mresp := &http.Response{Header: h}

Expand Down Expand Up @@ -728,7 +747,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
// Mid-stream read failure: 2xx + empty body would fall through
// both branches below and nil-deref downstream in the merge.
if fetchErr != nil {
errs[i] = fetchErr
errTs[i] = el[i]
return nil
}

Expand All @@ -737,7 +756,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
if ferr != nil {
logger.Error("proxy object unmarshaling failed",
logging.Pairs{"detail": ferr.Error()})
errs[i] = ferr
errTs[i] = el[i]
return nil
}
uncachedValueCount.Add(nts.ValueCount())
Expand All @@ -748,7 +767,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
appendLock.Unlock()
mts[i] = nts
} else if resp.StatusCode != http.StatusOK {
errs[i] = tpe.ErrUnexpectedUpstreamResponse
errTs[i] = el[i]
Copy link
Copy Markdown
Contributor

@crandles crandles May 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when are we using these error values? (it doesn't look like we are, is that intended? would expect we log these somewhere). previously we were returning errors.Join(errs...)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the use of errors in favor of returning the failed extents in the headers. The errors are still being logged, in particular UnexpectedUpstreamResponse and the unmarshaling errors. The only case that is not currently logged is the error coming from the Fetch function call.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error from the Fetch function is also logged within the function itself, e.g.:

if err != nil {
		logger.Error("error reading body from http response",
			logging.Pairs{"url": pr.URL.String(), "detail": err.Error()})
		return body, resp, 0, err
}

var b []byte
var s string
if resp.Body != nil {
Expand Down Expand Up @@ -784,5 +803,29 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade
})
}
eg.Wait()
return mts, uncachedValueCount.Load(), mresp, errors.Join(errs...)

fullFaults := false
trimmedList := trimEmptyExtents(errTs)
if trimmedList.Len() == el.Len() {
fullFaults = true
}

return mts, uncachedValueCount.Load(), mresp, trimmedList, fullFaults
}

// trimEmptyExtents returns a copy of failedExtents with all zero-value
// (empty) extents removed, preserving the relative order of the rest.
func trimEmptyExtents(failedExtents timeseries.ExtentList) timeseries.ExtentList {
	var zero timeseries.Extent
	trimmed := make(timeseries.ExtentList, 0, len(failedExtents))
	for _, e := range failedExtents {
		if e == zero {
			continue
		}
		trimmed = append(trimmed, e)
	}
	return trimmed
}
Loading
Loading