Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions framework/configstore/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6944,3 +6944,170 @@ func (s *RDBConfigStore) RotateOAuth2RefreshToken(ctx context.Context, oldID str
return nil
})
}

// GetOAuth2RefreshTokenByHashAny returns a refresh token row including revoked
// ones. Used for stolen-token detection: if a revoked token is presented we can
// identify its family and revoke all descendants (RFC 9700 §2.2.2).
func (s *RDBConfigStore) GetOAuth2RefreshTokenByHashAny(ctx context.Context, hash string) (*tables.TableOAuth2RefreshToken, error) {
var rt tables.TableOAuth2RefreshToken
err := s.DB().WithContext(ctx).Where("token_hash = ?", hash).First(&rt).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get oauth2 refresh token (any): %w", err)
}
return &rt, nil
}

// RevokeOAuth2RefreshTokensByFamilyID revokes all active refresh tokens sharing
// the same family ID. Called when a revoked token is re-presented, indicating
// the token family has been compromised (RFC 9700 §2.2.2).
func (s *RDBConfigStore) RevokeOAuth2RefreshTokensByFamilyID(ctx context.Context, familyID string) error {
now := time.Now()
return s.DB().WithContext(ctx).
Model(&tables.TableOAuth2RefreshToken{}).
Where("family_id = ? AND revoked_at IS NULL", familyID).
Update("revoked_at", &now).Error
}

// RevokeOAuth2RefreshTokensByMode revokes all active refresh tokens for a given
// bf_mode. Used to invalidate session-mode grants when EnforceAuthOnInference
// is toggled on, or to bulk-revoke all grants of a specific type.
func (s *RDBConfigStore) RevokeOAuth2RefreshTokensByMode(ctx context.Context, bfMode string) error {
now := time.Now()
return s.DB().WithContext(ctx).
Model(&tables.TableOAuth2RefreshToken{}).
Where("bf_mode = ? AND revoked_at IS NULL", bfMode).
Update("revoked_at", &now).Error
}

// SweepOAuth2RefreshTokens deletes revoked refresh tokens older than the given
// duration. Active tokens are never swept — only revoked ones that are past
// their retention window.
func (s *RDBConfigStore) SweepOAuth2RefreshTokens(ctx context.Context, revokedOlderThan time.Duration) (int64, error) {
// A non-positive retention would put the cutoff at (or after) now, deleting
// nearly every revoked token — including the rows kept for stolen-token replay
// detection. Treat it as "don't sweep" rather than wiping the retention window.
if revokedOlderThan <= 0 {
return 0, nil
}
cutoff := time.Now().Add(-revokedOlderThan)
result := s.DB().WithContext(ctx).
Where("revoked_at IS NOT NULL AND revoked_at < ?", cutoff).
Delete(&tables.TableOAuth2RefreshToken{})
return result.RowsAffected, result.Error
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// SweepOrphanedOAuth2Clients deletes dynamically-registered clients that no
// longer back any refresh token and were registered before the grace cutoff.
//
// Clients are minted per OAuth flow via Dynamic Client Registration, so they
// accumulate unbounded without this. A client is safe to drop once it owns no
// refresh token rows at all: either it never completed a flow (abandoned
// registration) or every grant it issued has since been revoked and aged out.
// This must run after the revoked-token sweep so a client whose tokens are
// still within their retention window — and thus still needed for refresh-token
// reuse detection — keeps its rows and is not collected prematurely.
//
// The grace cutoff protects a client mid-handshake (authorization code issued
// but not yet exchanged for tokens), which legitimately owns no tokens yet;
// registeredOlderThan must exceed the authorization code TTL.
func (s *RDBConfigStore) SweepOrphanedOAuth2Clients(ctx context.Context, registeredOlderThan time.Duration) (int64, error) {
cutoff := time.Now().Add(-registeredOlderThan)
result := s.DB().WithContext(ctx).
Where("created_at < ? AND NOT EXISTS (?)", cutoff,
s.DB().Model(&tables.TableOAuth2RefreshToken{}).
Select("1").
Where("oauth2_refresh_tokens.client_id = oauth2_clients.client_id")).
Delete(&tables.TableOAuth2Client{})
return result.RowsAffected, result.Error
}

// OAuth2SessionRow is the wire shape for a single downstream grant in the
// Connected Clients list.
type OAuth2SessionRow struct {
ID string `json:"id"`
ClientID string `json:"client_id"`
ClientName string `json:"client_name,omitempty"`
BfMode string `json:"bf_mode"`
BfSub string `json:"bf_sub"`
BfSubDisplay string `json:"bf_sub_display,omitempty"` // human-readable: VK name for vk mode
Scope string `json:"scope"`
CreatedAt time.Time `json:"created_at"`
LastUsedAt *time.Time `json:"last_used_at,omitempty"`
}

// ListOAuth2Sessions returns active (non-revoked) refresh token rows, joined
// with their client names from oauth2_clients and VK names from
// governance_virtual_keys for vk-mode grants. Uses ScopedDB so callers can
// inject row-visibility predicates via the context.
func (s *RDBConfigStore) ListOAuth2Sessions(ctx context.Context) ([]OAuth2SessionRow, error) {
rows := []struct {
tables.TableOAuth2RefreshToken
ClientName string `gorm:"column:client_name"`
VKName string `gorm:"column:vk_name"`
}{}
err := s.ScopedDB(ctx).
Table("oauth2_refresh_tokens rt").
Select("rt.*, c.client_name, vk.name as vk_name").
Joins("LEFT JOIN oauth2_clients c ON c.client_id = rt.client_id").
Joins("LEFT JOIN governance_virtual_keys vk ON vk.id = rt.bf_sub AND rt.bf_mode = 'vk'").
Where("rt.revoked_at IS NULL").
Order("rt.created_at DESC").
Scan(&rows).Error
if err != nil {
return nil, fmt.Errorf("list oauth2 sessions: %w", err)
}
out := make([]OAuth2SessionRow, 0, len(rows))
for _, r := range rows {
row := OAuth2SessionRow{
ID: r.ID,
ClientID: r.ClientID,
ClientName: r.ClientName,
BfMode: r.BfMode,
BfSub: r.BfSub,
Scope: r.Scope,
CreatedAt: r.CreatedAt,
LastUsedAt: r.LastUsedAt,
}
// Populate human-readable display name for VK mode.
if r.BfMode == "vk" && r.VKName != "" {
row.BfSubDisplay = r.VKName
}
out = append(out, row)
}
return out, nil
}

// RevokeOAuth2Session revokes a specific refresh token by ID (for use from the
// Connected Clients UI). Returns ErrNotFound when the ID does not exist.
func (s *RDBConfigStore) RevokeOAuth2Session(ctx context.Context, id string) error {
now := time.Now()
result := s.DB().WithContext(ctx).
Model(&tables.TableOAuth2RefreshToken{}).
Where("id = ? AND revoked_at IS NULL", id).
Update("revoked_at", &now)
if result.Error != nil {
return fmt.Errorf("revoke oauth2 session: %w", result.Error)
}
if result.RowsAffected == 0 {
return ErrNotFound
}
return nil
}

// GetOAuth2SessionByID returns a single active refresh token row by ID.
// Uses ScopedDB so row-visibility predicates injected into the context apply.
// Returns ErrNotFound when the ID does not exist or is already revoked.
func (s *RDBConfigStore) GetOAuth2SessionByID(ctx context.Context, id string) (*tables.TableOAuth2RefreshToken, error) {
var rt tables.TableOAuth2RefreshToken
err := s.ScopedDB(ctx).Where("id = ? AND revoked_at IS NULL", id).First(&rt).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("get oauth2 session: %w", err)
}
return &rt, nil
}
21 changes: 21 additions & 0 deletions framework/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,12 +675,33 @@ type ConfigStore interface {

// OAuth2 refresh tokens
GetOAuth2RefreshTokenByHash(ctx context.Context, hash string) (*tables.TableOAuth2RefreshToken, error)
// GetOAuth2RefreshTokenByHashAny returns the row including revoked tokens,
// used to detect token reuse attacks and trigger family revocation.
GetOAuth2RefreshTokenByHashAny(ctx context.Context, hash string) (*tables.TableOAuth2RefreshToken, error)
// ConsumeOAuth2AuthorizeRequest atomically marks the authorize request as
// code_issued and creates the refresh token — if either fails the client can retry.
ConsumeOAuth2AuthorizeRequest(ctx context.Context, requestID string, rt *tables.TableOAuth2RefreshToken) error
// RotateOAuth2RefreshToken atomically revokes the old token and creates the
// new one — if either fails the old token stays active and the client can retry.
RotateOAuth2RefreshToken(ctx context.Context, oldID string, newRT *tables.TableOAuth2RefreshToken) error
// RevokeOAuth2RefreshTokensByFamilyID revokes all active tokens in a family
// when a stolen-token reuse is detected (RFC 9700 §2.2.2).
RevokeOAuth2RefreshTokensByFamilyID(ctx context.Context, familyID string) error
// RevokeOAuth2RefreshTokensByMode revokes all active tokens for a given mode.
RevokeOAuth2RefreshTokensByMode(ctx context.Context, bfMode string) error
// SweepOAuth2RefreshTokens deletes revoked tokens older than the given duration.
SweepOAuth2RefreshTokens(ctx context.Context, revokedOlderThan time.Duration) (int64, error)
// SweepOrphanedOAuth2Clients deletes registered clients that back no refresh
// token and were registered before the grace cutoff. Run after the refresh
// token sweep so clients are not collected while their tokens are still
// retained for reuse detection.
SweepOrphanedOAuth2Clients(ctx context.Context, registeredOlderThan time.Duration) (int64, error)
// ListOAuth2Sessions returns active downstream grants for the Connected Clients UI.
ListOAuth2Sessions(ctx context.Context) ([]OAuth2SessionRow, error)
// GetOAuth2SessionByID returns a single active grant row for permission checks.
GetOAuth2SessionByID(ctx context.Context, id string) (*tables.TableOAuth2RefreshToken, error)
// RevokeOAuth2Session revokes a specific downstream grant by refresh token ID.
RevokeOAuth2Session(ctx context.Context, id string) error

// Cleanup
Close(ctx context.Context) error
Expand Down
10 changes: 9 additions & 1 deletion framework/mcp_headers/sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type CredentialSweepWorker struct {
expiredFlowEvery time.Duration
stopCh chan struct{}
stopOnce sync.Once
cancel context.CancelFunc
logger schemas.Logger
}

Expand All @@ -57,7 +58,9 @@ func NewCredentialSweepWorker(provider *Provider, orphanRetention time.Duration,

// Start begins the sweep worker in a background goroutine.
func (w *CredentialSweepWorker) Start(ctx context.Context) {
go w.run(ctx)
runCtx, cancel := context.WithCancel(ctx)
w.cancel = cancel
go w.run(runCtx)
if w.logger != nil {
w.logger.Info("Per-user headers sweep worker started (orphan=%s, retention=%s, expired_flow=%s)",
w.orphanSweepEvery, w.orphanRetention, w.expiredFlowEvery)
Expand All @@ -68,6 +71,11 @@ func (w *CredentialSweepWorker) Start(ctx context.Context) {
// panics from redundant shutdown paths.
func (w *CredentialSweepWorker) Stop() {
w.stopOnce.Do(func() {
// Cancel any in-flight sweep so a blocked DB call unwinds promptly,
// then signal run() to exit its ticker loop.
if w.cancel != nil {
w.cancel()
}
close(w.stopCh)
if w.logger != nil {
w.logger.Info("Per-user headers credential sweep worker stopped")
Expand Down
20 changes: 18 additions & 2 deletions framework/oauth2/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type TokenRefreshWorker struct {
lookAheadWindow time.Duration // How far ahead to look for expiring tokens
stopCh chan struct{}
stopOnce sync.Once
cancel context.CancelFunc
logger schemas.Logger
}

Expand All @@ -35,7 +36,9 @@ func NewTokenRefreshWorker(provider *OAuth2Provider, logger schemas.Logger) *Tok

// Start begins the token refresh worker in a background goroutine
func (w *TokenRefreshWorker) Start(ctx context.Context) {
go w.run(ctx)
runCtx, cancel := context.WithCancel(ctx)
w.cancel = cancel
go w.run(runCtx)
if w.logger != nil {
w.logger.Info("Token refresh worker started")
}
Expand All @@ -46,6 +49,11 @@ func (w *TokenRefreshWorker) Start(ctx context.Context) {
// can't panic by re-closing the channel.
func (w *TokenRefreshWorker) Stop() {
w.stopOnce.Do(func() {
// Cancel any in-flight refresh so a blocked DB call unwinds promptly,
// then signal run() to exit its ticker loop.
if w.cancel != nil {
w.cancel()
}
close(w.stopCh)
if w.logger != nil {
w.logger.Info("Token refresh worker stopped")
Expand Down Expand Up @@ -154,6 +162,7 @@ type PerUserOAuthSweepWorker struct {
orphanRetention time.Duration
stopCh chan struct{}
stopOnce sync.Once
cancel context.CancelFunc
logger schemas.Logger
}

Expand All @@ -178,7 +187,9 @@ func NewPerUserOAuthSweepWorker(provider *OAuth2Provider, orphanRetention time.D

// Start begins the sweep worker in a background goroutine.
func (w *PerUserOAuthSweepWorker) Start(ctx context.Context) {
go w.run(ctx)
runCtx, cancel := context.WithCancel(ctx)
w.cancel = cancel
go w.run(runCtx)
if w.logger != nil {
w.logger.Info("Per-user OAuth sweep worker started (flow=%s, orphan=%s, retention=%s)",
w.flowSweepEvery, w.orphanSweepEvery, w.orphanRetention)
Expand All @@ -189,6 +200,11 @@ func (w *PerUserOAuthSweepWorker) Start(ctx context.Context) {
// panics when called from multiple shutdown paths.
func (w *PerUserOAuthSweepWorker) Stop() {
w.stopOnce.Do(func() {
// Cancel any in-flight sweep so a blocked DB call unwinds promptly,
// then signal run() to exit its ticker loop.
if w.cancel != nil {
w.cancel()
}
close(w.stopCh)
if w.logger != nil {
w.logger.Info("Per-user OAuth sweep worker stopped")
Expand Down
10 changes: 9 additions & 1 deletion framework/temptoken/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type SweepWorker struct {
sweepInterval time.Duration
stopCh chan struct{}
stopOnce sync.Once
cancel context.CancelFunc
logger schemas.Logger
}

Expand All @@ -44,7 +45,9 @@ func NewSweepWorker(service *Service, logger schemas.Logger) *SweepWorker {

// Start begins the sweep loop in a background goroutine.
func (w *SweepWorker) Start(ctx context.Context) {
go w.run(ctx)
runCtx, cancel := context.WithCancel(ctx)
w.cancel = cancel
go w.run(runCtx)
if w.logger != nil {
w.logger.Info("temp-token sweep worker started (interval=%s)", w.sweepInterval)
}
Expand All @@ -54,6 +57,11 @@ func (w *SweepWorker) Start(ctx context.Context) {
// panics from redundant shutdown paths.
func (w *SweepWorker) Stop() {
w.stopOnce.Do(func() {
// Cancel any in-flight sweep so a blocked DB call unwinds promptly,
// then signal run() to exit its ticker loop.
if w.cancel != nil {
w.cancel()
}
close(w.stopCh)
if w.logger != nil {
w.logger.Info("temp-token sweep worker stopped")
Expand Down
42 changes: 39 additions & 3 deletions transports/bifrost-http/handlers/oauth2_issuance.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,31 @@ func (h *OAuth2IssuanceHandler) handleTokenRefresh(ctx *fasthttp.RequestCtx) {

tokenHash := hashSHA256Hex(refreshToken)
rt, err := h.store.ConfigStore.GetOAuth2RefreshTokenByHash(ctx, tokenHash)
if err != nil || rt == nil {
if errors.Is(err, configstore.ErrNotFound) {
sendOAuthError(ctx, fasthttp.StatusBadRequest, "invalid_grant", "refresh token not found or revoked")
if errors.Is(err, configstore.ErrNotFound) {
// Token not found in the active set — check if it was previously issued
// and revoked. A revoked token being re-presented indicates the token
// family may be compromised (stolen token used before rotation). Revoke
// all active tokens in the family to limit the damage (RFC 9700 §2.2.2).
// Fail closed: if the lookup or the family revocation errors, surface
// server_error rather than reporting invalid_grant while leaving a
// potentially compromised family usable.
revoked, lookupErr := h.store.ConfigStore.GetOAuth2RefreshTokenByHashAny(ctx, tokenHash)
switch {
case lookupErr != nil && !errors.Is(lookupErr, configstore.ErrNotFound):
sendOAuthError(ctx, fasthttp.StatusInternalServerError, "server_error", "failed to verify refresh token revocation state")
return
case revoked != nil:
if revokeErr := h.store.ConfigStore.RevokeOAuth2RefreshTokensByFamilyID(ctx, revoked.FamilyID); revokeErr != nil {
sendOAuthError(ctx, fasthttp.StatusInternalServerError, "server_error", "failed to revoke refresh token family")
return
}
}
// Unknown token or a revoked one we just contained — either way the grant
// is not usable.
sendOAuthError(ctx, fasthttp.StatusBadRequest, "invalid_grant", "refresh token not found or revoked")
return
}
if err != nil {
sendOAuthError(ctx, fasthttp.StatusInternalServerError, "server_error", "failed to look up refresh token")
return
}
Expand All @@ -349,6 +369,22 @@ func (h *OAuth2IssuanceHandler) handleTokenRefresh(ctx *fasthttp.RequestCtx) {
return
}

// bf_sub liveness check: for VK-mode tokens, verify the VK still exists
// and is active. A deleted or disabled VK should not be able to silently
// obtain new access tokens via refresh. A transient lookup failure must stay
// retriable (server_error) — only a missing/inactive VK invalidates the grant.
if schemas.MCPAuthMode(rt.BfMode) == schemas.MCPAuthModeVK && h.store.ConfigStore != nil {
vk, vkErr := h.store.ConfigStore.GetVirtualKey(ctx, rt.BfSub)
if vkErr != nil && !errors.Is(vkErr, configstore.ErrNotFound) {
sendOAuthError(ctx, fasthttp.StatusInternalServerError, "server_error", "failed to verify virtual key")
return
}
if errors.Is(vkErr, configstore.ErrNotFound) || vk == nil || !vk.IsActiveValue() {
sendOAuthError(ctx, fasthttp.StatusBadRequest, "invalid_grant", "virtual key is no longer active")
return
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

// RFC 8707: resource (audience URI) is distinct from scope. When the client
// omits it on refresh, carry forward the original resource captured at
// authorization — never substitute the scope string. When the client does
Expand Down
Loading
Loading