diff --git a/crates/tui/src/core/engine/approval.rs b/crates/tui/src/core/engine/approval.rs index ac04900b4..b0f866cc1 100644 --- a/crates/tui/src/core/engine/approval.rs +++ b/crates/tui/src/core/engine/approval.rs @@ -5,10 +5,14 @@ //! or whenever a tool requests live user input (`await_user_input`). Channels //! and engine state stay private to the parent module. +use std::time::Duration; + use crate::core::events::Event; use crate::tools::spec::ToolError; use crate::tools::user_input::{UserInputRequest, UserInputResponse}; +const USER_INPUT_TIMEOUT: Duration = Duration::from_secs(300); + use super::Engine; #[derive(Debug, Clone)] @@ -123,22 +127,43 @@ impl Engine { format!("Request cancelled while awaiting user input{suffix}"), )); } - decision = self.rx_user_input.recv() => { - let Some(decision) = decision else { - return Err(ToolError::execution_failed( - "User input channel closed".to_string(), - )); - }; - match decision { - UserInputDecision::Submitted { id, response } if id == tool_id => { - return Ok(response); + result = tokio::time::timeout(USER_INPUT_TIMEOUT, self.rx_user_input.recv()) => { + match result { + Ok(Some(decision)) => { + match decision { + UserInputDecision::Submitted { id, response } if id == tool_id => { + return Ok(response); + } + UserInputDecision::Cancelled { id } if id == tool_id => { + return Err(ToolError::execution_failed( + "User input cancelled".to_string(), + )); + } + _ => continue, + } } - UserInputDecision::Cancelled { id } if id == tool_id => { + Ok(None) => { return Err(ToolError::execution_failed( - "User input cancelled".to_string(), + "User input channel closed".to_string(), + )); + } + Err(_) => { + let _ = self + .tx_event + .send(Event::Status { + message: format!( + "User input timed out after {}s", + USER_INPUT_TIMEOUT.as_secs() + ), + }) + .await; + return Err(ToolError::execution_failed( + format!( + "User input timed out after {}s", + USER_INPUT_TIMEOUT.as_secs() + ), )); } - _ => continue, } } } diff --git a/crates/tui/src/runtime_api.rs b/crates/tui/src/runtime_api.rs index 20110cc45..da316accd 100644 --- a/crates/tui/src/runtime_api.rs +++ b/crates/tui/src/runtime_api.rs @@ -296,6 +296,25 @@ struct DecideApprovalResponse { delivered: bool, } +#[derive(Debug, Deserialize)] +struct SubmitUserInputBody { + answers: Vec, +} + +#[derive(Debug, Deserialize)] +struct UserInputAnswerBody { + id: String, + label: String, + value: String, +} + +#[derive(Debug, Serialize)] +struct SubmitUserInputResponse { + ok: bool, + input_id: String, + delivered: bool, +} + #[derive(Debug, Serialize)] struct RuntimeInfoResponse { bind_host: String, @@ -500,6 +519,10 @@ pub fn build_router(state: RuntimeApiState) -> Router { .route("/v1/threads/{id}/compact", post(compact_thread)) .route("/v1/threads/{id}/events", get(stream_thread_events)) .route("/v1/approvals/{approval_id}", post(decide_approval)) + .route( + "/v1/user-input/{thread_id}/{input_id}", + post(submit_user_input), + ) .route("/v1/tasks", get(list_tasks).post(create_task)) .route("/v1/tasks/{id}", get(get_task)) .route("/v1/tasks/{id}/cancel", post(cancel_task)) @@ -984,6 +1007,34 @@ async fn decide_approval( })) } +async fn submit_user_input( + State(state): State, + Path((thread_id, input_id)): Path<(String, String)>, + Json(req): Json, +) -> Result, ApiError> { + use crate::tools::user_input::{UserInputAnswer, UserInputResponse}; + let answers: Vec = req + .answers + .into_iter() + .map(|a| UserInputAnswer { + id: a.id, + label: a.label, + value: a.value, + }) + .collect(); + let response = UserInputResponse { answers }; + let delivered = state + .runtime_threads + .submit_user_input(&thread_id, &input_id, response) + .await + .map_err(map_thread_err)?; + Ok(Json(SubmitUserInputResponse { + ok: true, + input_id, + delivered, + })) +} + async fn runtime_info(State(state): State) -> Json { Json(RuntimeInfoResponse { bind_host: state.bind_host.clone(), diff --git a/crates/tui/src/runtime_threads.rs b/crates/tui/src/runtime_threads.rs index 1a08473d6..1058dda77 100644 --- a/crates/tui/src/runtime_threads.rs +++ b/crates/tui/src/runtime_threads.rs @@ -833,6 +833,30 @@ impl RuntimeThreadManager { } } + pub async fn submit_user_input( + &self, + thread_id: &str, + input_id: &str, + response: crate::tools::user_input::UserInputResponse, + ) -> Result { + let active = self.active.lock().await; + let Some(state) = active.engines.get(thread_id) else { + bail!("thread '{thread_id}' not found"); + }; + state.engine.submit_user_input(input_id, response).await?; + Ok(true) + } + + #[allow(dead_code)] + pub async fn cancel_user_input(&self, thread_id: &str, input_id: &str) -> Result { + let active = self.active.lock().await; + let Some(state) = active.engines.get(thread_id) else { + bail!("thread '{thread_id}' not found"); + }; + state.engine.cancel_user_input(input_id).await?; + Ok(true) + } + #[allow(dead_code)] pub fn pending_approvals_count(&self) -> usize { self.pending_approvals @@ -2782,6 +2806,19 @@ impl RuntimeThreadManager { } } } + EngineEvent::UserInputRequired { id, request } => { + self.emit_event( + &thread_id, + Some(&turn_id), + None, + "user_input.required", + json!({ + "id": id, + "request": request, + }), + ) + .await?; + } EngineEvent::Status { message } => { let item = TurnItemRecord { schema_version: CURRENT_RUNTIME_SCHEMA_VERSION,