Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/resin/app_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ func (a *resinApp) buildNetworkServers(engine *state.StateEngine) error {
ProxyToken: a.envCfg.ProxyToken,
Router: a.topoRuntime.router,
Pool: a.topoRuntime.pool,
PlatformLookup: a.topoRuntime.pool,
Health: a.topoRuntime.pool,
Events: proxyEvents,
MetricsSink: a.metricsManager,
Expand Down
9 changes: 7 additions & 2 deletions internal/api/handler_requestlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/Resinat/Resin/internal/proxy"
"github.com/Resinat/Resin/internal/requestlog"
)

Expand Down Expand Up @@ -309,8 +310,10 @@ type logListItem struct {
RespBodyLen int `json:"resp_body_len"`
ReqHeadersTruncated bool `json:"req_headers_truncated"`
ReqBodyTruncated bool `json:"req_body_truncated"`
RespHeadersTruncated bool `json:"resp_headers_truncated"`
RespBodyTruncated bool `json:"resp_body_truncated"`
RespHeadersTruncated bool `json:"resp_headers_truncated"`
RespBodyTruncated bool `json:"resp_body_truncated"`
RetryAttempts int `json:"retry_attempts"`
RetryDetails []proxy.RetryDetail `json:"retry_details"`
}

func toLogListItem(s requestlog.LogSummary) logListItem {
Expand Down Expand Up @@ -347,6 +350,8 @@ func toLogListItem(s requestlog.LogSummary) logListItem {
ReqBodyTruncated: s.ReqBodyTruncated,
RespHeadersTruncated: s.RespHeadersTruncated,
RespBodyTruncated: s.RespBodyTruncated,
RetryAttempts: s.RetryAttempts,
RetryDetails: s.RetryDetails,
}
}

Expand Down
5 changes: 5 additions & 0 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EnvConfig struct {
DefaultPlatformReverseProxyEmptyAccountBehavior string
DefaultPlatformReverseProxyFixedAccountHeader string
DefaultPlatformAllocationPolicy string
DefaultPlatformMaxRetries int
ProbeTimeout time.Duration
ResourceFetchTimeout time.Duration
ProxyTransportMaxIdleConns int
Expand Down Expand Up @@ -107,6 +108,7 @@ func LoadEnvConfig() (*EnvConfig, error) {
"RESIN_DEFAULT_PLATFORM_ALLOCATION_POLICY",
string(platform.AllocationPolicyBalanced),
)
cfg.DefaultPlatformMaxRetries = envInt("RESIN_DEFAULT_PLATFORM_MAX_RETRIES", 0, &errs)
cfg.ProbeTimeout = envDuration("RESIN_PROBE_TIMEOUT", 15*time.Second, &errs)
cfg.ResourceFetchTimeout = envDuration("RESIN_RESOURCE_FETCH_TIMEOUT", 30*time.Second, &errs)
cfg.ProxyTransportMaxIdleConns = envInt("RESIN_PROXY_TRANSPORT_MAX_IDLE_CONNS", 1024, &errs)
Expand Down Expand Up @@ -225,6 +227,9 @@ func LoadEnvConfig() (*EnvConfig, error) {
platform.AllocationPolicyPreferIdleIP,
))
}
if cfg.DefaultPlatformMaxRetries < 0 {
errs = append(errs, "RESIN_DEFAULT_PLATFORM_MAX_RETRIES must be >= 0")
}
if cfg.ProbeTimeout <= 0 {
errs = append(errs, "RESIN_PROBE_TIMEOUT must be positive")
}
Expand Down
1 change: 1 addition & 0 deletions internal/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Platform struct {
ReverseProxyEmptyAccountBehavior string `json:"reverse_proxy_empty_account_behavior"`
ReverseProxyFixedAccountHeader string `json:"reverse_proxy_fixed_account_header"`
AllocationPolicy string `json:"allocation_policy"`
MaxRetries int `json:"max_retries"`
UpdatedAtNs int64 `json:"updated_at_ns"`
}

Expand Down
3 changes: 3 additions & 0 deletions internal/platform/model_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewConfiguredPlatform(
emptyAccountBehavior string,
fixedAccountHeader string,
allocationPolicy string,
maxRetries int,
) *Platform {
normalizedFixedHeaders, fixedHeaders, err := NormalizeFixedAccountHeaders(fixedAccountHeader)
if err != nil {
Expand All @@ -61,6 +62,7 @@ func NewConfiguredPlatform(
plat.ReverseProxyFixedAccountHeader = normalizedFixedHeaders
plat.ReverseProxyFixedAccountHeaders = append([]string(nil), fixedHeaders...)
plat.AllocationPolicy = ParseAllocationPolicy(allocationPolicy)
plat.MaxRetries = maxRetries
return plat
}

Expand Down Expand Up @@ -116,5 +118,6 @@ func BuildFromModel(mp model.Platform) (*Platform, error) {
emptyAccountBehavior,
fixedHeader,
mp.AllocationPolicy,
mp.MaxRetries,
), nil
}
1 change: 1 addition & 0 deletions internal/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Platform struct {
ReverseProxyFixedAccountHeader string
ReverseProxyFixedAccountHeaders []string
AllocationPolicy AllocationPolicy
MaxRetries int

// Routable view & its lock.
// viewMu serializes both FullRebuild and NotifyDirty.
Expand Down
12 changes: 11 additions & 1 deletion internal/proxy/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ type RequestFinishedEvent struct {
DurationNs int64
}

// RetryDetail records the outcome of a single failed retry attempt.
//
// The forward proxy handlers add one entry each time an upstream attempt
// fails with a retryable error while retries remain, just before moving on
// to the next attempt. Entries are serialized to JSON with the field tags below.
type RetryDetail struct {
// NodeHash is the hex form of the node hash of the route used by the failed attempt.
NodeHash string `json:"node_hash"`
// NodeTag is the tag of the node on the route used by the failed attempt.
NodeTag string `json:"node_tag"`
// ErrKind is the Kind reported by the summarized upstream error for the attempt.
ErrKind string `json:"err_kind"`
// ErrMsg is the Message reported by the summarized upstream error for the attempt.
ErrMsg string `json:"err_msg"`
}

// RequestLogEntry captures per-request details for the structured request log.
// Used by the requestlog subsystem (Phase 8).
type RequestLogEntry struct {
Expand All @@ -63,7 +71,9 @@ type RequestLogEntry struct {
UpstreamErrKind string // normalized error family
UpstreamErrno string // normalized errno, when available
UpstreamErrMsg string // sanitized upstream error message
IngressBytes int64 // bytes from upstream to client (header + body)
RetryAttempts int // number of upstream retry attempts (0 = no retry)
RetryDetails []RetryDetail // per-attempt failure details (len == RetryAttempts)
IngressBytes int64 // bytes from upstream to client (header + body)
EgressBytes int64 // bytes from client to upstream (header + body)

// Optional detail payload (mainly for reverse proxy request logging).
Expand Down
215 changes: 135 additions & 80 deletions internal/proxy/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type ForwardProxyConfig struct {
ProxyToken string
Router *routing.Router
Pool outbound.PoolAccessor
PlatformLookup PlatformLookup
Health HealthRecorder
Events EventEmitter
MetricsSink MetricsEventSink
Expand All @@ -37,6 +38,7 @@ type ForwardProxy struct {
token string
router *routing.Router
pool outbound.PoolAccessor
platLook PlatformLookup
health HealthRecorder
events EventEmitter
metricsSink MetricsEventSink
Expand All @@ -60,6 +62,7 @@ func NewForwardProxy(cfg ForwardProxyConfig) *ForwardProxy {
token: cfg.ProxyToken,
router: cfg.Router,
pool: cfg.Pool,
platLook: cfg.PlatformLookup,
health: cfg.Health,
events: ev,
metricsSink: cfg.MetricsSink,
Expand Down Expand Up @@ -240,71 +243,105 @@ func (p *ForwardProxy) handleHTTP(w http.ResponseWriter, r *http.Request) {

lifecycle := newRequestLifecycle(p.events, r, ProxyTypeForward, false)
lifecycle.setTarget(r.Host, r.URL.String())
defer lifecycle.finish()
lifecycle.setAccount(account)

routed, routeErr := resolveRoutedOutbound(p.router, p.pool, platName, account, r.Host)
if routeErr != nil {
lifecycle.setProxyError(routeErr)
lifecycle.setHTTPStatus(routeErr.HTTPCode)
writeProxyError(w, routeErr)
return
}
lifecycle.setRouteResult(routed.Route)
go p.health.RecordLatency(routed.Route.NodeHash, netutil.ExtractDomain(r.Host), nil)
maxRetries := platformMaxRetries(p.platLook, platName)
var finalAttempt int
defer func() {
lifecycle.setRetryAttempts(finalAttempt)
lifecycle.finish()
}()

transport := p.outboundHTTPTransport(routed)
outReq := prepareForwardOutboundRequest(r)
lifecycle.addEgressBytes(headerWireLen(outReq.Header))
var egressBodyCounter *countingReadCloser
if outReq.Body != nil && outReq.Body != http.NoBody {
egressBodyCounter = newCountingReadCloser(outReq.Body)
outReq.Body = egressBodyCounter
// Buffer request body upfront so it can be replayed on retry.
var bodyBytes []byte
if maxRetries > 0 && r.Body != nil && r.Body != http.NoBody {
var err error
bodyBytes, err = io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
lifecycle.setProxyError(ErrInternalError)
lifecycle.setHTTPStatus(ErrInternalError.HTTPCode)
writeProxyError(w, ErrInternalError)
return
}
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
r.ContentLength = int64(len(bodyBytes))
}

// Forward the request.
resp, err := transport.RoundTrip(outReq)
if egressBodyCounter != nil {
lifecycle.addEgressBytes(egressBodyCounter.Total())
}
if err != nil {
proxyErr := classifyUpstreamError(err)
if proxyErr == nil {
// context.Canceled — skip health recording, close silently.
// Request ended due to client-side cancellation before upstream
// response; treat as net-ok in request log semantics.
lifecycle.setNetOK(true)
for attempt := 0; ; attempt++ {
finalAttempt = attempt
routed, routeErr := resolveRoutedOutbound(p.router, p.pool, platName, account, r.Host)
if routeErr != nil {
lifecycle.setProxyError(routeErr)
lifecycle.setHTTPStatus(routeErr.HTTPCode)
writeProxyError(w, routeErr)
return
}
lifecycle.setProxyError(proxyErr)
lifecycle.setUpstreamError("forward_roundtrip", err)
lifecycle.setHTTPStatus(proxyErr.HTTPCode)
go p.health.RecordResult(routed.Route.NodeHash, false)
writeProxyError(w, proxyErr)
return
}
defer resp.Body.Close()

lifecycle.setHTTPStatus(resp.StatusCode)
lifecycle.setNetOK(true)

// Copy end-to-end response headers and body.
lifecycle.addIngressBytes(copyEndToEndHeaders(w.Header(), resp.Header))
w.WriteHeader(resp.StatusCode)
copiedBytes, copyErr := io.Copy(w, resp.Body)
lifecycle.addIngressBytes(copiedBytes)
if copyErr != nil {
if shouldRecordForwardCopyFailure(r, copyErr) {
lifecycle.setProxyError(ErrUpstreamRequestFailed)
lifecycle.setUpstreamError("forward_upstream_to_client_copy", copyErr)
lifecycle.setNetOK(false)
lifecycle.setRouteResult(routed.Route)
go p.health.RecordLatency(routed.Route.NodeHash, netutil.ExtractDomain(r.Host), nil)

transport := p.outboundHTTPTransport(routed)
outReq := prepareForwardOutboundRequest(r)
if attempt > 0 && bodyBytes != nil {
outReq.Body = io.NopCloser(bytes.NewReader(bodyBytes))
outReq.ContentLength = int64(len(bodyBytes))
}
lifecycle.addEgressBytes(headerWireLen(outReq.Header))
var egressBodyCounter *countingReadCloser
if outReq.Body != nil && outReq.Body != http.NoBody {
egressBodyCounter = newCountingReadCloser(outReq.Body)
outReq.Body = egressBodyCounter
}

resp, err := transport.RoundTrip(outReq)
if egressBodyCounter != nil {
lifecycle.addEgressBytes(egressBodyCounter.Total())
}
if err != nil {
proxyErr := classifyUpstreamError(err)
if proxyErr == nil {
lifecycle.setNetOK(true)
return
}
go p.health.RecordResult(routed.Route.NodeHash, false)

if attempt < maxRetries && isRetryableUpstreamError(proxyErr) {
detail := summarizeUpstreamError(err)
lifecycle.addRetryDetail(routed.Route.NodeHash.Hex(), routed.Route.NodeTag, detail.Kind, detail.Message)
if bodyBytes != nil {
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
continue
}

lifecycle.setProxyError(proxyErr)
lifecycle.setUpstreamError("forward_roundtrip", err)
lifecycle.setHTTPStatus(proxyErr.HTTPCode)
writeProxyError(w, proxyErr)
return
}
defer resp.Body.Close()

lifecycle.setHTTPStatus(resp.StatusCode)
lifecycle.setNetOK(true)

lifecycle.addIngressBytes(copyEndToEndHeaders(w.Header(), resp.Header))
w.WriteHeader(resp.StatusCode)
copiedBytes, copyErr := io.Copy(w, resp.Body)
lifecycle.addIngressBytes(copiedBytes)
if copyErr != nil {
if shouldRecordForwardCopyFailure(r, copyErr) {
lifecycle.setProxyError(ErrUpstreamRequestFailed)
lifecycle.setUpstreamError("forward_upstream_to_client_copy", copyErr)
lifecycle.setNetOK(false)
go p.health.RecordResult(routed.Route.NodeHash, false)
}
return
}

go p.health.RecordResult(routed.Route.NodeHash, true)
return
}

// Full body transfer succeeded — count as network success even for 5xx HTTP.
go p.health.RecordResult(routed.Route.NodeHash, true)
}

func (p *ForwardProxy) handleCONNECT(w http.ResponseWriter, r *http.Request) {
Expand All @@ -317,39 +354,57 @@ func (p *ForwardProxy) handleCONNECT(w http.ResponseWriter, r *http.Request) {

lifecycle := newRequestLifecycle(p.events, r, ProxyTypeForward, true)
lifecycle.setTarget(target, "")
defer lifecycle.finish()
lifecycle.setAccount(account)

routed, routeErr := resolveRoutedOutbound(p.router, p.pool, platName, account, target)
if routeErr != nil {
lifecycle.setProxyError(routeErr)
lifecycle.setHTTPStatus(routeErr.HTTPCode)
writeProxyError(w, routeErr)
return
}
lifecycle.setRouteResult(routed.Route)

// Wrap the dialed connection with tlsLatencyConn for passive TLS latency.
maxRetries := platformMaxRetries(p.platLook, platName)
domain := netutil.ExtractDomain(target)
nodeHashRaw := routed.Route.NodeHash
go p.health.RecordLatency(nodeHashRaw, domain, nil)
var finalAttempt int
defer func() {
lifecycle.setRetryAttempts(finalAttempt)
lifecycle.finish()
}()

rawConn, err := routed.Outbound.DialContext(r.Context(), "tcp", M.ParseSocksaddr(target))
if err != nil {
proxyErr := classifyConnectError(err)
if proxyErr == nil {
// context.Canceled before CONNECT response — no health penalty,
// but mark log as net-ok.
lifecycle.setNetOK(true)
var routed routedOutbound
var rawConn net.Conn
for attempt := 0; ; attempt++ {
finalAttempt = attempt
var routeErr *ProxyError
routed, routeErr = resolveRoutedOutbound(p.router, p.pool, platName, account, target)
if routeErr != nil {
lifecycle.setProxyError(routeErr)
lifecycle.setHTTPStatus(routeErr.HTTPCode)
writeProxyError(w, routeErr)
return
}
lifecycle.setProxyError(proxyErr)
lifecycle.setUpstreamError("connect_dial", err)
lifecycle.setHTTPStatus(proxyErr.HTTPCode)
go p.health.RecordResult(nodeHashRaw, false)
writeProxyError(w, proxyErr)
return
lifecycle.setRouteResult(routed.Route)
go p.health.RecordLatency(routed.Route.NodeHash, domain, nil)

var err error
rawConn, err = routed.Outbound.DialContext(r.Context(), "tcp", M.ParseSocksaddr(target))
if err != nil {
proxyErr := classifyConnectError(err)
if proxyErr == nil {
lifecycle.setNetOK(true)
return
}
go p.health.RecordResult(routed.Route.NodeHash, false)

if attempt < maxRetries && isRetryableUpstreamError(proxyErr) {
detail := summarizeUpstreamError(err)
lifecycle.addRetryDetail(routed.Route.NodeHash.Hex(), routed.Route.NodeTag, detail.Kind, detail.Message)
continue
}

lifecycle.setProxyError(proxyErr)
lifecycle.setUpstreamError("connect_dial", err)
lifecycle.setHTTPStatus(proxyErr.HTTPCode)
writeProxyError(w, proxyErr)
return
}
break
}

nodeHashRaw := routed.Route.NodeHash
recordConnectResult := func(ok bool) {
lifecycle.setNetOK(ok)
go p.health.RecordResult(nodeHashRaw, ok)
Expand Down
Loading