diff --git a/app/src/ai/agent_sdk/driver.rs b/app/src/ai/agent_sdk/driver.rs
index 3a6c89a417..e737d13084 100644
--- a/app/src/ai/agent_sdk/driver.rs
+++ b/app/src/ai/agent_sdk/driver.rs
@@ -112,6 +112,13 @@ const SETUP_FAILED_IDLE_TIMEOUT: Duration = Duration::from_secs(120);
 /// If no follow-up status arrives within this window, the driver terminates with the
 /// original error so the CLI does not hang indefinitely.
 const AUTO_RESUME_TIMEOUT: Duration = Duration::from_secs(120);
+/// Default upper bound for the waiting watchdog (QUALITY-780 client TECH §4)
+/// when the server-supplied `idle_timeout_seconds` on a `wait_for_events`
+/// tool call is unset (`0`) or negative. 30 minutes; chosen to roughly mirror
+/// the existing server-side `VMIdleTimeoutMinutes` tenant default so the client
+/// watchdog and the worker-side safety-net stay in the same ballpark when
+/// neither is configured.
+pub(super) const DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS: i32 = 30 * 60;
 /// Signals to Claude child-harness hooks that Warp already owns the background
 /// message-listener lifecycle, so the plugin should reuse the shared state
 /// files instead of spawning and cleaning up its own listener.
@@ -201,6 +208,85 @@ impl IdleTimeoutSender {
     }
 }
 
+/// Information about the most recent unresolved `wait_for_events` tool call
+/// on a conversation that's currently in `ConversationStatus::WaitingForEvents`.
+///
+/// The driver's waiting watchdog (QUALITY-780 client TECH §4) needs three
+/// fields from the call: the `tool_call_id` (so the watchdog can emit a
+/// matching `WaitForEventsResult` tool-call-result on fire), the `task_id`
+/// that owns the message (so the result is dispatched against the active
+/// task), and the server-supplied `idle_timeout_seconds` (so the watchdog
+/// can schedule itself with the same upper bound the server used to extend
+/// the task's idle timeout at yield time).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) struct UnresolvedWaitForEventsCall {
+    pub(super) tool_call_id: String,
+    pub(super) task_id: String,
+    /// Raw value from the proto's `Message::ToolCall::WaitForEvents`
+    /// payload. Treated as "unset" when `0` per the prost convention for
+    /// flattened scalars; callers should fall back to
+    /// [`DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS`] in that case.
+    pub(super) idle_timeout_seconds: i32,
+}
+
+/// Scan the conversation's linearized messages for the most recent
+/// **unresolved** `wait_for_events` tool call.
+///
+/// A call is unresolved if no `Message::ToolCallResult` in the log carries
+/// its `tool_call_id` (relative order is not checked). This mirrors the
+/// detection-site predicate that owns `mark_/clear_conversation_waiting_for_events`
+/// (client TECH §8.1), but avoids coupling the driver to that internal
+/// storage: the driver scans the public message log itself, which is the
+/// same source of truth `apply_client_actions` operates against.
+///
+/// Returns `None` when no `wait_for_events` call has been seen, or when the
+/// most-recent one has already been resolved (the conversation is no longer
+/// in the waiting state and the watchdog should not be scheduled).
+pub(super) fn find_unresolved_wait_for_events_call(
+    conversation: &crate::ai::agent::conversation::AIConversation,
+) -> Option<UnresolvedWaitForEventsCall> {
+    use warp_multi_agent_api::message::tool_call::Tool;
+    use warp_multi_agent_api::message::Message;
+
+    let mut resolved_tool_call_ids: HashSet<String> = HashSet::new();
+    let mut latest_unresolved: Option<UnresolvedWaitForEventsCall> = None;
+
+    for message in conversation.all_linearized_messages() {
+        match message.message.as_ref() {
+            Some(Message::ToolCallResult(result)) => {
+                resolved_tool_call_ids.insert(result.tool_call_id.clone());
+            }
+            Some(Message::ToolCall(tool_call)) => {
+                if let Some(Tool::WaitForEvents(payload)) = tool_call.tool.as_ref() {
+                    latest_unresolved = Some(UnresolvedWaitForEventsCall {
+                        tool_call_id: tool_call.tool_call_id.clone(),
+                        task_id: message.task_id.clone(),
+                        idle_timeout_seconds: payload.idle_timeout_seconds,
+                    });
+                }
+            }
+            _ => {}
+        }
+    }
+
+    latest_unresolved.filter(|call| !resolved_tool_call_ids.contains(&call.tool_call_id))
+}
+
+/// Resolve the watchdog timeout duration for an unresolved `wait_for_events`
+/// call. Honors the server-supplied `idle_timeout_seconds` when set;
+/// otherwise falls back to [`DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS`].
+///
+/// Zero and negative values fall back to the default, so the value handed to
+/// `Duration::from_secs` is always positive and converts to `u64` losslessly.
+pub(super) fn watchdog_timeout_for_call(call: &UnresolvedWaitForEventsCall) -> Duration {
+    let seconds = if call.idle_timeout_seconds <= 0 {
+        DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS
+    } else {
+        call.idle_timeout_seconds
+    };
+    Duration::from_secs(u64::from(seconds.unsigned_abs()))
+}
+
 /// How to resume an existing conversation when starting an agent run.
 ///
 /// The Oz harness restores the full conversation transcript into the terminal pane and treats
@@ -2437,6 +2523,109 @@ impl AgentDriver {
         Ok(())
     }
 
+    /// Schedule the local waiting watchdog for an unresolved
+    /// `wait_for_events` tool call (QUALITY-780 client TECH §4).
+    ///
+    /// On fire — if `watchdog_generation` still equals `expected_gen` —
+    /// invoke [`Self::emit_wait_for_events_timeout_result`] to close out the
+    /// pending tool call with an empty `WaitForEventsResult` against the
+    /// active task (that method is currently a logging-only stub). The fire
+    /// path explicitly does **not** resolve `run_exit`: a waiting run
+    /// continues; the next agent turn observes the synthetic result and decides how to proceed.
+    fn schedule_waiting_watchdog(
+        &self,
+        conversation_id: AIConversationId,
+        call: UnresolvedWaitForEventsCall,
+        timeout: Duration,
+        expected_gen: usize,
+        watchdog_generation: Arc<AtomicUsize>,
+        ctx: &mut ModelContext<Self>,
+    ) {
+        ctx.spawn(
+            async move {
+                warpui::r#async::Timer::after(timeout).await;
+                (expected_gen, call)
+            },
+            move |me, (fired_gen, fired_call), ctx| {
+                // Generation check: if the watchdog has been superseded by
+                // either a resume (status → InProgress) or a fresh
+                // WaitingForEvents re-entry, our generation no longer
+                // matches and we must not fire.
+                if watchdog_generation.load(Ordering::SeqCst) != fired_gen {
+                    log::debug!(
+                        "Ambient agent idle lifecycle: event=waiting_watchdog_superseded task_id={:?} expected_gen={fired_gen}",
+                        me.task_id
+                    );
+                    return;
+                }
+                me.emit_wait_for_events_timeout_result(conversation_id, fired_call, ctx);
+            },
+        );
+    }
+
+    /// Emit a synthetic `WaitForEventsResult` tool-call result on watchdog
+    /// fire (QUALITY-780 client TECH §4 and PRODUCT.md (12)/(13)).
+    ///
+    /// The result references the stored `tool_call_id` of the unresolved
+    /// `wait_for_events` call. Per PRODUCT.md (12)–(13), this MUST NOT
+    /// cancel the run — the agent's next turn sees the empty timeout
+    /// result and decides how to proceed (commonly `finish_task`, but the
+    /// agent may also re-yield, ask the user, or take other action).
+    ///
+    /// Implementation note (Wave 2 boundary):
+    ///
+    /// The full upstream emission path (proto `Message::ToolCallResult`
+    /// carrying `WaitForEventsResult{}`, sent as a new tool-call-result
+    /// input on the active task via `BlocklistAIController` so the server
+    /// echoes it back through the response stream and `apply_client_actions`
+    /// triggers `clear_conversation_waiting_for_events`) depends on the
+    /// `AIAgentActionResultType::WaitForEvents(WaitForEventsResult{})`
+    /// variant + matching `convert_to.rs` proto encoding, both of which
+    /// live outside this agent's owned files and are introduced by
+    /// `client-detection`'s Wave 2 work. This method therefore logs the
+    /// fire event with the stored `tool_call_id` and `task_id`; the actual
+    /// upstream emission is wired up at Wave 3 integration time once
+    /// `client-detection`'s variant is in place. Crucially, this method
+    /// does NOT resolve `run_exit`, and the pure helpers that feed the
+    /// watchdog schedule are unit-tested; that's the lifecycle invariant
+    /// PRODUCT.md (11) cares about.
+    fn emit_wait_for_events_timeout_result(
+        &self,
+        conversation_id: AIConversationId,
+        call: UnresolvedWaitForEventsCall,
+        _ctx: &mut ModelContext<Self>,
+    ) {
+        log::info!(
+            "Ambient agent waiting watchdog fired: event=wait_for_events_timeout_emitted task_id={:?} conversation_id={conversation_id} tool_call_id={tool_call_id} task_id_for_call={task_id_for_call}",
+            self.task_id,
+            tool_call_id = call.tool_call_id,
+            task_id_for_call = call.task_id,
+        );
+        // TODO(integration, QUALITY-780): once `client-detection` lands
+        // `AIAgentActionResultType::WaitForEvents(WaitForEventsResult{})`
+        // and the `BlocklistAIController::submit_wait_for_events_timeout(
+        //     conversation_id, tool_call_id, ctx,
+        // )` helper, replace this no-op with:
+        //
+        //     let ai_controller = self
+        //         .terminal_driver
+        //         .as_ref(_ctx)
+        //         .terminal_view()
+        //         .as_ref(_ctx)
+        //         .ai_controller()
+        //         .clone();
+        //     ai_controller.update(_ctx, |controller, ctx| {
+        //         controller.submit_wait_for_events_timeout(
+        //             conversation_id,
+        //             call.tool_call_id.clone(),
+        //             ctx,
+        //         );
+        //     });
+        //
+        // The run continues regardless; we do not resolve `run_exit`.
+        let _ = (conversation_id, call);
+    }
+
     /// Execute an AI run in the terminal session and wait for it to complete.
     ///
     /// Conversation output is streamed as it's available.
@@ -2449,6 +2638,18 @@ impl AgentDriver {
         let (tx, rx) = oneshot::channel();
         let run_exit = IdleTimeoutSender::new(tx);
 
+        // Generation counter for the waiting watchdog (QUALITY-780 client
+        // TECH §4). Bumped each time the conversation enters
+        // `WaitingForEvents` (scheduling a fresh watchdog) and again when the
+        // conversation resumes back to `InProgress` (cancelling the pending
+        // watchdog by superseding its generation).
This mirrors the + // `IdleTimeoutSender` generation-counter pattern but is decoupled from + // `run_exit` because the watchdog must not resolve `run_exit` on + // fire: a waiting run continues; the agent itself processes the + // synthetic `WaitForEventsResult` and decides whether to finish, re- + // yield, or take other action. + let waiting_watchdog_generation = Arc::new(AtomicUsize::new(0)); + // Subscribe before the conversation starts. let history_model_handle = BlocklistAIHistoryModel::handle(ctx); let terminal_id = self.terminal_driver.as_ref(ctx).terminal_view().id(); @@ -2622,12 +2823,69 @@ impl AgentDriver { }; if conversation.status().is_in_progress() { - // Conversation resumed or a new one started; cancel any pending idle timeout. + // Conversation resumed or a new one started; cancel any + // pending idle timeout AND any pending waiting watchdog. + // + // The watchdog cancellation here handles the resume path + // from QUALITY-780 client TECH §8.3: when client-detection + // sees an inbound `Cancel` or `WaitForEventsResult` and + // calls `clear_conversation_waiting_for_events`, that + // transitions the status back to `InProgress` and emits + // this same event, which we pick up here to supersede + // the watchdog generation. log::info!( "Ambient agent idle lifecycle: event=idle_timeout_cancel_requested task_id={:?} terminal_view_id={terminal_id:?} trigger=conversation_in_progress", me.task_id ); run_exit.cancel_idle_timeout(); + // Bump the watchdog generation so any pending watchdog + // fire is discarded. + waiting_watchdog_generation.fetch_add(1, Ordering::SeqCst); + return; + } + + // The conversation yielded via `wait_for_events`; schedule + // the local waiting watchdog (QUALITY-780 client TECH §4). 
+ // This arm explicitly does **not** resolve `run_exit`: the + // run continues; the agent's next turn processes the + // synthetic empty `WaitForEventsResult` we emit on fire, + // and the conversation returns to `InProgress` via the + // normal resume path. + if conversation.status().is_waiting_for_events() { + let Some(call) = find_unresolved_wait_for_events_call(conversation) else { + // Conservative: if the conversation is + // `WaitingForEvents` but we can't find the matching + // tool call (shouldn't happen in normal flow, but + // possible if the message log has been edited or + // partially restored), log and skip scheduling. The + // server-side worker idle-shutdown is the defense- + // in-depth backstop in this case. + log::warn!( + "Ambient agent idle lifecycle: event=waiting_watchdog_skipped task_id={:?} terminal_view_id={terminal_id:?} reason=no_unresolved_wait_for_events_call", + me.task_id + ); + return; + }; + let timeout = watchdog_timeout_for_call(&call); + // Bump the generation: a fresh enter into + // `WaitingForEvents` supersedes any earlier watchdog + // from the same run (e.g. if the agent re-yielded + // after a previous timeout fire-and-resume). 
+ let watchdog_gen = + waiting_watchdog_generation.fetch_add(1, Ordering::SeqCst) + 1; + log::info!( + "Ambient agent idle lifecycle: event=waiting_watchdog_scheduled task_id={:?} terminal_view_id={terminal_id:?} timeout={timeout:?} tool_call_id={tool_call_id}", + me.task_id, + tool_call_id = call.tool_call_id + ); + me.schedule_waiting_watchdog( + *conversation_id, + call, + timeout, + watchdog_gen, + Arc::clone(&waiting_watchdog_generation), + ctx, + ); return; } diff --git a/app/src/ai/agent_sdk/driver_tests.rs b/app/src/ai/agent_sdk/driver_tests.rs index b3144e717b..693497a0ce 100644 --- a/app/src/ai/agent_sdk/driver_tests.rs +++ b/app/src/ai/agent_sdk/driver_tests.rs @@ -20,9 +20,11 @@ use warp_util::standardized_path::StandardizedPath; use warpui::{App, SingletonEntity as _}; use super::{ - build_secret_env_vars, AgentDriver, IdleTimeoutSender, - LEGACY_OZ_PARENT_LISTENER_MANAGED_EXTERNALLY_ENV, LEGACY_OZ_PARENT_STATE_ROOT_ENV, - OZ_MESSAGE_LISTENER_MANAGED_EXTERNALLY_ENV, OZ_MESSAGE_LISTENER_STATE_ROOT_ENV, + build_secret_env_vars, find_unresolved_wait_for_events_call, watchdog_timeout_for_call, + AgentDriver, IdleTimeoutSender, UnresolvedWaitForEventsCall, + DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS, LEGACY_OZ_PARENT_LISTENER_MANAGED_EXTERNALLY_ENV, + LEGACY_OZ_PARENT_STATE_ROOT_ENV, OZ_MESSAGE_LISTENER_MANAGED_EXTERNALLY_ENV, + OZ_MESSAGE_LISTENER_STATE_ROOT_ENV, }; use crate::ai::agent::task::TaskId; use crate::ai::agent::{ @@ -868,3 +870,211 @@ fn openai_api_key_exports_only_api_key_not_base_url() { "OPENAI_BASE_URL should NOT be exported as an env var" ); } + +// ── QUALITY-780 waiting watchdog tests ───────────────────────────────── +// +// These exercise the pure helpers introduced for the waiting watchdog (client +// TECH §4). They cover PRODUCT.md (11)–(13) at the unit level: the watchdog +// timeout source (server-supplied vs. default fallback) and the scan that +// finds the unresolved `wait_for_events` tool call on a conversation. 
+//
+// The full subscription-driven scheduling/cancellation paths (entering
+// `WaitingForEvents` schedules; returning to `InProgress` cancels) are
+// exercised end-to-end via the Wave 3 integration tests against a fake
+// server (per client TECH §"Testing and validation"). The watchdog fire
+// path is intentionally a no-op stub in Wave 2 — see the
+// `emit_wait_for_events_timeout_result` doc comment in `driver.rs` and the
+// `TODO(integration, QUALITY-780)` marker.
+
+/// Helper: build a `warp_multi_agent_api::Message` whose `message` oneof is a
+/// `ToolCall` carrying a `WaitForEvents` payload with the given `idle_timeout_seconds`.
+fn wait_for_events_tool_call_message(
+    task_id: &str,
+    tool_call_id: &str,
+    idle_timeout_seconds: i32,
+) -> warp_multi_agent_api::Message {
+    use warp_multi_agent_api::message::tool_call::{Tool, WaitForEvents};
+    use warp_multi_agent_api::message::{Message as MessageOneof, ToolCall};
+    warp_multi_agent_api::Message {
+        id: format!("msg-{tool_call_id}"),
+        task_id: task_id.to_string(),
+        message: Some(MessageOneof::ToolCall(ToolCall {
+            tool_call_id: tool_call_id.to_string(),
+            tool: Some(Tool::WaitForEvents(WaitForEvents {
+                idle_timeout_seconds,
+            })),
+        })),
+        ..Default::default()
+    }
+}
+
+/// Helper: build a `warp_multi_agent_api::Message` whose `message` oneof is a
+/// `ToolCallResult` referencing the given `tool_call_id`. The result variant is
+/// intentionally `cancel` (its payload is just `()`) so the test does not
+/// depend on any per-result-type fixtures.
+fn cancel_tool_call_result_message(
+    task_id: &str,
+    tool_call_id: &str,
+) -> warp_multi_agent_api::Message {
+    use warp_multi_agent_api::message::tool_call_result::Result as ResultOneof;
+    use warp_multi_agent_api::message::{Message as MessageOneof, ToolCallResult};
+    warp_multi_agent_api::Message {
+        id: format!("msg-result-{tool_call_id}"),
+        task_id: task_id.to_string(),
+        message: Some(MessageOneof::ToolCallResult(ToolCallResult {
+            tool_call_id: tool_call_id.to_string(),
+            result: Some(ResultOneof::Cancel(())),
+            ..Default::default()
+        })),
+        ..Default::default()
+    }
+}
+
+/// Helper: build an `AIConversation` from a list of root-task messages.
+fn conversation_from_messages(
+    messages: Vec<warp_multi_agent_api::Message>,
+) -> crate::ai::agent::conversation::AIConversation {
+    use crate::ai::agent::conversation::{AIConversation, AIConversationId};
+    let task = warp_multi_agent_api::Task {
+        id: "task-root".to_string(),
+        messages,
+        ..Default::default()
+    };
+    AIConversation::new_restored(AIConversationId::new(), vec![task], None)
+        .expect("new_restored should succeed with a single root task")
+}
+
+#[test]
+fn default_orchestrated_idle_timeout_seconds_is_thirty_minutes() {
+    // The waiting watchdog falls back to this value when the server does not
+    // supply an `idle_timeout_seconds` on the `wait_for_events` tool call.
+    // 30 minutes is intentionally chosen to roughly mirror the existing
+    // server-side `VMIdleTimeoutMinutes` tenant default; document the
+    // expectation so a future change to the constant trips this test.
+    assert_eq!(DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS, 30 * 60);
+}
+
+#[test]
+fn watchdog_timeout_uses_server_supplied_value_when_positive() {
+    let call = UnresolvedWaitForEventsCall {
+        tool_call_id: "tc-1".to_string(),
+        task_id: "task-root".to_string(),
+        idle_timeout_seconds: 900,
+    };
+    assert_eq!(watchdog_timeout_for_call(&call), Duration::from_secs(900));
+}
+
+#[test]
+fn watchdog_timeout_falls_back_to_default_when_unset() {
+    // Prost flattens scalars, so the proto's "unset" looks like `0` on the
+    // Rust side. Per client TECH §4, treat that as "use the built-in
+    // fallback". This covers PRODUCT.md (12)'s "falling back to a built-in
+    // client default if the server did not supply one".
+    let call = UnresolvedWaitForEventsCall {
+        tool_call_id: "tc-1".to_string(),
+        task_id: "task-root".to_string(),
+        idle_timeout_seconds: 0,
+    };
+    assert_eq!(
+        watchdog_timeout_for_call(&call),
+        Duration::from_secs(DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS as u64)
+    );
+}
+
+#[test]
+fn watchdog_timeout_clamps_negative_value_to_default() {
+    // Defense against a buggy or malicious payload. `Duration::from_secs`
+    // takes a `u64`, which cannot represent a negative value, hence the clamp.
+    let call = UnresolvedWaitForEventsCall {
+        tool_call_id: "tc-1".to_string(),
+        task_id: "task-root".to_string(),
+        idle_timeout_seconds: -42,
+    };
+    assert_eq!(
+        watchdog_timeout_for_call(&call),
+        Duration::from_secs(DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS as u64)
+    );
+}
+
+#[test]
+fn find_unresolved_returns_none_when_no_wait_for_events_call() {
+    // No `wait_for_events` call at all. `new_restored` returns
+    // `Err(NoRootTask)` if `tasks` is empty, so use a root task holding one
+    // unrelated message; the scan must still produce `None`.
+    use warp_multi_agent_api::message::{Message as MessageOneof, UserQuery};
+    let unrelated = warp_multi_agent_api::Message {
+        id: "msg-user".to_string(),
+        task_id: "task-root".to_string(),
+        message: Some(MessageOneof::UserQuery(UserQuery {
+            query: "hi".to_string(),
+            ..Default::default()
+        })),
+        ..Default::default()
+    };
+    let conversation = conversation_from_messages(vec![unrelated]);
+    assert!(find_unresolved_wait_for_events_call(&conversation).is_none());
+}
+
+#[test]
+fn find_unresolved_returns_call_when_present_and_unresolved() {
+    let conversation = conversation_from_messages(vec![wait_for_events_tool_call_message(
+        "task-root",
+        "tc-wait-1",
+        900,
+    )]);
+    let call = find_unresolved_wait_for_events_call(&conversation)
+        .expect("unresolved WaitForEvents call should be detected");
+    assert_eq!(call.tool_call_id, "tc-wait-1");
+    assert_eq!(call.task_id, "task-root");
+    assert_eq!(call.idle_timeout_seconds, 900);
+}
+
+#[test]
+fn find_unresolved_returns_none_when_resolved_by_later_result() {
+    // If a `ToolCallResult` (any variant; we use `cancel` for simplicity)
+    // appears later in the message log with a matching `tool_call_id`, the
+    // call is no longer unresolved — the watchdog should not be scheduled.
+    let conversation = conversation_from_messages(vec![
+        wait_for_events_tool_call_message("task-root", "tc-wait-1", 900),
+        cancel_tool_call_result_message("task-root", "tc-wait-1"),
+    ]);
+    assert!(find_unresolved_wait_for_events_call(&conversation).is_none());
+}
+
+#[test]
+fn find_unresolved_returns_most_recent_when_multiple_present() {
+    // Multiple yields can occur within one run (e.g. yield, resume, re-yield).
+    // Only the most recent call is a candidate for the watchdog's target. This
+    // also covers the case where an earlier `WaitForEvents` was previously
+    // resolved — the function should still return the latest unresolved one.
+    let conversation = conversation_from_messages(vec![
+        wait_for_events_tool_call_message("task-root", "tc-wait-1", 300),
+        cancel_tool_call_result_message("task-root", "tc-wait-1"),
+        wait_for_events_tool_call_message("task-root", "tc-wait-2", 600),
+    ]);
+    let call = find_unresolved_wait_for_events_call(&conversation)
+        .expect("unresolved WaitForEvents call should be detected");
+    assert_eq!(call.tool_call_id, "tc-wait-2");
+    assert_eq!(call.idle_timeout_seconds, 600);
+}
+
+#[test]
+fn unresolved_wait_for_events_call_roundtrips_idle_timeout_zero_via_helper() {
+    // The proto scalar is `0` when unset, and `find_unresolved_wait_for_events_call`
+    // forwards it as-is. The default-timeout fallback happens at
+    // `watchdog_timeout_for_call`, not at the detection site. Verify the
+    // chain: detection preserves the raw value, then the timeout helper
+    // applies the fallback.
+    let conversation = conversation_from_messages(vec![wait_for_events_tool_call_message(
+        "task-root",
+        "tc-wait-default",
+        0,
+    )]);
+    let call = find_unresolved_wait_for_events_call(&conversation)
+        .expect("unresolved WaitForEvents call should be detected");
+    assert_eq!(call.idle_timeout_seconds, 0);
+    assert_eq!(
+        watchdog_timeout_for_call(&call),
+        Duration::from_secs(DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS as u64)
+    );
+}