diff --git a/pkg/proxy/engines/deltaproxycache.go b/pkg/proxy/engines/deltaproxycache.go index e7c513b22..bfc95fca3 100644 --- a/pkg/proxy/engines/deltaproxycache.go +++ b/pkg/proxy/engines/deltaproxycache.go @@ -130,7 +130,7 @@ func finalizeDPCResponse( w http.ResponseWriter, r *http.Request, rsc *request.Resources, rts timeseries.Timeseries, rh http.Header, sc int, cacheStatus status.LookupStatus, ffStatus string, elapsed float64, - missRanges timeseries.ExtentList, uncachedValueCount int64, + missRanges, failed timeseries.ExtentList, uncachedValueCount int64, key string, o *bo.Options, rlo *timeseries.RequestOptions, modeler *timeseries.Modeler, wireBody []byte, ) { @@ -153,7 +153,7 @@ func finalizeDPCResponse( // Respond to the user. Using the response headers from a Delta Response, // so as to not map conflict with cacheData on WriteCache logDeltaRoutine(dpStatus) - recordDPCResult(r, cacheStatus, sc, r.URL.Path, ffStatus, elapsed, missRanges, rh) + recordDPCResult(r, cacheStatus, sc, r.URL.Path, ffStatus, elapsed, missRanges, failed, rh) rsc.TS = rts Respond(w, 0, rh, nil) // body and code are nil so this only sets appropriate headers; no writes @@ -280,13 +280,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim v, sfErr, _ := dpcGroup.Do(sfKey, func() (any, error) { isExecutor = true // buildErrorResult constructs a dpcResult for error responses. - buildErrorResult := func(sc int, h http.Header, body []byte) *dpcResult { + buildErrorResult := func(sc int, h http.Header, body []byte, fext timeseries.ExtentList) *dpcResult { return &dpcResult{ - statusCode: sc, - headers: h, - body: body, - elapsed: float64(time.Since(now).Seconds()), - cacheStatus: status.LookupStatusProxyError, + statusCode: sc, + headers: h, + body: body, + elapsed: float64(time.Since(now).Seconds()), + cacheStatus: status.LookupStatusProxyError, + failedExtents: fext, } } @@ -295,12 +296,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim var elapsed time.Duration var cacheStatus status.LookupStatus var missRanges, cvr timeseries.ExtentList + var failedExts timeseries.ExtentList + var severFault bool doc, cacheStatus, _, err = QueryCache(ctx, cache, key, nil, modeler.CacheUnmarshaler) if cacheStatus == status.LookupStatusKeyMiss && errors.Is(err, tc.ErrKNF) { - cts, doc, elapsed, err = fetchTimeseries(pr, trq, client, modeler) - if err != nil { - return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body), nil + cts, doc, elapsed, failedExts, severFault = fetchTimeseries(pr, trq, client, modeler) + if failedExts != nil && severFault { + return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body, failedExts), nil } } else { if doc == nil { @@ -312,9 +315,9 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim logger.Error("cache object unmarshaling failed", logging.Pairs{"key": key, "backendName": client.Name(), "detail": err.Error()}) go cache.Remove(key) - cts, doc, elapsed, err = fetchTimeseries(pr, trq, client, modeler) - if err != nil { - return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body), nil + cts, doc, elapsed, failedExts, severFault = fetchTimeseries(pr, trq, client, modeler) + if failedExts != nil && severFault { + return buildErrorResult(doc.StatusCode, doc.SafeHeaderClone(), doc.Body, failedExts), nil } // entry was removed and data came from origin; don't inherit the pre-recovery status cacheStatus = status.LookupStatusKeyMiss @@ -375,12 +378,13 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim frsc.TimeRangeQuery = trq var mts timeseries.List var mresp *http.Response + fetchHeaders := http.Header(doc.Headers).Clone() - mts, _, mresp, ferr := fetchExtents(missRanges, frsc, + mts, _, mresp, failedExts, severFault = fetchExtents(missRanges, frsc, fetchHeaders, client, pr, modeler.WireUnmarshalerReader, span) - if ferr != nil { + if failedExts != nil && severFault { return buildErrorResult(mresp.StatusCode, mresp.Header.Clone(), - func() []byte { b, _ := io.ReadAll(mresp.Body); return b }()), nil + func() []byte { b, _ := io.ReadAll(mresp.Body); return b }(), failedExts), nil } doc.Headers = fetchHeaders // Merge the new delta timeseries into the cached timeseries @@ -478,6 +482,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim uncachedValueCount: uncachedValueCount, cacheStatus: cacheStatus, missRanges: missRanges, + failedExtents: failedExts, }, nil }) @@ -500,7 +505,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim if result.cacheStatus == status.LookupStatusProxyError { rh := result.headers.Clone() recordDPCResult(r, status.LookupStatusProxyError, result.statusCode, - r.URL.Path, "", result.elapsed, nil, rh) + r.URL.Path, "", result.elapsed, nil, result.failedExtents, rh) Respond(w, result.statusCode, rh, bytes.NewReader(result.body)) return } @@ -524,14 +529,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim rts := result.rts.Clone() finalizeDPCResponse(w, r, rsc, rts, rh, sc, cacheStatus, result.ffStatus, result.elapsed, result.missRanges, - result.uncachedValueCount, key, o, rlo, modeler, nil) + result.failedExtents, result.uncachedValueCount, key, o, rlo, modeler, nil) return } // normal path: serve the pre-marshaled wire bytes directly finalizeDPCResponse(w, r, rsc, result.rts, rh, sc, cacheStatus, result.ffStatus, result.elapsed, result.missRanges, - result.uncachedValueCount, key, o, rlo, modeler, result.wireBody) + result.failedExtents, result.uncachedValueCount, key, o, rlo, modeler, result.wireBody) return } @@ -542,11 +547,14 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim cacheStatus = status.LookupStatusPurge go cache.Remove(key) var cts timeseries.Timeseries - cts, doc, elapsed, err = fetchTimeseries(pr, trq, client, modeler) - if err != nil { + var failedExts timeseries.ExtentList + var severFault bool + + cts, doc, elapsed, failedExts, severFault = fetchTimeseries(pr, trq, client, modeler) + if failedExts != nil && severFault { h := doc.SafeHeaderClone() recordDPCResult(r, status.LookupStatusProxyError, doc.StatusCode, - r.URL.Path, "", elapsed.Seconds(), nil, h) + r.URL.Path, "", elapsed.Seconds(), nil, failedExts, h) Respond(w, doc.StatusCode, h, bytes.NewReader(doc.Body)) return } @@ -562,7 +570,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim sc := doc.StatusCode finalizeDPCResponse(w, r, rsc, rts, rh, sc, - cacheStatus, ffStatus, elapsed.Seconds(), missRanges, uncachedValueCount, + cacheStatus, ffStatus, elapsed.Seconds(), missRanges, failedExts, uncachedValueCount, key, o, rlo, modeler, nil) } @@ -576,10 +584,12 @@ var dpcEncodingProfile = &profile.Profile{ SupportedHeaderVal: providers.AllSupportedWebProviders, } -func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery, - client backends.TimeseriesBackend, modeler *timeseries.Modeler) (timeseries.Timeseries, - *HTTPDocument, time.Duration, error, -) { +func fetchTimeseries( + pr *proxyRequest, + trq *timeseries.TimeRangeQuery, + client backends.TimeseriesBackend, + modeler *timeseries.Modeler, +) (timeseries.Timeseries, *HTTPDocument, time.Duration, timeseries.ExtentList, bool) { rsc := pr.rsc.Clone() o := rsc.BackendOptions pc := rsc.PathConfig @@ -598,13 +608,13 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery, pr.upstreamRequest = request.SetResources(pr.upstreamRequest.WithContext(ctx), rsc) start := time.Now() - mts, _, resp, err := fetchExtents(timeseries.ExtentList{trq.Extent}.Splice(trq.Step, + mts, _, resp, failedExts, faultStatus := fetchExtents(timeseries.ExtentList{trq.Extent}.Splice(trq.Step, o.MaxShardSizeTime, o.ShardStep, o.MaxShardSizePoints), rsc, http.Header{}, client, pr, modeler.WireUnmarshalerReader, nil) // elaspsed measures only the time spent making origin requests var elapsed time.Duration - if err == nil { + if failedExts == nil { elapsed = time.Since(start) } @@ -617,7 +627,7 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery, Headers: resp.Header, } - if err != nil { + if failedExts != nil && faultStatus { // Capture the upstream error body so collapsed singleflight waiters // and negative-cache entries see the origin's error detail instead // of an empty body. @@ -639,7 +649,7 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery, d.Body = b } } - return nil, d, time.Duration(0), err + return nil, d, time.Duration(0), failedExts, faultStatus } var ts timeseries.Timeseries @@ -650,15 +660,19 @@ func fetchTimeseries(pr *proxyRequest, trq *timeseries.TimeRangeQuery, ts.Merge(true, mts[1:]...) } - return ts, d, elapsed, nil + return ts, d, elapsed, failedExts, false } -func recordDPCResult(r *http.Request, cacheStatus status.LookupStatus, - httpStatus int, path, ffStatus string, elapsed float64, - needed timeseries.ExtentList, header http.Header, +func recordDPCResult( + r *http.Request, + cacheStatus status.LookupStatus, + httpStatus int, + path, ffStatus string, + elapsed float64, + needed, failed timeseries.ExtentList, header http.Header, ) { recordResults(r, "DeltaProxyCache", cacheStatus, httpStatus, path, ffStatus, - elapsed, needed, header) + elapsed, needed, failed, header) } func getDecoderReader(resp *http.Response) io.Reader { @@ -675,16 +689,21 @@ func getDecoderReader(resp *http.Response) io.Reader { } // this will concurrently fetch provided requested extents -func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Header, - client backends.TimeseriesBackend, pr *proxyRequest, wur timeseries.UnmarshalerReaderFunc, +func fetchExtents( + el timeseries.ExtentList, + rsc *request.Resources, + h http.Header, + client backends.TimeseriesBackend, + pr *proxyRequest, + wur timeseries.UnmarshalerReaderFunc, span trace.Span, -) (timeseries.List, int64, *http.Response, error) { +) (timeseries.List, int64, *http.Response, timeseries.ExtentList, bool) { var uncachedValueCount atomic.Int64 var appendLock, respLock sync.Mutex - errs := make([]error, len(el)) // the list of time series created from the responses mts := make(timeseries.List, len(el)) + errTs := make(timeseries.ExtentList, len(el)) // the meta-response aggregating all upstream responses mresp := &http.Response{Header: h} @@ -728,7 +747,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade // Mid-stream read failure: 2xx + empty body would fall through // both branches below and nil-deref downstream in the merge. if fetchErr != nil { - errs[i] = fetchErr + errTs[i] = el[i] return nil } @@ -737,7 +756,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade if ferr != nil { logger.Error("proxy object unmarshaling failed", logging.Pairs{"detail": ferr.Error()}) - errs[i] = ferr + errTs[i] = el[i] return nil } uncachedValueCount.Add(nts.ValueCount()) @@ -748,7 +767,7 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade appendLock.Unlock() mts[i] = nts } else if resp.StatusCode != http.StatusOK { - errs[i] = tpe.ErrUnexpectedUpstreamResponse + errTs[i] = el[i] var b []byte var s string if resp.Body != nil { @@ -784,5 +803,29 @@ func fetchExtents(el timeseries.ExtentList, rsc *request.Resources, h http.Heade }) } eg.Wait() - return mts, uncachedValueCount.Load(), mresp, errors.Join(errs...) + + fullFaults := false + trimmedList := trimEmptyExtents(errTs) + if trimmedList.Len() == el.Len() { + fullFaults = true + } + + return mts, uncachedValueCount.Load(), mresp, trimmedList, fullFaults +} + +func trimEmptyExtents(failedExtents timeseries.ExtentList) timeseries.ExtentList { + trimmedList := make(timeseries.ExtentList, len(failedExtents)) + emptyExtent := timeseries.Extent{} + + var cursor int + for _, extent := range failedExtents { + if extent == emptyExtent { + continue + } + + trimmedList[cursor] = extent + cursor++ + } + + return trimmedList[:cursor] } diff --git a/pkg/proxy/engines/deltaproxycache_test.go b/pkg/proxy/engines/deltaproxycache_test.go index 73bd7632b..9e840b1c7 100644 --- a/pkg/proxy/engines/deltaproxycache_test.go +++ b/pkg/proxy/engines/deltaproxycache_test.go @@ -565,6 +565,111 @@ func TestDeltaProxyCacheRequestPartialHit(t *testing.T) { } } +// TestDeltaProxyCacheRequestPartialHitWithFailedExtents verifies that when +// a partial hit occurs and the upstream request for the missing fragment fails, +// the failed extents are properly tracked and reported in the response header. +func TestDeltaProxyCacheRequestPartialHitWithFailedExtents(t *testing.T) { + ts, w, r, rsc, err := setupTestHarnessDPC() + if err != nil { + t.Error(err) + } + defer ts.Close() + + client := rsc.BackendClient.(*TestClient) + o := rsc.BackendOptions + rsc.CacheConfig.Provider = "test" + + client.RangeCacheKey = "test-range-key-phit-failed" + client.InstantCacheKey = "test-instant-key-phit-failed" + + o.FastForwardDisable = true + + step := time.Duration(300) * time.Second + now := time.Now() + end := now.Add(-time.Duration(12) * time.Hour) + + extr := timeseries.Extent{Start: end.Add(-time.Duration(18) * time.Hour), End: end} + extn := timeseries.Extent{Start: normalizeTime(extr.Start, step), End: normalizeTime(extr.End, step)} + + // First request: populate cache with successful data + expected, _, _ := mockprom.GetTimeSeriesData(queryReturnsOKNoLatency, extn.Start, extn.End, step) + + u := r.URL + u.Path = "/prometheus/api/v1/query_range" + u.RawQuery = fmt.Sprintf("step=%d&start=%d&end=%d&query=%s&rk=%s&ik=%s", int(step.Seconds()), + extr.Start.Unix(), extr.End.Unix(), queryReturnsOKNoLatency, client.RangeCacheKey, client.InstantCacheKey) + + client.QueryRangeHandler(w, r) + resp := w.Result() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Error(err) + } + + err = testStringMatch(string(bodyBytes), expected) + if err != nil { + t.Error(err) + } + + err = testStatusCodeMatch(resp.StatusCode, http.StatusOK) + if err != nil { + t.Error(err) + } + + err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "kmiss"}) + if err != nil { + t.Error(err) + } + + // Give time for the object to be written to cache + time.Sleep(time.Millisecond * 10) + + // Second request: partial hit - extend the upper range + // This should cause a partial hit where we need to fetch the new upper fragment. + // But we'll use a query that fails (queryReturnsBadGateway) for this fragment. + phitStart := normalizeTime(extr.End.Add(step), step) + extr.End = extr.End.Add(time.Duration(1) * time.Hour) + extn.End = normalizeTime(extr.End, step) + + // The extent that we should fetch for the partial hit + extentToFetch := timeseries.Extent{Start: phitStart, End: extn.End} + expectedFailed := "[" + timeseries.ExtentList{extentToFetch}.String() + "]" + + // Use queryReturnsBadGateway which will return 502 for the upper fragment + u.RawQuery = fmt.Sprintf("step=%d&start=%d&end=%d&query=%s&rk=%s&ik=%s", int(step.Seconds()), + extr.Start.Unix(), extr.End.Unix(), queryReturnsBadGateway, client.RangeCacheKey, client.InstantCacheKey) + + r.URL = u + time.Sleep(time.Millisecond * 10) + + w = httptest.NewRecorder() + client.QueryRangeHandler(w, r) + resp = w.Result() + + // The response should be 502 because the partial fetch failed + err = testStatusCodeMatch(resp.StatusCode, http.StatusBadGateway) + if err != nil { + t.Error(err) + } + + // Check that we have a proxy-error status + err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "proxy-error"}) + if err != nil { + t.Error(err) + } + + // Verify that failed extents are present in the header + err = testResultHeaderPartMatch(resp.Header, map[string]string{"failed": expectedFailed}) + if err != nil { + t.Error(err) + } + + // Log the header for debugging purposes + resultHdr := resp.Header.Get(headers.NameTricksterResult) + t.Logf("Result Header: %s", resultHdr) +} + func TestDeltayProxyCacheRequestDeltaFetchError(t *testing.T) { ts, w, r, rsc, err := setupTestHarnessDPC() if err != nil { diff --git a/pkg/proxy/engines/httpproxy.go b/pkg/proxy/engines/httpproxy.go index e622f8a86..fc4937edd 100644 --- a/pkg/proxy/engines/httpproxy.go +++ b/pkg/proxy/engines/httpproxy.go @@ -139,7 +139,7 @@ func DoProxy(w io.Writer, r *http.Request, closeResponse bool) *http.Response { if resp != nil && rsc != nil && (rsc.IsMergeMember || rsc.TSTransformer != nil) { rsc.Response = resp recordResults(r, "HTTPProxy", cacheStatusCode, resp.StatusCode, - r.URL.Path, "", elapsed.Seconds(), nil, resp.Header) + r.URL.Path, "", elapsed.Seconds(), nil, nil, resp.Header) } return resp @@ -328,12 +328,18 @@ func setStatusHeader(httpStatus int, header http.Header) status.LookupStatus { if httpStatus >= http.StatusBadRequest { st = status.LookupStatusProxyError } - headers.SetResultsHeader(header, "HTTPProxy", st.String(), "", nil) + headers.SetResultsHeader(header, "HTTPProxy", st.String(), "", nil, nil) return st } -func recordResults(r *http.Request, engine string, cacheStatus status.LookupStatus, - statusCode int, path, ffStatus string, elapsed float64, extents timeseries.ExtentList, +func recordResults( + r *http.Request, + engine string, + cacheStatus status.LookupStatus, + statusCode int, + path, ffStatus string, + elapsed float64, + extents, failed timeseries.ExtentList, header http.Header, ) { rsc := request.GetResources(r) @@ -349,5 +355,5 @@ func recordResults(r *http.Request, engine string, cacheStatus status.LookupStat metrics.ProxyRequestDuration.WithLabelValues(lvs...).Observe(elapsed) } } - headers.SetResultsHeader(header, engine, s, ffStatus, extents) + headers.SetResultsHeader(header, engine, s, ffStatus, extents, failed) } diff --git a/pkg/proxy/engines/objectproxycache.go b/pkg/proxy/engines/objectproxycache.go index e1e7ee3e6..3abc6cbf3 100644 --- a/pkg/proxy/engines/objectproxycache.go +++ b/pkg/proxy/engines/objectproxycache.go @@ -587,6 +587,6 @@ func recordOPCResult(pr *proxyRequest, cacheStatus status.LookupStatus, httpStat path string, elapsed float64, header http.Header, ) { pr.mapLock.Lock() - recordResults(pr.Request, "ObjectProxyCache", cacheStatus, httpStatus, path, "", elapsed, nil, header) + recordResults(pr.Request, "ObjectProxyCache", cacheStatus, httpStatus, path, "", elapsed, nil, nil, header) pr.mapLock.Unlock() } diff --git a/pkg/proxy/engines/proxy_request.go b/pkg/proxy/engines/proxy_request.go index bf754ad79..d621223d6 100644 --- a/pkg/proxy/engines/proxy_request.go +++ b/pkg/proxy/engines/proxy_request.go @@ -378,7 +378,7 @@ func (pr *proxyRequest) stripConditionalHeaders() { func (pr *proxyRequest) writeResponseHeader() { pr.mapLock.Lock() - headers.SetResultsHeader(pr.upstreamResponse.Header, "ObjectProxyCache", pr.cacheStatus.String(), "", nil) + headers.SetResultsHeader(pr.upstreamResponse.Header, "ObjectProxyCache", pr.cacheStatus.String(), "", nil, nil) pr.mapLock.Unlock() } diff --git a/pkg/proxy/engines/singleflight.go b/pkg/proxy/engines/singleflight.go index 2c17fddef..e1f3606f9 100644 --- a/pkg/proxy/engines/singleflight.go +++ b/pkg/proxy/engines/singleflight.go @@ -80,4 +80,5 @@ type dpcResult struct { uncachedValueCount int64 cacheStatus status.LookupStatus missRanges timeseries.ExtentList + failedExtents timeseries.ExtentList // populated only is case of failed extents } diff --git a/pkg/proxy/headers/result_header.go b/pkg/proxy/headers/result_header.go index da58d2abb..b39d9a784 100644 --- a/pkg/proxy/headers/result_header.go +++ b/pkg/proxy/headers/result_header.go @@ -30,6 +30,7 @@ type ResultHeaderParts struct { Engine string Status string Fetched timeseries.ExtentList + FailedFetch timeseries.ExtentList FastForwardStatus string } @@ -45,15 +46,18 @@ func (p ResultHeaderParts) String() string { if p.FastForwardStatus != "" { sb.WriteString("; ffstatus=" + p.FastForwardStatus) } + if len(p.FailedFetch) > 0 { + sb.WriteString("; failed=[" + p.FailedFetch.String() + "]") + } return sb.String() } // SetResultsHeader adds a response header summarizing Trickster's handling of the HTTP request -func SetResultsHeader(headers http.Header, engine, status, ffstatus string, fetched timeseries.ExtentList) { +func SetResultsHeader(headers http.Header, engine, status, ffstatus string, fetched timeseries.ExtentList, failedFetched timeseries.ExtentList) { if headers == nil || engine == "" { return } - p := ResultHeaderParts{Engine: engine, Status: status, Fetched: fetched, FastForwardStatus: ffstatus} + p := ResultHeaderParts{Engine: engine, Status: status, Fetched: fetched, FailedFetch: failedFetched, FastForwardStatus: ffstatus} headers.Set(NameTricksterResult, p.String()) } @@ -96,6 +100,16 @@ func MergeResultHeaderVals(h1, h2 string) string { copy(merged[len(r1.Fetched):], r2.Fetched) r1.Fetched = r1.Fetched.Compress(0) } + + if len(r1.FailedFetch) == 0 { + r1.FailedFetch = r2.FailedFetch + } else if len(r2.FailedFetch) > 0 { + merged := make(timeseries.ExtentList, len(r1.FailedFetch)+len(r2.FailedFetch)) + copy(merged, r1.FailedFetch) + copy(merged[len(r1.FailedFetch):], r2.FailedFetch) + r1.FailedFetch = r1.FailedFetch.Compress(0) + } + return r1.String() } @@ -120,7 +134,7 @@ func parseResultHeaderVals(h string) ResultHeaderParts { if val != "" { r.FastForwardStatus = val } - case "fetched": + case "fetched", "failed": val = strings.NewReplacer("[", "", "]", "").Replace(val) fparts := strings.Split(val, ";") el := make(timeseries.ExtentList, len(fparts)) @@ -142,7 +156,12 @@ func parseResultHeaderVals(h string) ResultHeaderParts { k++ } } - r.Fetched = el[:k] + + if key == "fetched" { + r.Fetched = el[:k] + } else { + r.FailedFetch = el[:k] + } } } } diff --git a/pkg/proxy/headers/result_header_test.go b/pkg/proxy/headers/result_header_test.go index f7630a151..1357fa698 100644 --- a/pkg/proxy/headers/result_header_test.go +++ b/pkg/proxy/headers/result_header_test.go @@ -27,7 +27,7 @@ import ( func TestSetResultsHeader(t *testing.T) { h := http.Header{} SetResultsHeader(h, "test-engine", "test-status", "test-ffstatus", - timeseries.ExtentList{timeseries.Extent{Start: time.Unix(1, 0), End: time.Unix(2, 0)}}) + timeseries.ExtentList{timeseries.Extent{Start: time.Unix(1, 0), End: time.Unix(2, 0)}}, nil) const expected = "engine=test-engine; status=test-status; fetched=[1000-2000]; ffstatus=test-ffstatus" if h.Get(NameTricksterResult) != expected { t.Errorf("expected %s got %s", expected, h.Get(NameTricksterResult)) @@ -37,7 +37,7 @@ func TestSetResultsHeader(t *testing.T) { func TestSetResultsHeaderEmtpy(t *testing.T) { h := http.Header{} SetResultsHeader(h, "", "test-status", "test-ffstatus", - timeseries.ExtentList{timeseries.Extent{Start: time.Unix(1, 0), End: time.Unix(2, 0)}}) + timeseries.ExtentList{timeseries.Extent{Start: time.Unix(1, 0), End: time.Unix(2, 0)}}, nil) if len(h) > 0 { t.Errorf("Expected header length of %d", 0) }