feat(runtime): bridge user-input events and API to external GUI clients#2133
feat(runtime): bridge user-input events and API to external GUI clients#2133gaord wants to merge 5 commits into
Conversation
Add SSE event forwarding for UserInputRequired, REST endpoint for submitting user input responses, timeout protection for await_user_input, and fix interrupt_turn to clear active_turn immediately.
There was a problem hiding this comment.
Code Review
This pull request introduces a 300-second timeout for user input requests and adds a new API endpoint for submitting user input. It also updates the runtime thread manager to handle user input events and modifies turn interruption logic. Feedback suggests improving error handling consistency by using standard error mapping for missing threads and removing redundant turn finalization code that could cause duplicate events and data loss.
The monitor_turn loop already handles full turn finalization when the engine shuts down after cancellation, including saving turn status, usage, error, emitting turn.completed, and clearing active_turn. Having interrupt_turn also save turn status and emit turn.completed causes duplicate SSE events and loses usage/error data that monitor_turn would have captured from TurnComplete. Keep only the active_turn cleanup so the 409 error is resolved while monitor_turn remains the single source of truth for turn completion.
- Change 'not loaded' to 'not found' in submit_user_input and cancel_user_input so map_thread_err correctly maps to 404 - Use map_thread_err in submit_user_input API endpoint for consistent error response (404 for missing thread, 409 for conflict, etc.) instead of always returning 500
Clearing active_turn immediately breaks is_interrupt_requested detection in monitor_turn, causing turn status to be Completed instead of Interrupted. Let monitor_turn handle the cleanup after it detects the interrupt flag and performs full finalization with correct status, usage, and error.
|
@Hmbown anything to do with this? |
bceef27 to
b656453
Compare
| result = tokio::time::timeout(USER_INPUT_TIMEOUT, self.rx_user_input.recv()) => { | ||
| match result { | ||
| Ok(Some(decision)) => { | ||
| match decision { | ||
| UserInputDecision::Submitted { id, response } if id == tool_id => { | ||
| return Ok(response); | ||
| } | ||
| UserInputDecision::Cancelled { id } if id == tool_id => { | ||
| return Err(ToolError::execution_failed( | ||
| "User input cancelled".to_string(), | ||
| )); | ||
| } | ||
| _ => continue, | ||
| } | ||
| } | ||
| UserInputDecision::Cancelled { id } if id == tool_id => { | ||
| Ok(None) => { | ||
| return Err(ToolError::execution_failed( | ||
| "User input cancelled".to_string(), | ||
| "User input channel closed".to_string(), | ||
| )); | ||
| } | ||
| Err(_) => { | ||
| let _ = self | ||
| .tx_event | ||
| .send(Event::Status { | ||
| message: format!( | ||
| "User input timed out after {}s", | ||
| USER_INPUT_TIMEOUT.as_secs() | ||
| ), | ||
| }) | ||
| .await; | ||
| return Err(ToolError::execution_failed( | ||
| format!( | ||
| "User input timed out after {}s", | ||
| USER_INPUT_TIMEOUT.as_secs() | ||
| ), | ||
| )); | ||
| } | ||
| _ => continue, | ||
| } | ||
| } |
There was a problem hiding this comment.
Timeout resets on every mismatched decision
tokio::time::timeout(USER_INPUT_TIMEOUT, ...) is constructed fresh on each loop iteration. When a UserInputDecision arrives for a different tool_id (the _ => continue branch), the loop restarts and the 5-minute clock resets. This means the total wait is unbounded: any stream of unrelated decisions (e.g., a concurrent user-input tool call, or a stale channel message from a prior request) keeps deferring the timeout indefinitely, defeating the disconnected-GUI protection.
The fix is to start the sleep outside the loop so it counts down from a single point in time, then select on it as a separate arm.
| async fn submit_user_input( | ||
| State(state): State<RuntimeApiState>, | ||
| Path((thread_id, input_id)): Path<(String, String)>, | ||
| Json(req): Json<SubmitUserInputBody>, | ||
| ) -> Result<Json<SubmitUserInputResponse>, ApiError> { | ||
| use crate::tools::user_input::{UserInputAnswer, UserInputResponse}; | ||
| let answers: Vec<UserInputAnswer> = req | ||
| .answers | ||
| .into_iter() | ||
| .map(|a| UserInputAnswer { | ||
| id: a.id, | ||
| label: a.label, | ||
| value: a.value, | ||
| }) | ||
| .collect(); | ||
| let response = UserInputResponse { answers }; | ||
| let delivered = state | ||
| .runtime_threads | ||
| .submit_user_input(&thread_id, &input_id, response) | ||
| .await | ||
| .map_err(map_thread_err)?; | ||
| Ok(Json(SubmitUserInputResponse { | ||
| ok: true, | ||
| input_id, | ||
| delivered, | ||
| })) | ||
| } |
There was a problem hiding this comment.
Stale or wrong
input_id silently returns 200 OK
submit_user_input sends the decision onto the channel and always returns Ok(true). If a GUI POSTs with a wrong or already-expired input_id, the decision is silently discarded by the _ => continue branch in await_user_input — the caller gets {"ok":true,"delivered":true} even though no waiting tool received it.
This diverges from the approval flow, where deliver_external_approval checks whether a pending entry exists and returns false (surfaced as HTTP 404) when none is found. Without a similar pending-input registry, a GUI client has no way to detect that it POSTed too late or used the wrong ID, which can cause silent hangs on the GUI side.
| #[allow(dead_code)] | ||
| pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> { |
There was a problem hiding this comment.
cancel_user_input is dead code with no REST endpoint, meaning GUIs have no way to signal that the user dismissed the input dialog. This leaves the engine waiting the full 5 minutes on a cancelled interaction. Consider either exposing a DELETE /v1/user-input/{thread_id}/{input_id} endpoint or removing the #[allow(dead_code)] suppression until the endpoint is wired up.
| #[allow(dead_code)] | |
| pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> { | |
| pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result<bool> { |
| Ok(Json(SubmitUserInputResponse { | ||
| ok: true, | ||
| input_id, | ||
| delivered, | ||
| })) |
There was a problem hiding this comment.
The
delivered field is structurally always true here — submit_user_input either returns Ok(true) or propagates an error (in which case the handler already returned early). This misleads API consumers who may interpret it as confirmation that the answer was actually consumed by a waiting tool.
| Ok(Json(SubmitUserInputResponse { | |
| ok: true, | |
| input_id, | |
| delivered, | |
| })) | |
| Ok(Json(SubmitUserInputResponse { | |
| ok: true, | |
| input_id, | |
| delivered: true, | |
| })) |
|
Independent review (Devin): Tested merge against current What ships: Issues to fix before merge:
v0.8.48 (#2256) is a workspace consolidation (zero behavioral changes) and merges cleanly alongside this PR. |
Summary
The TUI engine already emits
EngineEvent::UserInputRequiredand theEnginehandle exposessubmit_user_input/cancel_user_input, but the runtime API layer (used by external GUI clients like VSCode extensions) was missing the plumbing to propagate these events or accept responses. This meantrequest_user_inputtool calls would hang indefinitely in GUI mode with no dialog appearing.Changes
1.
approval.rs— Timeout protection forawait_user_inputUSER_INPUT_TIMEOUT) around therx_user_input.recv()callStatusevent and returns aToolError2.
runtime_threads.rs— Event forwarding + manager methods + interrupt fixEngineEvent::UserInputRequiredin the event loop, emitting a"user_input.required"SSE event with the input ID and full request payload (questions, options)submit_user_input()andcancel_user_input()onRuntimeThreadManagerthat delegate to theEnginehandleinterrupt_turn()now immediately clearsactive_turnand emits"turn.completed"so the thread accepts new messages after/interrupt— this prevents persistent 409 "Thread already has an active turn" errors3.
runtime_api.rs— REST endpoint for user input submissionPOST /v1/user-input/{thread_id}/{input_id}endpoint{ "answers": [{ "id": "...", "label": "...", "value": "..." }] }RuntimeThreadManager::submit_user_input()Flow
Testing
cargo check -p codewhale-tuipassescargo clippy -p codewhale-tui --all-targetspasses (no new warnings)Related
approval.requiredSSE +POST /v1/approvals/{id}) that already works for tool authorization dialogsGreptile Summary
This PR wires the existing
request_user_inputtool through the runtime API layer so external GUI clients can display dialogs and submit responses, mirroring the existing approval flow. It also adds a 5-minute timeout toawait_user_inputto prevent a disconnected client from stalling the agent loop.approval.rs: Wrapsrx_user_input.recv()withtokio::time::timeout; however, the timeout future is re-constructed inside the loop body, so any mismatched decision resets the 5-minute clock rather than counting against a single deadline.runtime_api.rs: AddsPOST /v1/user-input/{thread_id}/{input_id}for GUI response delivery, but unlike the approval endpoint it always returns 200 OK — there is no 404 when theinput_idis stale or wrong, leaving callers unable to detect a failed delivery.runtime_threads.rs: ForwardsEngineEvent::UserInputRequiredas an SSE event and addssubmit_user_input/cancel_user_inputonRuntimeThreadManager; the cancel path is currently dead code with no REST endpoint.Confidence Score: 3/5
The core event-forwarding and SSE plumbing is straightforward, but two correctness issues in the new code paths could cause the timeout protection to be bypassed and leave GUI clients unable to detect failed deliveries.
The timeout in
await_user_inputis re-created on every loop iteration, so mismatched channel messages keep resetting the 5-minute guard — the disconnected-GUI protection the PR is meant to add can be effectively neutralized. The new REST endpoint also always responds withdelivered: trueregardless of whether theinput_idmatched a live request, making it impossible for a GUI to know it posted too late.approval.rs(timeout logic) andruntime_api.rs(delivery confirmation) need the most attention before this is merged.Important Files Changed
await_user_inputviatokio::time::timeout, but the timeout is re-created inside the loop, so any mismatched decision resets the clock and the guard can be bypassed indefinitely.POST /v1/user-input/{thread_id}/{input_id}endpoint; always returns 200 OK even for stale/wronginput_ids, diverging from the approval flow which returns 404 for unknown IDs.submit_user_input/cancel_user_inputmanager methods and SSE forwarding forEngineEvent::UserInputRequired;cancel_user_inputis dead code with no exposed endpoint.Sequence Diagram
sequenceDiagram participant Tool as Tool (request_user_input) participant Engine as Engine (approval.rs) participant RTM as RuntimeThreadManager participant SSE as SSE Stream participant GUI as External GUI Client participant API as Runtime API Tool->>Engine: await_user_input(tool_id, request) Engine->>SSE: "Event::UserInputRequired { id, request }" RTM->>SSE: emit "user_input.required" SSE event SSE->>GUI: "{ id, request: { questions, options } }" GUI->>API: "POST /v1/user-input/{thread_id}/{input_id}" API->>RTM: submit_user_input(thread_id, input_id, response) RTM->>Engine: engine.submit_user_input(input_id, response) Engine->>Engine: rx_user_input.recv() matches tool_id Engine-->>Tool: Ok(UserInputResponse) API-->>GUI: "{ ok: true, delivered: true }" Note over Engine: If GUI disconnects, timeout fires after 5 min Engine->>SSE: "Event::Status { timed out }" Engine-->>Tool: Err(ToolError: timed out)Reviews (1): Last reviewed commit: "style: fix rustfmt formatting for user-i..." | Re-trigger Greptile