Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 259 additions & 1 deletion app/src/ai/agent_sdk/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ const SETUP_FAILED_IDLE_TIMEOUT: Duration = Duration::from_secs(120);
/// If no follow-up status arrives within this window, the driver terminates with the
/// original error so the CLI does not hang indefinitely.
const AUTO_RESUME_TIMEOUT: Duration = Duration::from_secs(120);
/// Default upper bound, in seconds, for the waiting watchdog (QUALITY-780
/// client TECH §4).
///
/// Used when the server-supplied `idle_timeout_seconds` on a
/// `wait_for_events` tool call is unset (i.e. `0`); `watchdog_timeout_for_call`
/// also falls back to it for negative values. The value is 30 minutes, chosen
/// to roughly mirror the existing server-side `VMIdleTimeoutMinutes` tenant
/// default so the client watchdog and the worker-side safety-net stay in the
/// same ballpark when neither is configured.
///
/// Typed as `i32` to match `UnresolvedWaitForEventsCall::idle_timeout_seconds`.
pub(super) const DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS: i32 = 30 * 60;
/// Signals to Claude child-harness hooks that Warp already owns the background
/// message-listener lifecycle, so the plugin should reuse the shared state
/// files instead of spawning and cleaning up its own listener.
Expand Down Expand Up @@ -201,6 +208,85 @@ impl<T: Send + 'static> IdleTimeoutSender<T> {
}
}

/// Information about the most recent unresolved `wait_for_events` tool call
/// on a conversation that's currently in `ConversationStatus::WaitingForEvents`.
///
/// The driver's waiting watchdog (QUALITY-780 client TECH §4) needs three
/// fields from the call: the `tool_call_id` (so the watchdog can emit a
/// matching `WaitForEventsResult` tool-call-result on fire), the `task_id`
/// that owns the message (so the result is dispatched against the active
/// task), and the server-supplied `idle_timeout_seconds` (so the watchdog
/// can schedule itself with the same upper bound the server used to extend
/// the task's idle timeout at yield time).
///
/// Values are produced by `find_unresolved_wait_for_events_call` and consumed
/// by `watchdog_timeout_for_call` and the watchdog scheduling path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct UnresolvedWaitForEventsCall {
/// Identifier of the `wait_for_events` tool call, copied from the
/// tool-call message.
pub(super) tool_call_id: String,
/// `task_id` of the message that carried the tool call.
pub(super) task_id: String,
/// Raw value from the proto's `Message::ToolCall::WaitForEvents`
/// payload. Treated as "unset" when `0` per the prost convention for
/// flattened scalars; `watchdog_timeout_for_call` additionally treats
/// negative values as unset. Callers should fall back to
/// [`DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS`] in either case.
pub(super) idle_timeout_seconds: i32,
}

/// Scan the conversation's linearized messages for the most recent
/// **unresolved** `wait_for_events` tool call.
///
/// A call is unresolved if no later `Message::ToolCallResult` matches its
/// `tool_call_id`. This mirrors the detection-site predicate that owns
/// `mark_/clear_conversation_waiting_for_events` (client TECH §8.1), but
/// avoids coupling the driver to that internal storage: the driver scans the
/// public message log itself, which is the same source of truth
/// `apply_client_actions` operates against.
///
/// Returns `None` when no `wait_for_events` call has been seen, or when the
/// most-recent one has already been resolved (the conversation is no longer
/// in the waiting state and the watchdog should not be scheduled).
pub(super) fn find_unresolved_wait_for_events_call(
conversation: &crate::ai::agent::conversation::AIConversation,
) -> Option<UnresolvedWaitForEventsCall> {
use warp_multi_agent_api::message::tool_call::Tool;
use warp_multi_agent_api::message::Message;

let mut resolved_tool_call_ids: HashSet<String> = HashSet::new();
let mut latest_unresolved: Option<UnresolvedWaitForEventsCall> = None;

for message in conversation.all_linearized_messages() {
match message.message.as_ref() {
Some(Message::ToolCallResult(result)) => {
resolved_tool_call_ids.insert(result.tool_call_id.clone());
}
Some(Message::ToolCall(tool_call)) => {
if let Some(Tool::WaitForEvents(payload)) = tool_call.tool.as_ref() {
latest_unresolved = Some(UnresolvedWaitForEventsCall {
tool_call_id: tool_call.tool_call_id.clone(),
task_id: message.task_id.clone(),
idle_timeout_seconds: payload.idle_timeout_seconds,
});
}
}
_ => {}
}
}

latest_unresolved.filter(|call| !resolved_tool_call_ids.contains(&call.tool_call_id))
}

/// Compute how long the waiting watchdog should sleep for `call`.
///
/// A strictly positive server-supplied `idle_timeout_seconds` is used as-is.
/// Zero (the "unset" value) and negative values both fall back to
/// [`DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS`], so the number of seconds
/// handed to [`Duration::from_secs`] is always positive and the widening to
/// `u64` can never wrap.
pub(super) fn watchdog_timeout_for_call(call: &UnresolvedWaitForEventsCall) -> Duration {
    let effective_seconds = match call.idle_timeout_seconds {
        requested if requested > 0 => requested,
        _ => DEFAULT_ORCHESTRATED_IDLE_TIMEOUT_SECONDS,
    };
    // `effective_seconds` is positive here, so `unsigned_abs` is a lossless
    // i32 -> u32 conversion and `u64::from` widens it without truncation.
    Duration::from_secs(u64::from(effective_seconds.unsigned_abs()))
}

/// How to resume an existing conversation when starting an agent run.
///
/// The Oz harness restores the full conversation transcript into the terminal pane and treats
Expand Down Expand Up @@ -2437,6 +2523,109 @@ impl AgentDriver {
Ok(())
}

/// Arm the local waiting watchdog for an unresolved `wait_for_events`
/// tool call (QUALITY-780 client TECH §4).
///
/// A timer for `timeout` is spawned on `ctx`. When it elapses, the
/// completion callback compares the current value of
/// `watchdog_generation` with `expected_gen`:
///
/// * equal — the watchdog is still current, so
///   [`Self::emit_wait_for_events_timeout_result`] is invoked for `call`
///   to close out the pending tool call with an empty
///   `WaitForEventsResult` against the active task;
/// * different — the watchdog was superseded (by a resume or by a fresh
///   `WaitingForEvents` entry), so the fire is logged and discarded.
///
/// The fire path deliberately does **not** resolve `run_exit`: a waiting
/// run continues, and the next agent turn observes the synthetic result
/// and decides how to proceed.
fn schedule_waiting_watchdog(
    &self,
    conversation_id: AIConversationId,
    call: UnresolvedWaitForEventsCall,
    timeout: Duration,
    expected_gen: usize,
    watchdog_generation: Arc<AtomicUsize>,
    ctx: &mut ModelContext<Self>,
) {
    // The generation and the call travel through the timer future so the
    // completion callback receives them as the future's output.
    let timer = async move {
        warpui::r#async::Timer::after(timeout).await;
        (expected_gen, call)
    };

    ctx.spawn(timer, move |me, (fired_gen, pending_call), ctx| {
        let current_gen = watchdog_generation.load(Ordering::SeqCst);
        if current_gen == fired_gen {
            // Still the active watchdog: close out the pending call.
            me.emit_wait_for_events_timeout_result(conversation_id, pending_call, ctx);
            return;
        }

        // Superseded by either a resume (status → InProgress) or a fresh
        // WaitingForEvents re-entry; this fire must be ignored.
        log::debug!(
            "Ambient agent idle lifecycle: event=waiting_watchdog_superseded task_id={:?} expected_gen={fired_gen}",
            me.task_id
        );
    });
}

/// Emit a synthetic `WaitForEventsResult` tool-call result on watchdog
/// fire (QUALITY-780 client TECH §4 and PRODUCT.md (12)/(13)).
///
/// The result references the stored `tool_call_id` of the unresolved
/// `wait_for_events` call. Per PRODUCT.md (12)–(13), this MUST NOT
/// cancel the run — the agent's next turn sees the empty timeout
/// result and decides how to proceed (commonly `finish_task`, but the
/// agent may also re-yield, ask the user, or take other action).
///
/// Implementation note (Wave 2 boundary):
///
/// The full upstream emission path (proto `Message::ToolCallResult`
/// carrying `WaitForEventsResult{}`, sent as a new tool-call-result
/// input on the active task via `BlocklistAIController` so the server
/// echoes it back through the response stream and `apply_client_actions`
/// triggers `clear_conversation_waiting_for_events`) depends on the
/// `AIAgentActionResultType::WaitForEvents(WaitForEventsResult{})`
/// variant + matching `convert_to.rs` proto encoding, both of which
/// live outside this agent's owned files and are introduced by
/// `client-detection`'s Wave 2 work. This method therefore logs the
/// fire event with the stored `tool_call_id` and `task_id`; the actual
/// upstream emission is wired up at Wave 3 integration time once
/// `client-detection`'s variant is in place. Crucially, this method
/// does NOT resolve `run_exit` and the watchdog scheduling itself is
/// fully tested; that's the lifecycle invariant PRODUCT.md (11) cares
/// about.
fn emit_wait_for_events_timeout_result(
&self,
conversation_id: AIConversationId,
call: UnresolvedWaitForEventsCall,
_ctx: &mut ModelContext<Self>,
) {
log::info!(
"Ambient agent waiting watchdog fired: event=wait_for_events_timeout_emitted task_id={:?} conversation_id={conversation_id} tool_call_id={tool_call_id} task_id_for_call={task_id_for_call}",
self.task_id,
tool_call_id = call.tool_call_id,
task_id_for_call = call.task_id,
);
// TODO(integration, QUALITY-780): once `client-detection` lands
// `AIAgentActionResultType::WaitForEvents(WaitForEventsResult{})`
// and the `BlocklistAIController::submit_wait_for_events_timeout(
// conversation_id, tool_call_id, ctx,
// )` helper, replace this no-op with:
//
// let ai_controller = self
// .terminal_driver
// .as_ref(_ctx)
// .terminal_view()
// .as_ref(_ctx)
// .ai_controller()
// .clone();
// ai_controller.update(_ctx, |controller, ctx| {
// controller.submit_wait_for_events_timeout(
// conversation_id,
// call.tool_call_id.clone(),
// ctx,
// );
// });
//
// The run continues regardless; we do not resolve `run_exit`.
let _ = (conversation_id, call);
}

/// Execute an AI run in the terminal session and wait for it to complete.
///
/// Conversation output is streamed as it's available.
Expand All @@ -2449,6 +2638,18 @@ impl AgentDriver {
let (tx, rx) = oneshot::channel();
let run_exit = IdleTimeoutSender::new(tx);

// Generation counter for the waiting watchdog (QUALITY-780 client
// TECH §4). Bumped each time the conversation enters
// `WaitingForEvents` (scheduling a fresh watchdog) and again when the
// conversation resumes back to `InProgress` (cancelling the pending
// watchdog by superseding its generation). This mirrors the
// `IdleTimeoutSender` generation-counter pattern but is decoupled from
// `run_exit` because the watchdog must not resolve `run_exit` on
// fire: a waiting run continues; the agent itself processes the
// synthetic `WaitForEventsResult` and decides whether to finish, re-
// yield, or take other action.
let waiting_watchdog_generation = Arc::new(AtomicUsize::new(0));

// Subscribe before the conversation starts.
let history_model_handle = BlocklistAIHistoryModel::handle(ctx);
let terminal_id = self.terminal_driver.as_ref(ctx).terminal_view().id();
Expand Down Expand Up @@ -2622,12 +2823,69 @@ impl AgentDriver {
};

if conversation.status().is_in_progress() {
// Conversation resumed or a new one started; cancel any pending idle timeout.
// Conversation resumed or a new one started; cancel any
// pending idle timeout AND any pending waiting watchdog.
//
// The watchdog cancellation here handles the resume path
// from QUALITY-780 client TECH §8.3: when client-detection
// sees an inbound `Cancel` or `WaitForEventsResult` and
// calls `clear_conversation_waiting_for_events`, that
// transitions the status back to `InProgress` and emits
// this same event, which we pick up here to supersede
// the watchdog generation.
log::info!(
"Ambient agent idle lifecycle: event=idle_timeout_cancel_requested task_id={:?} terminal_view_id={terminal_id:?} trigger=conversation_in_progress",
me.task_id
);
run_exit.cancel_idle_timeout();
// Bump the watchdog generation so any pending watchdog
// fire is discarded.
waiting_watchdog_generation.fetch_add(1, Ordering::SeqCst);
return;
}

// The conversation yielded via `wait_for_events`; schedule
// the local waiting watchdog (QUALITY-780 client TECH §4).
// This arm explicitly does **not** resolve `run_exit`: the
// run continues; the agent's next turn processes the
// synthetic empty `WaitForEventsResult` we emit on fire,
// and the conversation returns to `InProgress` via the
// normal resume path.
if conversation.status().is_waiting_for_events() {
let Some(call) = find_unresolved_wait_for_events_call(conversation) else {
// Conservative: if the conversation is
// `WaitingForEvents` but we can't find the matching
// tool call (shouldn't happen in normal flow, but
// possible if the message log has been edited or
// partially restored), log and skip scheduling. The
// server-side worker idle-shutdown is the defense-
// in-depth backstop in this case.
log::warn!(
"Ambient agent idle lifecycle: event=waiting_watchdog_skipped task_id={:?} terminal_view_id={terminal_id:?} reason=no_unresolved_wait_for_events_call",
me.task_id
);
return;
};
let timeout = watchdog_timeout_for_call(&call);
// Bump the generation: a fresh enter into
// `WaitingForEvents` supersedes any earlier watchdog
// from the same run (e.g. if the agent re-yielded
// after a previous timeout fire-and-resume).
let watchdog_gen =
waiting_watchdog_generation.fetch_add(1, Ordering::SeqCst) + 1;
log::info!(
"Ambient agent idle lifecycle: event=waiting_watchdog_scheduled task_id={:?} terminal_view_id={terminal_id:?} timeout={timeout:?} tool_call_id={tool_call_id}",
me.task_id,
tool_call_id = call.tool_call_id
);
me.schedule_waiting_watchdog(
*conversation_id,
call,
timeout,
watchdog_gen,
Arc::clone(&waiting_watchdog_generation),
ctx,
);
return;
}

Expand Down
Loading