Skip to content

Commit f6a7599

Browse files
committed
changes from review
1 parent cd63100 commit f6a7599

8 files changed

Lines changed: 512 additions & 88 deletions

File tree

cmd/thv-operator/controllers/virtualmcpserver_deployment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func (r *VirtualMCPServerReconciler) deploymentForVirtualMCPServer(
197197
vmcpLivenessInitialDelay, vmcpLivenessPeriod, vmcpLivenessTimeout, vmcpLivenessFailures,
198198
),
199199
ReadinessProbe: ctrlutil.BuildHealthProbe(
200-
"/health", "http",
200+
"/readyz", "http",
201201
vmcpReadinessInitialDelay, vmcpReadinessPeriod, vmcpReadinessTimeout, vmcpReadinessFailures,
202202
),
203203
SecurityContext: containerSecurityContext,

pkg/vmcp/server/server.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sync"
2222
"time"
2323

24+
"github.com/mark3labs/mcp-go/mcp"
2425
"github.com/mark3labs/mcp-go/server"
2526

2627
"github.com/stacklok/toolhive/pkg/audit"
@@ -491,6 +492,24 @@ func New(
491492
srv.handleSessionRegistration(ctx, session)
492493
})
493494

495+
// Register OnBeforeListTools hook for lazy session tool injection.
496+
//
497+
// When a session is reconstructed from Redis on a different pod (cross-pod sharing),
498+
// the SDK's per-session tool store is empty because OnRegisterSession only fires
499+
// during Initialize, which the client doesn't re-send to pod B. This hook lazily
500+
// injects the tools from the VMCP session manager into the ephemeral SDK session
501+
// before handleListTools reads from the per-session tool store.
502+
hooks.AddBeforeListTools(func(ctx context.Context, _ any, _ *mcp.ListToolsRequest) {
503+
srv.lazyInjectSessionTools(ctx)
504+
})
505+
506+
// Register OnBeforeCallTool hook for the same reason as OnBeforeListTools.
507+
// A client may call a tool directly without first calling tools/list, so we
508+
// also need to ensure the tool handlers are registered before the call is routed.
509+
hooks.AddBeforeCallTool(func(ctx context.Context, _ any, _ *mcp.CallToolRequest) {
510+
srv.lazyInjectSessionTools(ctx)
511+
})
512+
494513
// Disarm the close-on-error guard: Server is fully constructed.
495514
closeStorageOnErr = false
496515
return srv, nil
@@ -1008,6 +1027,40 @@ func setSessionToolsDirect(session server.ClientSession, tools []server.ServerTo
10081027
return nil
10091028
}
10101029

1030+
// lazyInjectSessionTools injects tools into the SDK ephemeral session for sessions
1031+
// that were reconstructed from Redis on a different pod (cross-pod session sharing).
1032+
//
1033+
// When a client connects to pod B with an existing session ID (established on pod A),
1034+
// the SDK creates an ephemeral session with no tools because OnRegisterSession only fires
1035+
// during Initialize, which the client doesn't re-send to pod B. This method is called
1036+
// from OnBeforeListTools and OnBeforeCallTool hooks to lazily inject the tools before
1037+
// the SDK handler reads from the per-session tool store.
1038+
//
1039+
// For sessions initialized on this pod (normal case), tools are already in the store
1040+
// (set by setSessionToolsDirect during OnRegisterSession); this method is a no-op.
1041+
func (s *Server) lazyInjectSessionTools(ctx context.Context) {
1042+
sess := server.ClientSessionFromContext(ctx)
1043+
if sess == nil {
1044+
return
1045+
}
1046+
sessionWithTools, ok := sess.(server.SessionWithTools)
1047+
if !ok {
1048+
return
1049+
}
1050+
if len(sessionWithTools.GetSessionTools()) > 0 {
1051+
return // tools already registered (normal pod-local case)
1052+
}
1053+
sessionID := sess.SessionID()
1054+
adaptedTools, err := s.vmcpSessionMgr.GetAdaptedTools(sessionID)
1055+
if err != nil || len(adaptedTools) == 0 {
1056+
slog.Debug("lazyInjectSessionTools: no tools available for session", "session_id", sessionID)
1057+
return
1058+
}
1059+
if err := setSessionToolsDirect(sess, adaptedTools); err != nil {
1060+
slog.Warn("lazyInjectSessionTools: failed to inject tools", "session_id", sessionID, "error", err)
1061+
}
1062+
}
1063+
10111064
// handleSessionRegistration processes a new MCP session registration.
10121065
// It fires AFTER the session is registered in the SDK.
10131066
func (s *Server) handleSessionRegistration(

pkg/vmcp/server/sessionmanager/horizontal_scaling_integration_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,18 +271,16 @@ func TestHorizontalScaling_AllBackendsFailOnRestore(t *testing.T) {
271271
// Use a stoppable backend so we can shut it down mid-test.
272272
backend, stopBackend := startStoppableMCPBackend(t, "backend-alpha", "echo")
273273

274-
sm := newTestManagerWithSharedStorage(t, storage, []*vmcp.Backend{backend})
275-
sessionID := createSession(t, sm, nil)
274+
smWriter := newTestManagerWithSharedStorage(t, storage, []*vmcp.Backend{backend})
275+
sessionID := createSession(t, smWriter, nil)
276276

277277
// Stop the backend — RestoreSession will be unable to reconnect.
278278
stopBackend()
279279

280-
// Evict from the local cache so the next Get takes the restore path.
281-
sm.sessions.Delete(sessionID)
282-
283-
// GetMultiSession must return a session (not false/nil) even though the
284-
// backend is unreachable — the session comes back with an empty tool list.
285-
sess, ok := sm.GetMultiSession(sessionID)
280+
// Use a fresh manager: its cache is empty, so GetMultiSession takes the
281+
// restore path without needing to explicitly evict the session.
282+
smReader := newTestManagerWithSharedStorage(t, storage, []*vmcp.Backend{backend})
283+
sess, ok := smReader.GetMultiSession(sessionID)
286284
require.True(t, ok, "GetMultiSession must return ok=true even when backends are unreachable")
287285
require.NotNil(t, sess)
288286
assert.Empty(t, sess.Tools(), "routing table must be empty when no backend reconnected")
@@ -321,7 +319,8 @@ func TestHorizontalScaling_BackendExpiry_SkipsExpiredOnRestore(t *testing.T) {
321319
require.True(t, toolNames["tool-alpha"], "session A must have tool-alpha before expiry")
322320
require.True(t, toolNames["tool-beta"], "session A must have tool-beta before expiry")
323321

324-
// NotifyBackendExpired updates Redis to remove backend-beta and evicts from pod A's cache.
322+
// NotifyBackendExpired updates Redis to remove backend-beta; the node-local cache
323+
// entry is evicted lazily on the next GetMultiSession when checkSession detects drift.
325324
smA.NotifyBackendExpired(sessionID, backendB.ID)
326325

327326
// Pod C: fresh Manager, same storage and both backends in registry.

pkg/vmcp/server/sessionmanager/session_manager_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ func TestSessionManager_Terminate(t *testing.T) {
610610
}).Times(1)
611611

612612
registry := newFakeRegistry()
613-
sm, storage := newTestSessionManager(t, factory, registry)
613+
sm, _ := newTestSessionManager(t, factory, registry)
614614

615615
sessionID := sm.Generate()
616616
require.NotEmpty(t, sessionID)
@@ -619,10 +619,12 @@ func TestSessionManager_Terminate(t *testing.T) {
619619
require.NoError(t, err)
620620
require.NotNil(t, createdSess)
621621

622-
// Seed MetadataKeyTokenHash so Terminate takes Phase 2 (storage.Delete + cache Remove).
623-
require.NoError(t, storage.Upsert(context.Background(), sessionID, tokenHashMeta))
622+
// CreateSession already persists tokenHashMeta via sess.GetMetadata(),
623+
// so Terminate will take the Phase 2 path (storage.Delete) without
624+
// any additional seeding.
624625

625-
// Terminate deletes from storage and removes from cache; onEvict fires Close().
626+
// Terminate deletes from storage; the cache entry is evicted lazily on
627+
// the next GetMultiSession call when checkSession detects ErrSessionNotFound.
626628
isNotAllowed, err := sm.Terminate(sessionID)
627629
require.NoError(t, err)
628630
assert.False(t, isNotAllowed)

test/e2e/thv-operator/virtualmcp/helpers.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ import (
3434
"github.com/stacklok/toolhive/test/e2e/thv-operator/testutil"
3535
)
3636

37+
// Shared test constants used across all e2e test files in this package.
38+
const (
39+
defaultNamespace = "default"
40+
e2eTimeout = 5 * time.Minute
41+
e2ePollInterval = 2 * time.Second
42+
)
43+
3744
// WaitForVirtualMCPServerReady waits for a VirtualMCPServer to reach Ready status
3845
// and ensures at least one associated pod is actually running and ready.
3946
// This is used when waiting for a single expected pod (e.g., one replica deployment).
@@ -828,7 +835,7 @@ func GetVMCPNodePort(
828835
}
829836

830837
// Verify the HTTP server is ready to handle requests
831-
if err := checkHTTPHealthReady(nodePort, 2*time.Second); err != nil {
838+
if err := checkHTTPHealthReady(nodePort); err != nil {
832839
return fmt.Errorf("nodePort %d accessible but HTTP server not ready: %w", nodePort, err)
833840
}
834841

@@ -847,25 +854,21 @@ func checkPortAccessible(nodePort int32, timeout time.Duration) error {
847854
if err != nil {
848855
return fmt.Errorf("port %d not accessible: %w", nodePort, err)
849856
}
850-
// Port is accessible - close connection (ignore errors as port accessibility is confirmed)
851857
_ = conn.Close()
852858
return nil
853859
}
854860

855861
// checkHTTPHealthReady verifies the HTTP server is ready by checking the /health endpoint.
856862
// This is more reliable than just TCP check as it ensures the application is serving requests.
857-
func checkHTTPHealthReady(nodePort int32, timeout time.Duration) error {
858-
httpClient := &http.Client{Timeout: timeout}
863+
func checkHTTPHealthReady(nodePort int32) error {
864+
httpClient := &http.Client{Timeout: 2 * time.Second}
859865
url := fmt.Sprintf("http://localhost:%d/health", nodePort)
860866

861867
resp, err := httpClient.Get(url)
862868
if err != nil {
863869
return fmt.Errorf("health check failed for port %d: %w", nodePort, err)
864870
}
865-
defer func() {
866-
// Error ignored in test cleanup
867-
_ = resp.Body.Close()
868-
}()
871+
defer func() { _ = resp.Body.Close() }()
869872

870873
if resp.StatusCode != http.StatusOK {
871874
return fmt.Errorf("health check returned status %d for port %d", resp.StatusCode, nodePort)

0 commit comments

Comments
 (0)