Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/alb.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,31 @@ The User Router does not rotate through or fanout to a Pool of Backends like the

You can configure a User Router ALB's backend destinations to be other ALBs with mechanisms that utilize healthchecked pools.

## Bounding Per-Member Response Captures

ALB mechanisms that fan out (TSM, FR, FGR, NLM) buffer each pool member's response in memory before merging or selecting a winner. Without a cap, one misbehaving upstream returning an oversized body can OOM the proxy -- an N-way fanout multiplies that by N.

Trickster applies a default cap of **256 MiB** per response. A member whose body exceeds the cap is treated as a partial failure: the merged response carries an `X-Trickster-Result: phit` marker and the `trickster_alb_fanout_failures_total{mechanism, reason="truncated"}` metric increments.

Override the cap at the backend or ALB level:

```yaml
backends:
default:
max_capture_bytes: 67108864 # 64 MiB, applies to all backends (Prometheus, ClickHouse, ALB members, etc.)

prom-alb-tsm:
provider: alb
alb:
mechanism: tsm
max_capture_bytes: 16777216 # 16 MiB, ALB-specific override
pool:
- prom01
- prom02
```

The ALB-level value takes precedence over the backend-level value, which in turn takes precedence over the 256 MiB default.

## Maintaining Healthy Pools With Automated Health Check Integrations

Health Checks are configured per-Backend as described in the [Health documentation](./health.md). Each Backend's health checker will notify all ALB pools of which it is a member when its health status changes, so long as it has been configured with a [health check interval](./health#example+health+check+configuration+for+use+in+alb) for automated checking. When an ALB is notified that the state of a pool member has changed, the ALB will reconstruct its list of healthy pool members before serving the next request.
Expand Down
10 changes: 10 additions & 0 deletions examples/conf/example.full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,11 @@ backends:
# # max_object_size_bytes defines the largest byte size an object may be before it is uncacheable due to size. default is 524288 (512k)
# max_object_size_bytes: 524288

# # max_capture_bytes caps the per-response in-memory capture buffer used by ALB fanout and Prometheus
# # transform/label handlers. A member whose body exceeds the cap is treated as a partial failure
# # (X-Trickster-Result: phit) rather than truncating the merged response silently. default is 268435456 (256 MiB)
# max_capture_bytes: 268435456

# # These next 7 settings only apply to Time Series backends

# # backfill_tolerance prevents new datapoints that fall within the tolerance window (relative to time.Now) from being permanently
Expand Down Expand Up @@ -557,6 +562,11 @@ backends:
# # -1 includes all backends, regardless of reporting state
# # default is 0
# healthy_floor: 0

# # max_capture_bytes overrides the backend-level max_capture_bytes for this ALB's fanout members.
# # Set this when the ALB's expected response shape differs from the backend default. When 0 (the
# # default), the parent Backend's max_capture_bytes is used, falling back to 268435456 (256 MiB).
# max_capture_bytes: 16777216 # 16 MiB
# fgr: # First Good Response mechanism options, only applicable when mechanism is set to fgr
# # status_codes is a list of status codes considered 'good' when using the fgr mechanism
# # when this is not set, any response code < 400 is considered good. Use this setting to
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.uber.org/goleak v1.3.0
golang.org/x/crypto v0.50.0
golang.org/x/net v0.53.0
golang.org/x/sync v0.20.0
Expand Down Expand Up @@ -148,6 +149,7 @@ require (
github.com/kkHAIKE/contextcheck v1.1.6 // indirect
github.com/kulti/thelper v0.7.1 // indirect
github.com/kunwardeep/paralleltest v1.0.15 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lasiar/canonicalheader v1.1.2 // indirect
github.com/ldez/exptostd v0.4.5 // indirect
github.com/ldez/gomoddirectives v0.8.0 // indirect
Expand Down
67 changes: 67 additions & 0 deletions integration/alb_tsm_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ func TestALB_TSM_Scale(t *testing.T) {
t.Logf("%d bytes, %s", len(body), hdr.Get("X-Trickster-Result"))
})

t.Run("bad_encoding_advertised_as_gzip", func(t *testing.T) {
// Upstream claims gzip but sends plaintext; DecompressResponseBody
// fails. The merge surfaces the partial-failure marker but the
// remaining 49 backends still produce a successful merged response.
resetAll()
fakes[0].setBehavior(behaviorBadEncoding())
pr, hdr := queryTricksterProm(t, listenAddr, backendName, "/api/v1/query_range", rangeParams())
require.Equal(t, "success", pr.Status)
var qd promQueryData
require.NoError(t, json.Unmarshal(pr.Data, &qd))
var series []json.RawMessage
require.NoError(t, json.Unmarshal(qd.Result, &series))
require.NotEmpty(t, series)
t.Logf("%d series, %s", len(series), hdr.Get("X-Trickster-Result"))
})

t.Run("truncating_upstream_does_not_poison_cache", func(t *testing.T) {
resetAll()
params := rangeParams()
Expand Down Expand Up @@ -367,6 +383,49 @@ func TestALB_TSM_Scale(t *testing.T) {
}
})

t.Run("post_concurrent_clients_safe", func(t *testing.T) {
// POST query_range fans out N clones of the same parent request. The
// body must be re-readable per clone or the upstreams see truncated
// requests. Race the request body cache under -race to catch any
// unsynchronized r.Body mutation in the clone path.
resetAll()
form := rangeParams().Encode()
u := "http://" + listenAddr + "/" + backendName + "/api/v1/query_range"
const clients = 25
var wg sync.WaitGroup
errCh := make(chan error, clients)
for range clients {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := http.Post(u, "application/x-www-form-urlencoded", strings.NewReader(form))
if err != nil {
errCh <- err
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
errCh <- fmt.Errorf("status %d", resp.StatusCode)
return
}
body, _ := io.ReadAll(resp.Body)
var pr promResponse
if err := json.Unmarshal(body, &pr); err != nil {
errCh <- fmt.Errorf("invalid json: %w", err)
return
}
if pr.Status != "success" {
errCh <- fmt.Errorf("non-success: %s", pr.Status)
}
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
t.Fatal(err)
}
})

t.Run("error_body_passthrough", func(t *testing.T) {
resetAll()
for _, f := range fakes {
Expand Down Expand Up @@ -528,6 +587,7 @@ func behaviorBadShape() *promBehavior { return &promBehavior{mode:
func behaviorTruncate() *promBehavior { return &promBehavior{mode: "truncate"} }
func behaviorSlow(d time.Duration) *promBehavior { return &promBehavior{mode: "ok", delay: d} }
func behaviorErrJSON() *promBehavior { return &promBehavior{mode: "errjson"} }
func behaviorBadEncoding() *promBehavior { return &promBehavior{mode: "badencoding"} }
func behaviorLabelValuesKB(kb int) *promBehavior {
return &promBehavior{mode: "labelvalues", seriesKB: kb}
}
Expand Down Expand Up @@ -587,6 +647,13 @@ func (f *fakeProm) handleRange(w http.ResponseWriter, _ *http.Request) {
_ = conn.Close()
}
return
case "badencoding":
// Advertise gzip but send plaintext. The TSM gather path's
// DecompressResponseBody will fail, surfacing as a partial-failure.
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Encoding", "gzip")
_, _ = w.Write(buildMatrixBody(f.label))
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(buildMatrixBody(f.label))
Expand Down
3 changes: 3 additions & 0 deletions pkg/backends/alb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func NewClient(name string, o *bo.Options, router http.Handler,
}
c.Backend = b
if o != nil && o.ALBOptions != nil {
if o.ALBOptions.MaxCaptureBytes == 0 {
o.ALBOptions.MaxCaptureBytes = o.MaxCaptureBytes
}
m, err := registry.New(o.ALBOptions.MechanismName,
o.ALBOptions, factories)
if err != nil {
Expand Down
211 changes: 211 additions & 0 deletions pkg/backends/alb/mech/fanout/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Copyright 2018 The Trickster Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package fanout is the shared primitive for ALB mechanisms that scatter a
// single inbound request to N pool members and gather their responses. It
// centralizes the bug-prone parts of fanout that each mechanism otherwise
// re-implements: body priming, context propagation, capture-buffer bounding,
// panic recovery, concurrency limiting, and metric attribution.
//
// Mechanisms that wait for all members and decide afterwards (NLM, TSM) call
// All. Mechanisms that race for a first-good response (FR) own their own
// goroutine orchestration but use PrepareClone for the per-goroutine setup.
package fanout

import (
"context"
"net/http"

"github.com/trickstercache/trickster/v2/pkg/backends/alb/mech"
"github.com/trickstercache/trickster/v2/pkg/backends/alb/pool"
"github.com/trickstercache/trickster/v2/pkg/observability/logging"
"github.com/trickstercache/trickster/v2/pkg/observability/logging/logger"
"github.com/trickstercache/trickster/v2/pkg/observability/metrics"
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
"golang.org/x/sync/errgroup"
)

// Result holds one pool member's outcome from a fanout call. Results are
// returned slot-indexed: Result[i] corresponds to targets[i] from the
// original Targets slice. Slots whose target was nil have Failed == true.
type Result struct {
// Index is the slot position in the original Targets slice.
Index int
// Request is the cloned request handed to the member's handler.
// Mechanism code can read its Resources (e.g. TSM's MergeFunc /
// MergeRespondFunc / TS) in post-gather inspection or via OnResult.
Request *http.Request
// Capture is the bounded response writer that captured the member's
// reply. Nil if the slot failed before serving.
Capture *capture.CaptureResponseWriter
// Failed is true when the slot did not produce a usable response.
// Reasons: clone error, panic in the member's handler, or capture
// truncation (the upstream exceeded MaxCaptureBytes). Mechanism code
// uses this to surface partial-failure signals.
Failed bool
// Err carries a clone or transport error, if any. A recovered panic
// is reflected only in Failed; the panic value is logged + metered
// inside the fanout goroutine.
Err error
}

// Config configures one fanout call.
type Config struct {
// Mechanism is the short name ("fr", "nlm", "tsm", "tsm/avg-sum", ...)
// used in panic recovery logs and the fanout_failures_total metric.
Mechanism string
Comment thread
crandles marked this conversation as resolved.
Outdated
// ConcurrencyLimit caps in-flight member calls. 0 means unlimited.
ConcurrencyLimit int
// MaxCaptureBytes caps each member's response body capture. 0 uses
// capture.DefaultMaxBytes.
MaxCaptureBytes int
// Resources, if non-nil, returns the Resources to attach to each
// cloned request before the member's handler sees it. Nil resources
// is a valid return value.
Resources func(idx int) *request.Resources
// Context, if non-nil, transforms the parent context.Context before
// each clone receives it. NLM uses this to call tctx.ClearResources;
// most mechanisms can leave it nil.
Context func(parent context.Context) context.Context
// OnResult, if non-nil, is called inside the fanout goroutine after
// the member's handler returns and before the goroutine exits. Use
// this for per-slot side effects that should run in parallel with
// other in-flight members (e.g. TSM merges into a shared accumulator).
// OnResult must be safe for concurrent invocation. The supplied
// Result is the same one that will appear in the All return slice.
OnResult func(idx int, r *Result)
}

// All scatters parent to every target and gathers slot-ordered Results.
//
// The caller MUST have primed parent's body (via PrimeBody or equivalent)
// before invoking All; otherwise concurrent CloneWithoutResources calls
// will race on r.Body / rsc.RequestBody for POST/PUT/PATCH requests.
//
// Each clone is given a per-slot Resources value (if Config.Resources is
// set), a derived context (Config.Context if set; ctx otherwise), and a
// capture.CaptureResponseWriter bounded by Config.MaxCaptureBytes.
//
// A panic in any member's handler is recovered, logged, and counted via
// the trickster_alb_fanout_failures_total metric; the slot's Failed field
// is set so the caller can surface partial-failure to its response.
//
// All returns when every spawned goroutine has finished. The returned
// slice has len(targets) entries; results[i].Index == i. The error is the
// first non-nil error from any goroutine (typically a clone failure); per-
// slot errors are also recorded in results[i].Err. The primitive logs +
// meters every failure regardless; callers can use the returned error to
// propagate through their own errgroup, render a fatal response, etc.
func All(ctx context.Context, parent *http.Request, targets pool.Targets, cfg Config) ([]Result, error) {
l := len(targets)
results := make([]Result, l)
if l == 0 {
return results, nil
}

var eg errgroup.Group
if cfg.ConcurrencyLimit > 0 {
eg.SetLimit(cfg.ConcurrencyLimit)
}

for i := range l {
if targets[i] == nil {
results[i] = Result{Index: i, Failed: true}
continue
}
eg.Go(func() error {
results[i].Index = i
defer mech.RecoverFanoutPanic(cfg.Mechanism, i, func() {
results[i].Failed = true
results[i].Capture = nil
})

r2, crw, err := PrepareClone(ctx, parent, i, cfg)
if err != nil {
results[i].Failed = true
results[i].Err = err
metrics.ALBFanoutFailures.WithLabelValues(cfg.Mechanism, "clone").Inc()
return err
}
results[i].Request = r2
results[i].Capture = crw

targets[i].Handler().ServeHTTP(crw, r2)
if crw.Truncated() {
results[i].Failed = true
metrics.ALBFanoutFailures.WithLabelValues(cfg.Mechanism, "truncated").Inc()
}
if cfg.OnResult != nil {
cfg.OnResult(i, &results[i])
}
return nil
})
}

err := eg.Wait()
if err != nil {
logger.Warn("alb fanout gather failure", logging.Pairs{
"mech": cfg.Mechanism, "error": err,
})
}
return results, err
}

// PrimeBody ensures parent has a Resources value and a cached body so
// fanout goroutines can clone it concurrently without racing on r.Body.
// Returns the (possibly new) request; callers must use the returned value
// as their parent for the subsequent fanout call. No-op for GET/HEAD.
func PrimeBody(parent *http.Request) (*http.Request, error) {
if request.GetResources(parent) == nil {
parent = request.SetResources(parent, &request.Resources{})
}
if _, err := request.GetBody(parent); err != nil {
return parent, err
}
return parent, nil
}

// PrepareClone produces one safe, capture-wrapped clone of parent suitable
// for handing to a pool member's handler. Mechanisms that own their own
// goroutine orchestration (FR) call this to avoid re-implementing the
// clone + ctx + resources + bounded capture sequence.
//
// PrepareClone does NOT prime parent's body; callers using their own
// goroutine pool must call request.GetBody on parent before spawning, or
// the per-clone GetBody calls will race on r.Body / rsc.RequestBody.
func PrepareClone(ctx context.Context, parent *http.Request, idx int, cfg Config) (*http.Request, *capture.CaptureResponseWriter, error) {
r2, err := request.CloneWithoutResources(parent)
if err != nil {
return nil, nil, err
}
cloneCtx := ctx
if cfg.Context != nil {
cloneCtx = cfg.Context(ctx)
}
r2 = r2.WithContext(cloneCtx)
if cfg.Resources != nil {
if rsc := cfg.Resources(idx); rsc != nil {
r2 = request.SetResources(r2, rsc)
}
}
maxBytes := cfg.MaxCaptureBytes
if maxBytes == 0 {
maxBytes = capture.DefaultMaxBytes
}
crw := capture.NewCaptureResponseWriterWithLimit(maxBytes)
return r2, crw, nil
}
Loading
Loading