Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2e24c43
feat: add HTTP client request metrics for all KEDA scaler outbound re…
aliaqel-stripe Apr 12, 2026
b6b64e0
fix(aws): initialize SigV4 transport once to avoid per-request alloca…
aliaqel-stripe Apr 12, 2026
011208c
fix(scaling): use fresh context with correct scaler labels for retry …
aliaqel-stripe Apr 12, 2026
e7079f0
refactor(metrics): move httpStatusCodeLabel to metricscollectors.go t…
aliaqel-stripe Apr 12, 2026
1cacc25
test: add missing tests for transport instrumentation, status code la…
aliaqel-stripe Apr 12, 2026
a5636e3
test(scaling): verify buildScalerRequestCtx injects correct context k…
aliaqel-stripe Apr 13, 2026
f00fc01
refactor(metrics): rename to keda_scaler_http_* and only record durin…
aliaqel-stripe Apr 13, 2026
efa0a76
fix(metrics): use accurate log message for OTel instrument registrati…
aliaqel-stripe Apr 13, 2026
a786ee7
chore: update copyright year to 2026 on new files
aliaqel-stripe Apr 13, 2026
90c3e85
chore: use "scaler metric collection" instead of "scaler metric fetch"
aliaqel-stripe Apr 13, 2026
0e9bdc5
feat: reduce histogram label cardinality to scaler + status_code
aliaqel-stripe Apr 13, 2026
9c79fce
fix: gofmt and CHANGELOG sort order for static checks
aliaqel-stripe Apr 13, 2026
5c23149
ci: retrigger static checks
aliaqel-stripe Apr 13, 2026
4c3778d
Merge branch 'main' into feat/http-client-metrics
aliaqel-stripe Apr 20, 2026
2164203
fix: address PR review feedback on HTTP client metrics
aliaqel-stripe Apr 22, 2026
aa5a23c
revert: keep CreateHTTPTransport* names, no rename needed
aliaqel-stripe Apr 22, 2026
f131cc1
refactor: rename CreateHTTPTransport* to CreateRT* to reflect RoundTr…
aliaqel-stripe Apr 22, 2026
ba9aada
docs: update RecordHTTPClientRequest comment per Copilot suggestion
aliaqel-stripe Apr 22, 2026
c0efe49
refactor: move HTTP client metrics helpers into metricscollector
aliaqel-stripe Apr 23, 2026
179604c
refactor: colocate HTTP client metrics helpers
aliaqel-stripe Apr 23, 2026
39950af
Fix http roundtripper test bodyclose issues
aliaqel-stripe Apr 23, 2026
502805c
Merge branch 'main' into feat/http-client-metrics
aliaqel-stripe Apr 23, 2026
7b0abbf
Rename roundtripper locals to rt
aliaqel-stripe Apr 23, 2026
111c39e
Rename HTTP client transport local to rt
aliaqel-stripe Apr 23, 2026
17a36c1
Merge branch 'main' into feat/http-client-metrics
aliaqel-stripe Apr 23, 2026
0c742ca
Merge branch 'main' into feat/http-client-metrics
aliaqel-stripe May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

- **General**: Add CRD-level validation markers (Minimum, MinLength, MinItems, Enum) for ScaledObject, ScaledJob, ScaleTriggers, and TriggerAuthentication API types ([#7533](https://github.com/kedacore/keda/pull/7533))
- **General**: Add `--leader-election-id` flag to allow configuring the leader election Lease name ([#7564](https://github.com/kedacore/keda/issues/7564))
- **General**: Add scaler HTTP request metrics (`keda_scaler_http_requests_total`, `keda_scaler_http_request_duration_seconds`) for outbound HTTP requests made during scaler metric collection ([#6600](https://github.com/kedacore/keda/issues/6600))
- **General**: Allow more control of TLS versions & ciphers via `KEDA_HTTP_TLS_CIPHER_LIST`, `KEDA_SERVICE_TLS_CIPHER_LIST` and `KEDA_SERVICE_MIN_TLS_VERSION` env vars ([#7617](https://github.com/kedacore/keda/pull/7617))
- **General**: Make APIService cert injections optional ([#7559](https://github.com/kedacore/keda/pull/7559))
- **AWS Scalers**: Add support for AWS External ID in TriggerAuthentication podIdentity for all AWS scalers (SQS, Kinesis, DynamoDB, CloudWatch, etc.) to enable cross-account access scenarios ([#6921](https://github.com/kedacore/keda/issues/6921))
Expand Down
108 changes: 108 additions & 0 deletions pkg/metricscollector/http_roundtripper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright 2026 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metricscollector

import (
"context"
"net/http"
"time"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

// contextKey is an unexported type for context keys defined in this package,
// preventing collisions with keys from other packages.
type contextKey string

const (
// ScalerContextKey is the context key used to attach the scaler type name
// (e.g. "prometheus", "redis") to an outbound HTTP request so that metrics
// observers can include it as a dimension.
ScalerContextKey contextKey = "scaler"

// TriggerNameContextKey is the context key used to attach the user-defined
// trigger name to an outbound HTTP request.
TriggerNameContextKey contextKey = "trigger_name"

// MetricNameContextKey is the context key used to attach the metric name
// being queried to an outbound HTTP request.
MetricNameContextKey contextKey = "metric_name"

// NamespaceContextKey is the context key used to attach the namespace of the
// ScaledObject/ScaledJob that owns the scaler making the request.
NamespaceContextKey contextKey = "namespace"

// ScaledResourceContextKey is the context key used to attach the name of the
// ScaledObject/ScaledJob that owns the scaler making the request.
ScaledResourceContextKey contextKey = "scaled_resource"
)

// InstrumentedRoundTripper wraps an http.RoundTripper and records outbound
// HTTP request metrics after every completed round-trip. It reads known
// context keys from the request context to populate metric dimensions. It
// does not buffer or inspect the response body.
type InstrumentedRoundTripper struct {
next http.RoundTripper
}

// NewInstrumentedRoundTripper wraps next with a RoundTripper that records
// HTTP request metrics after every request. If next is nil,
// http.DefaultTransport is used.
func NewInstrumentedRoundTripper(next http.RoundTripper) http.RoundTripper {
if next == nil {
next = http.DefaultTransport
}
return &InstrumentedRoundTripper{next: next}
}

func (r *InstrumentedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := r.next.RoundTrip(req)
duration := time.Since(start).Seconds()

ctx := req.Context()
scaler, scalerOK := ctx.Value(ScalerContextKey).(string)
triggerName, triggerOK := ctx.Value(TriggerNameContextKey).(string)
metricName, metricOK := ctx.Value(MetricNameContextKey).(string)
namespace, nsOK := ctx.Value(NamespaceContextKey).(string)
scaledResource, srOK := ctx.Value(ScaledResourceContextKey).(string)

// Only record metrics for scaler metric collection requests, identified by the
// presence of all five context keys injected by BuildScalerRequestCtx.
// Other HTTP calls (e.g. during scaler initialization) are not recorded.
if !scalerOK || !triggerOK || !metricOK || !nsOK || !srOK {
return resp, err
}

if err != nil {
RecordHTTPClientRequest(duration, 0, true, scaler, triggerName, metricName, namespace, scaledResource)
return nil, err
}
RecordHTTPClientRequest(duration, resp.StatusCode, false, scaler, triggerName, metricName, namespace, scaledResource)
return resp, nil
Comment thread
wozniakjan marked this conversation as resolved.
}

// BuildScalerRequestCtx attaches scaler metadata used by HTTP client
// instrumentation to the outbound request context.
func BuildScalerRequestCtx(ctx context.Context, config scalersconfig.ScalerConfig, metricName string) context.Context {
requestCtx := context.WithValue(ctx, ScalerContextKey, config.TriggerType)
requestCtx = context.WithValue(requestCtx, TriggerNameContextKey, config.TriggerName)
requestCtx = context.WithValue(requestCtx, MetricNameContextKey, metricName)
requestCtx = context.WithValue(requestCtx, NamespaceContextKey, config.ScalableObjectNamespace)
requestCtx = context.WithValue(requestCtx, ScaledResourceContextKey, config.ScalableObjectName)
return requestCtx
}
203 changes: 203 additions & 0 deletions pkg/metricscollector/http_roundtripper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
Copyright 2026 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metricscollector

import (
"context"
"fmt"
"io"
"net/http"
"strings"
"testing"

. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

type mockRoundTripper struct {
resp *http.Response
err error
}

func (m *mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if m.resp != nil && m.resp.Request == nil {
m.resp.Request = req
}

return m.resp, m.err
}

func fakeResponse(statusCode int) *http.Response {
return &http.Response{
StatusCode: statusCode,
Body: io.NopCloser(strings.NewReader("")),
}
}

func withPromCollector(t *testing.T) {
t.Helper()

previousCollectors := collectors
collectors = []MetricsCollector{&PromMetrics{}}
t.Cleanup(func() {
collectors = previousCollectors
})
}

func TestInstrumentedRoundTripper_RecordsSuccessfulResponses(t *testing.T) {
testCases := []struct {
name string
statusCode int
}{
{name: "2xx response", statusCode: http.StatusOK},
{name: "4xx response", statusCode: http.StatusBadRequest},
{name: "5xx response", statusCode: http.StatusInternalServerError},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
RegisterTestingT(t)
withPromCollector(t)

scalerName := fmt.Sprintf("prometheus-%d", tc.statusCode)
triggerName := fmt.Sprintf("trigger-%d", tc.statusCode)
metricName := fmt.Sprintf("metric-%d", tc.statusCode)
scaledResource := fmt.Sprintf("so-%d", tc.statusCode)

response := fakeResponse(tc.statusCode)
rt := NewInstrumentedRoundTripper(&mockRoundTripper{resp: response})

ctx := context.Background()
ctx = context.WithValue(ctx, ScalerContextKey, scalerName)
ctx = context.WithValue(ctx, TriggerNameContextKey, triggerName)
ctx = context.WithValue(ctx, MetricNameContextKey, metricName)
ctx = context.WithValue(ctx, NamespaceContextKey, "default")
ctx = context.WithValue(ctx, ScaledResourceContextKey, scaledResource)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://example.com", nil)
Expect(err).To(BeNil())

resp, err := rt.RoundTrip(req)
Expect(err).To(BeNil())
Expect(resp).NotTo(BeNil())
Expect(resp).To(Equal(response))
defer resp.Body.Close()

counterValue, err := httpClientRequestsTotal.
GetMetricWithLabelValues("default", scaledResource, scalerName, triggerName, metricName, fmt.Sprintf("%d", tc.statusCode))
Expect(err).To(BeNil())
Expect(counterValue).NotTo(BeNil())
m := &dto.Metric{}
err = counterValue.Write(m)
Expect(err).To(BeNil())
Expect(m.Counter.GetValue()).To(BeNumerically("==", 1))

durationHistogram, err := httpClientRequestDuration.
GetMetricWithLabelValues(scalerName, fmt.Sprintf("%d", tc.statusCode))
Expect(err).To(BeNil())
Expect(durationHistogram).NotTo(BeNil())
err = durationHistogram.(prometheus.Metric).Write(m)
Expect(err).To(BeNil())
Expect(m.Histogram.GetSampleCount()).To(BeNumerically("==", 1))
})
}
}

func TestInstrumentedRoundTripper_RecordsTransportErrors(t *testing.T) {
RegisterTestingT(t)
withPromCollector(t)

scalerName := "prometheus-transport-error"
triggerName := "trigger-transport-error"
metricName := "metric-transport-error"
scaledResource := "so-transport-error"

transportErr := io.ErrUnexpectedEOF
rt := NewInstrumentedRoundTripper(&mockRoundTripper{err: transportErr})

ctx := context.Background()
ctx = context.WithValue(ctx, ScalerContextKey, scalerName)
ctx = context.WithValue(ctx, TriggerNameContextKey, triggerName)
ctx = context.WithValue(ctx, MetricNameContextKey, metricName)
ctx = context.WithValue(ctx, NamespaceContextKey, "default")
ctx = context.WithValue(ctx, ScaledResourceContextKey, scaledResource)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://example.com", nil)
Expect(err).To(BeNil())

resp, err := rt.RoundTrip(req)
if resp != nil {
defer resp.Body.Close()
}
Expect(err).To(Equal(transportErr))
Expect(resp).To(BeNil())

counterValue, err := httpClientRequestsTotal.
GetMetricWithLabelValues("default", scaledResource, scalerName, triggerName, metricName, "error")
Expect(err).To(BeNil())
Expect(counterValue).NotTo(BeNil())
m := &dto.Metric{}
err = counterValue.Write(m)
Expect(err).To(BeNil())
Expect(m.Counter.GetValue()).To(BeNumerically("==", 1))

durationHistogram, err := httpClientRequestDuration.
GetMetricWithLabelValues(scalerName, "error")
Expect(err).To(BeNil())
Expect(durationHistogram).NotTo(BeNil())
err = durationHistogram.(prometheus.Metric).Write(m)
Expect(err).To(BeNil())
Expect(m.Histogram.GetSampleCount()).To(BeNumerically("==", 1))
}

func TestInstrumentedRoundTripper_ResponseReturnedUnmodified(t *testing.T) {
RegisterTestingT(t)

response := fakeResponse(http.StatusAccepted)
rt := NewInstrumentedRoundTripper(&mockRoundTripper{resp: response})

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://example.com", nil)
Expect(err).To(BeNil())

got, err := rt.RoundTrip(req)
Expect(err).To(BeNil())
Expect(got).To(Equal(response))
defer got.Body.Close()
}

func TestInstrumentedRoundTripper_NilNextUsesDefault(t *testing.T) {
RegisterTestingT(t)

rt := NewInstrumentedRoundTripper(nil)
Expect(fmt.Sprintf("%T", rt)).To(Equal("*metricscollector.InstrumentedRoundTripper"))
}

func TestInstrumentedRoundTripper_ScalerContextKey_Missing(t *testing.T) {
RegisterTestingT(t)
response := fakeResponse(200)
rt := NewInstrumentedRoundTripper(&mockRoundTripper{resp: response})

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://example.com", nil)
Expect(err).To(BeNil())

resp, err := rt.RoundTrip(req)
Expect(err).To(BeNil())
Expect(resp).NotTo(BeNil())
Expect(resp).To(Equal(response))
defer resp.Body.Close()
}
23 changes: 23 additions & 0 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package metricscollector

import (
"strconv"
"time"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
Expand Down Expand Up @@ -79,6 +80,12 @@ type MetricsCollector interface {

// RecordCloudEventQueueStatus record the number of cloudevents that are waiting for emitting
RecordCloudEventQueueStatus(namespace string, value int)

// RecordHTTPClientRequest records the duration and outcome of an outbound HTTP request
// made by one of KEDA's internal HTTP clients. scaler, triggerName, metricName,
// namespace, and scaledResource are provided explicitly by the caller; upstream
// instrumentation may derive them from context before invoking the collector.
RecordHTTPClientRequest(durationSeconds float64, statusCode int, isError bool, scaler, triggerName, metricName, namespace, scaledResource string)
}

func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetrics bool) {
Expand Down Expand Up @@ -205,6 +212,22 @@ func RecordCloudEventQueueStatus(namespace string, value int) {
}
}

// RecordHTTPClientRequest records the duration and outcome of an outbound HTTP request
// made by one of KEDA's internal HTTP clients. Called by InstrumentedRoundTripper in
// the util package after extracting label values from the request context.
func RecordHTTPClientRequest(durationSeconds float64, statusCode int, isError bool, scaler, triggerName, metricName, namespace, scaledResource string) {
for _, element := range collectors {
element.RecordHTTPClientRequest(durationSeconds, statusCode, isError, scaler, triggerName, metricName, namespace, scaledResource)
}
}

func httpStatusCodeLabel(code int, isError bool) string {
if isError {
return "error"
}
return strconv.Itoa(code)
}

// Returns the ServerMetrics object for GRPC Server metrics. Used to initialize the GRPC server with the proper intercepts
// Currently, only Prometheus metrics are supported.
func GetServerMetrics() *grpcprom.ServerMetrics {
Expand Down
Loading
Loading