diff --git a/Cargo.lock b/Cargo.lock index c07cf78a..6ae0a936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7966,6 +7966,10 @@ dependencies = [ "tracing", ] +[[package]] +name = "op-succinct-common" +version = "4.3.1" + [[package]] name = "op-succinct-eigenda-client-utils" version = "0.1.0" @@ -8290,6 +8294,7 @@ dependencies = [ "log", "op-succinct-build-utils", "op-succinct-client-utils", + "op-succinct-common", "op-succinct-elfs", "op-succinct-fp", "op-succinct-host-utils", diff --git a/Cargo.toml b/Cargo.toml index 84e6b562..2b89f61a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "utils/client", + "utils/common", "utils/host", "utils/build", "utils/celestia/client", @@ -146,6 +147,7 @@ op-succinct-eigenda-host-utils = { path = "utils/eigenda/host" } celo-eigenda-registry = { path = "utils/eigenda/registry" } op-succinct-proof-utils = { path = "utils/proof" } op-succinct-signer-utils = { path = "utils/signer" } +op-succinct-common = { path = "utils/common" } op-succinct-signer-gcp-utils = { path = "utils/signer-gcp" } op-succinct-range-utils = { path = "programs/range/utils" } op-succinct-bindings = { path = "bindings" } diff --git a/scripts/utils/Cargo.toml b/scripts/utils/Cargo.toml index 5dbce135..c5b9ae81 100644 --- a/scripts/utils/Cargo.toml +++ b/scripts/utils/Cargo.toml @@ -83,6 +83,7 @@ env_logger = "0.11" rustls = "0.23.23" # local +op-succinct-common.workspace = true op-succinct-host-utils.workspace = true op-succinct-client-utils.workspace = true op-succinct-proof-utils.workspace = true diff --git a/scripts/utils/Dockerfile.game-monitor b/scripts/utils/Dockerfile.game-monitor index 99f4b7ef..2668973b 100644 --- a/scripts/utils/Dockerfile.game-monitor +++ b/scripts/utils/Dockerfile.game-monitor @@ -59,19 +59,10 @@ RUN apt-get update && apt-get install -y \ libssl-dev \ && rm -rf /var/lib/apt/lists/* -# Install Rust (needed by calls to cargo_metadata::MetadataCommand) -RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y -ENV PATH=/root/.cargo/bin:$PATH -RUN rustup install stable && rustup default stable - -# Copy toolchain file -COPY rust-toolchain.toml . -# This installs the nightly version from the file -RUN rustup show - # Copy the built binaries COPY --from=builder /app/target/release/cost-estimator /usr/local/bin/ COPY --from=builder /app/target/release/game-monitor /usr/local/bin/ -# Copy the entire workspace so we can run cargo_metadata::MetadataCommand to get the workspace root. -COPY . . +# Copy operational scripts +COPY --chmod=0755 scripts/utils/rerun-cost-estimator.sh . +COPY --chmod=0755 scripts/utils/fetch-game-logs.sh . diff --git a/scripts/utils/bin/game_monitor.rs b/scripts/utils/bin/game_monitor.rs index e7161df3..98a4be0e 100644 --- a/scripts/utils/bin/game_monitor.rs +++ b/scripts/utils/bin/game_monitor.rs @@ -1,11 +1,28 @@ +//! Game monitor daemon and operational tools. +//! +//! Watches for new fault-proof dispute games on L1 and runs a `cost-estimator` child process +//! for each in mock-proving mode. Implements a two-tier retry system: +//! +//! 1. **Primary**: fast retries with linear backoff (bounded by `--cost-estimator-retries`). +//! 2. **Background**: long-tail retries with exponential 4x backoff, only using spare execution +//! slots, evicted after `--background-retry-max-age-secs`. +//! +//! Running processes are monitored for anomalies (excessive runtime or log volume relative to +//! the median of recent completions) and killed if they exceed configurable multipliers. +//! +//! Progress (`last_contiguous` game index + background retry queue) is persisted to disk so +//! the daemon can resume after restarts. + +use alloy_eips::BlockId; use alloy_primitives::{Address, U256}; use alloy_provider::ProviderBuilder; use anyhow::{Context, Result}; -use clap::Parser; +use clap::{Parser, Subcommand}; use fault_proof::contract::{ DisputeGameFactory::DisputeGameFactoryInstance, OPSuccinctFaultDisputeGame, }; use log::{debug, error, info, warn}; +use op_succinct_common::SequenceTracker; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, VecDeque}, @@ -14,15 +31,15 @@ use std::{ io::Write, path::{Path, PathBuf}, process::{Child, Command, Stdio}, - time::{Duration, Instant}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use tokio::{ signal::unix::{signal, SignalKind}, time::sleep, }; +/// The dispute game type we monitor. Games with a different type are skipped. const GAME_TYPE: u32 = 42; -const MAX_RETRIES: u32 = 3; /// Kill a process if its log file is this many times larger than the median of peers. const LOG_VOLUME_KILL_MULTIPLIER: f64 = 10.0; /// Minimum number of completion history entries required to perform median comparison. @@ -31,9 +48,37 @@ const MEDIAN_THRESHOLD: usize = 3; /// processes. const RUNTIME_KILL_MULTIPLIER: f64 = 5.0; -/// Arguments for the game monitor. +/// Top-level CLI for the game-monitor binary. +/// +/// The binary historically exposed a single mode (the long-running daemon). It now multiplexes +/// between subcommands so that one-shot operational tools can ship in the same image. New +/// subcommands should be added to [`CliCommand`] rather than overloading [`RunArgs`]. #[derive(Debug, Clone, Parser)] -pub struct GameMonitorArgs { +#[command(version, about = "Game monitor daemon and operational tools")] +pub struct Cli { + #[command(subcommand)] + pub command: CliCommand, +} + +/// Subcommands exposed by the game-monitor binary. +/// +/// Named `CliCommand` rather than `Command` to avoid a clash with [`std::process::Command`], +/// which is used elsewhere in this binary to spawn cost-estimator child processes. +#[derive(Debug, Clone, Subcommand)] +pub enum CliCommand { + /// Run the game monitor daemon (default behaviour prior to the subcommand split). + Run(RunArgs), + /// Read an existing `progress.json`, generate `BackgroundRetry` entries for the given game + /// indexes, append them, and print the complete `ProgressState` as JSON to stdout. + /// + /// The daemon must be stopped while replacing `progress.json`; on next startup it will + /// load the updated entries and schedule them through the normal background-retry path. + GenBackgroundRetry(GenBackgroundRetryArgs), +} + +/// Arguments for the game monitor daemon (the `run` subcommand). +#[derive(Debug, Clone, Parser)] +pub struct RunArgs { /// The environment file to use. This file should contain the following environment variables: /// /// - DISPUTE_GAME_FACTORY_ADDRESS: The address of the dispute game factory contract. @@ -104,30 +149,115 @@ pub struct GameMonitorArgs { /// Path to the completion history file. Defaults to `/completion_history.json`. #[arg(long)] pub history_file: Option, + + /// Maximum number of retries for a failed cost estimator process before giving up. + #[arg(long, default_value = "1")] + pub cost_estimator_retries: u32, + + /// Batch size passed to the cost estimator, caps the per-execution chunk size to control SP1 + /// guest memory usage. Note smaller games still execute as a single chunk. + #[arg(long, default_value = "200")] + pub batch_size: u64, + + /// Path to the progress file. Defaults to `/progress.json`. + #[arg(long)] + pub progress_file: Option, + + /// Maximum age in seconds (relative to the L1 game creation timestamp) for background + /// retries. Once a background-queued game exceeds this age, it is evicted and no further + /// attempts are made. Default is 3.5 days (302400 seconds). + #[arg(long, default_value = "302400")] + pub background_retry_max_age_secs: u64, +} + +/// Arguments for the `gen-background-retry` subcommand. +/// +/// This subcommand is a one-shot tool that reads an existing `progress.json`, appends newly +/// generated [`BackgroundRetry`] entries, and writes the complete [`ProgressState`] to stdout. +/// Operators can inspect the output and, if satisfied, redirect it to replace the original file. +#[derive(Debug, Clone, Parser)] +pub struct GenBackgroundRetryArgs { + /// Path to the existing `progress.json` file. + /// + /// The file is parsed as a [`ProgressState`]. The new background-retry entries are appended + /// to whatever is already in `background_retries`, and the full state (including + /// `last_contiguous`) is emitted to stdout. The original file is never modified. + #[arg(long)] + pub progress_file: PathBuf, + + /// L1 RPC URL used to look up each game's on-chain creation timestamp. + /// + /// Falls back to the `L1_RPC` environment variable if not provided. This makes it easy to + /// run the tool inside a deployed pod where the daemon's env is already configured. + #[arg(long, env = "L1_RPC")] + pub l1_rpc: String, + + /// Address of the dispute game factory proxy on L1. + /// + /// Falls back to the `DISPUTE_GAME_FACTORY_ADDRESS` environment variable if not provided. + #[arg(long, env = "DISPUTE_GAME_FACTORY_ADDRESS")] + pub dispute_game_factory_address: Address, + + /// `last_wait` value (in seconds) recorded on every emitted entry. + /// + /// This is the wait that the daemon will quadruple on the next failure. Picking a sensible + /// value matters: the daemon's normal exponential schedule starts at + /// `initial_game_delay * 2 * max_retries * 4` (about 80 minutes with default settings) and + /// each subsequent failure multiplies it by 4. + #[arg(long)] + pub last_wait_secs: u64, + + /// One or more game indexes to emit background-retry entries for. + /// + /// Each index is fetched from the dispute game factory to populate `game_created_at`. The + /// command fails fast if any index can't be fetched or has the wrong game type, so a typo + /// is surfaced to the operator rather than silently producing a half-correct list. + #[arg(required = true, num_args = 1..)] + pub game_indexes: Vec, +} + +/// Distinguishes between primary scheduling (initial attempt + bounded retries on failure) and +/// background scheduling (long-tail retries that run only with spare capacity once primary +/// retries are exhausted). +#[derive(Clone, Copy, Debug)] +enum AttemptKind { + Primary { retries: u32 }, + Background { attempts: u32 }, } /// Represents a running cost estimator process for a game. struct RunningEstimator { started_at: Instant, process: Child, - log_file: PathBuf, + log_file: LogFile, block_range: u64, - retries: u32, + kind: AttemptKind, + /// L1 wall-clock time the game was created. Carried so that on failure we can populate a + /// `BackgroundRetry` with the correct creation timestamp. + game_created_at: SystemTime, } -/// A game index discovered from the factory, waiting for its delay to elapse before -/// fetching game details and spawning the cost estimator. +/// A game index discovered from the factory, once executable_at has passed the game can be +/// executed. The delay before execution helps to reduce infrastructure synchronisation problems, +/// such as what is the latest finalized block, and also provides a mechanism to delay re-execution +/// when there may be some temporary infrastructure outage. struct PendingGame { - discovered_at: Instant, + executable_at: Instant, game_index: u64, - retries: u32, + kind: AttemptKind, } +/// On-chain metadata for a single dispute game, fetched from L1 via [`fetch_game_data`]. struct GameData { game_index: u64, game_address: Address, + /// L2 block at which the game's execution range starts. start_block: u64, + /// L2 block at which the game's execution range ends. end_block: u64, + /// L1 wall-clock time at which the game was created on the dispute game factory. Used for + /// age-based eviction of background retries. + created_at: SystemTime, } impl GameData { @@ -136,12 +266,41 @@ impl GameData { } } +/// Deferred action determined during the process-scan phase of +/// [`MonitorState::cleanup_finished_processes`]. Actions are collected first and applied +/// afterwards to avoid mutating `running_processes` while iterating over it. enum ProcessAction { Success { duration: Duration, block_range: u64 }, Kill { reason: String }, Retry { reason: String }, } +/// Outcome of a single `spawn_game` invocation. Callers use this to decide whether to keep +/// pulling from their queue or to yield back to the outer poll loop. +#[must_use] +#[derive(Debug, PartialEq, Eq)] +enum SpawnOutcome { + /// Process started and inserted into `running_processes`. Queue-specific success cleanup + /// has already been performed inside `spawn_game`. + Spawned, + /// The game has the wrong type for this monitor; queue-specific cleanup has been performed. + WrongGameType, + /// Fetching game data from L1/L2 failed. Queues are unmodified so the entry will be retried + /// on the next poll. Callers typically `continue 'outer` to abandon the current iteration. + FetchFailed, +} + +/// Bundles the per-spawn configuration that doesn't change across loop iterations. Built once +/// in `main` and passed by reference to `MonitorState::spawn_game`. +struct SpawnContext<'a, P: alloy_provider::Provider + Clone> { + factory: &'a DisputeGameFactoryInstance

, + l1_provider: &'a P, + cost_estimator_binary_path: &'a Path, + batch_size: u64, + env_file: &'a Path, + logs_dir: &'a Path, +} + /// A record of a successfully completed cost estimator process, used for anomaly detection. #[derive(Clone, Debug, Serialize, Deserialize)] struct CompletionRecord { @@ -153,22 +312,77 @@ struct CompletionRecord { block_range: u64, } +/// A long-tail retry for a game whose primary retry budget has been exhausted. Background +/// retries run only with spare capacity and survive process restarts via the persisted +/// `ProgressState`. +#[derive(Clone, Debug, Serialize, Deserialize)] +struct BackgroundRetry { + game_index: u64, + /// L1 wall-clock time at which the game was created on the dispute game factory. Used to + /// enforce the maximum age before eviction. + game_created_at: SystemTime, + /// Wall-clock time at which the next attempt becomes eligible. `SystemTime` is used (rather + /// than `Instant`) so the value survives process restarts. + next_attempt_at: SystemTime, + /// Wait used before the most recent attempt; the next wait is `last_wait * 4`. + last_wait: Duration, + /// Number of background attempts performed so far for this game. + attempts: u32, +} + +/// The subset of daemon state that survives restarts, serialised to `progress.json`. +/// +/// `last_contiguous` is the highest game index such that every index up to and including it +/// has been processed (successfully or with retries exhausted). On startup the daemon resumes +/// from `last_contiguous + 1`. +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +struct ProgressState { + last_contiguous: u64, + #[serde(default)] + background_retries: Vec, +} + +/// The full mutable state of the running daemon. Not serialised directly; the persistable +/// subset is extracted into [`ProgressState`] by [`save_progress`](Self::save_progress). struct MonitorState { + /// Currently executing cost-estimator child processes, keyed by game index. running_processes: HashMap, + /// Games waiting for their `executable_at` delay to elapse before spawning. pending_games: VecDeque, + /// Games whose primary retries are exhausted, awaiting low-priority background attempts. + background_retries: VecDeque, + /// The next game index to discover from the factory (monotonically increasing). next_game_index: u64, + /// Delay applied to newly discovered games and used as the base for retry backoff. + initial_game_delay: Duration, + /// Sliding window of recent successful completions for anomaly detection. completion_history: VecDeque, + /// Absolute ceiling on cost-estimator runtime before it is killed. max_process_duration_secs: u64, + /// Max entries in `completion_history`; oldest are evicted when full. max_history_length: usize, + /// Number of primary retries before a game moves to background. + max_retries: u32, history_file: PathBuf, + /// Tracks out-of-order game completions to compute `last_contiguous`. + sequence_tracker: SequenceTracker, + progress_file: PathBuf, + /// Games older than this (from L1 creation time) are evicted from background retries. + background_retry_max_age: Duration, } impl MonitorState { + #[allow(clippy::too_many_arguments)] fn new( next_game_index: u64, + initial_game_delay: Duration, max_process_duration_secs: u64, max_history_length: usize, + max_retries: u32, history_file: PathBuf, + progress_file: PathBuf, + background_retries: VecDeque, + background_retry_max_age: Duration, ) -> Self { let completion_history = Self::load_history(&history_file, max_history_length); info!( @@ -179,14 +393,67 @@ impl MonitorState { Self { running_processes: HashMap::new(), pending_games: VecDeque::new(), + background_retries, next_game_index, + initial_game_delay, completion_history, max_process_duration_secs, max_history_length, + max_retries, history_file, + sequence_tracker: SequenceTracker::new(next_game_index), + progress_file, + background_retry_max_age, + } + } + + /// Load persisted progress from disk. Returns `None` if the file doesn't exist or can't + /// be parsed (a warning is logged in the latter case). + fn load_progress(path: &Path) -> Option { + let data = match fs::read_to_string(path) { + Ok(data) => data, + Err(_) => return None, + }; + match serde_json::from_str(&data) { + Ok(state) => Some(state), + Err(e) => { + warn!("Failed to parse progress file {}: {}", path.display(), e); + None + } + } + } + + /// Record a game as completed. If this causes `last_contiguous` to advance (i.e. there + /// are no more gaps below this index), progress is persisted to disk. + fn mark_game_completed(&mut self, game_index: u64) { + let old_end = self.sequence_tracker.end(); + self.sequence_tracker.add(game_index); + let new_end = self.sequence_tracker.end(); + if new_end != old_end { + info!("Last contiguous advanced from {} to {}", old_end, new_end); + self.save_progress(); + } + } + + /// Atomically write the current `last_contiguous` and `background_retries` to disk. + fn save_progress(&self) { + let state = ProgressState { + last_contiguous: self.sequence_tracker.end(), + background_retries: self.background_retries.iter().cloned().collect(), + }; + let data = match serde_json::to_string(&state) { + Ok(data) => data, + Err(e) => { + warn!("Failed to serialize progress state: {}", e); + return; + } + }; + if let Err(e) = atomic_write(&self.progress_file, data.as_bytes()) { + warn!("Failed to write progress to {}: {}", self.progress_file.display(), e); } } + /// Load completion history from disk, truncating to `max_length` if it has grown. fn load_history(path: &Path, max_length: usize) -> VecDeque { let data = match fs::read_to_string(path) { Ok(data) => data, @@ -205,10 +472,11 @@ impl MonitorState { records } + /// Atomically write the completion history to disk. fn save_history(&self) { match serde_json::to_string(&self.completion_history) { Ok(data) => { - if let Err(e) = fs::write(&self.history_file, data) { + if let Err(e) = atomic_write(&self.history_file, data.as_bytes()) { warn!( "Failed to write completion history to {}: {}", self.history_file.display(), @@ -222,6 +490,7 @@ impl MonitorState { } } + /// Append a completion record (evicting the oldest if at capacity) and persist to disk. fn push_completion(&mut self, record: CompletionRecord) { if self.max_history_length == 0 { return; @@ -233,8 +502,20 @@ impl MonitorState { self.save_history(); } + /// Poll all running processes and handle completions, failures, and anomalies. + /// + /// This is the core housekeeping method called at the top of each poll iteration. It: + /// 1. Computes median time-per-block and log-size-per-block from completion history. + /// 2. Scans each running process via `try_wait()`: + /// - Exited successfully -> `ProcessAction::Success` + /// - Exited with error -> `ProcessAction::Retry` + /// - Still running but exceeds duration/time/log anomaly thresholds -> `ProcessAction::Kill` + /// 3. Applies deferred actions: records completions, kills runaways, re-queues failures. + /// + /// Actions are collected into a `Vec` first because we can't mutate `running_processes` + /// (to remove entries or call `maybe_requeue`) while iterating over it. fn cleanup_finished_processes(&mut self) { - // Calculate median log size per block + // Calculate median log size per block from completion history for anomaly comparison. let lpb_values: Vec = self .completion_history .iter() @@ -250,11 +531,13 @@ impl MonitorState { .collect(); let median_tpb: Option = median(&tpb_values, MEDIAN_THRESHOLD); + // Snapshot current log file sizes for all running processes so we can compare + // against the median without re-reading metadata during the scan loop. let running_log_sizes: HashMap = self .running_processes .iter() .map(|(id, est)| { - let size = fs::metadata(&est.log_file).map(|m| m.len()).unwrap_or(0); + let size = fs::metadata(&est.log_file.path).map(|m| m.len()).unwrap_or(0); (*id, size) }) .collect(); @@ -266,11 +549,13 @@ impl MonitorState { match estimator.process.try_wait() { Ok(Some(status)) => { - if status.success() { + let success = status.success(); + estimator.log_file.mark_complete(success); + if success { info!( "Cost estimator {} completed successfully, log file: {}", id, - estimator.log_file.display(), + estimator.log_file.path.display(), ); process_actions.push(( *id, @@ -284,7 +569,7 @@ impl MonitorState { "Cost estimator {} failed with status {:?}, log file: {}", id, status, - estimator.log_file.display(), + estimator.log_file.path.display(), ); process_actions.push(( *id, @@ -293,6 +578,8 @@ impl MonitorState { } } Ok(None) => { + // Process still running. Check kill conditions using a closure that + // returns Some(reason) on the first triggered condition. let kill_reason = (|| { if elapsed.as_secs() > self.max_process_duration_secs { return Some(format!( @@ -312,15 +599,19 @@ impl MonitorState { } } if let Some(med_lpb) = median_lpb { - let current_lpb = running_log_sizes.get(id).copied().unwrap_or(0) - as f64 / - estimator.block_range as f64; + let current_log_bytes = + running_log_sizes.get(id).copied().unwrap_or(0) as f64; + let current_lpb = current_log_bytes / estimator.block_range as f64; if current_lpb > LOG_VOLUME_KILL_MULTIPLIER * med_lpb { + let median_log_bytes = med_lpb * estimator.block_range as f64; return Some(format!( - "log size ({:.1} MB) exceeds {:.0}x median ({:.1} MB)", - current_lpb / (1024.0 * 1024.0), + "log size ({:.1} MB, {:.0} bytes/block) exceeds \ + {:.0}x median ({:.1} MB, {:.0} bytes/block)", + current_log_bytes / (1024.0 * 1024.0), + current_lpb, LOG_VOLUME_KILL_MULTIPLIER, - med_lpb / (1024.0 * 1024.0) + median_log_bytes / (1024.0 * 1024.0), + med_lpb, )); } } @@ -329,11 +620,12 @@ impl MonitorState { })(); if let Some(reason) = kill_reason { + estimator.log_file.mark_complete(false); error!( "Cost estimator {} is out of control ({}), log file: {}. Killing it.", id, reason, - estimator.log_file.display() + estimator.log_file.path.display() ); process_actions.push((*id, ProcessAction::Kill { reason })); } @@ -344,77 +636,297 @@ impl MonitorState { } } + // Apply deferred actions now that we're no longer borrowing running_processes. for (id, action) in process_actions { match action { ProcessAction::Success { duration, block_range } => { if let Some(est) = self.running_processes.remove(&id) { - let log_size = fs::metadata(&est.log_file).map(|m| m.len()).unwrap_or(0); - LogFile::mark_complete(&est.log_file, true); - if block_range > 0 { - self.push_completion(CompletionRecord { - duration, - log_size, - block_range, - }); + match est.kind { + AttemptKind::Primary { .. } => { + let log_size = + fs::metadata(&est.log_file.path).map(|m| m.len()).unwrap_or(0); + if block_range > 0 { + self.push_completion(CompletionRecord { + duration, + log_size, + block_range, + }); + } + self.mark_game_completed(id); + } + AttemptKind::Background { attempts } => { + // Game already marked completed when it entered the background + // queue. Skip mark_game_completed (avoids the SequenceTracker + // duplicate-add leak) and skip push_completion (delayed retries + // would skew the median-based anomaly detection). + info!( + "Background attempt {} for game {} succeeded; removing \ + from background queue", + attempts, id + ); + self.background_retries.retain(|bg| bg.game_index != id); + self.save_progress(); + } } } } ProcessAction::Kill { reason } => { if let Some(mut est) = self.running_processes.remove(&id) { let _ = est.process.kill(); - LogFile::mark_complete(&est.log_file, false); - self.maybe_requeue(id, est.retries, &reason); + self.maybe_requeue(id, est.kind, est.game_created_at, &reason); } } ProcessAction::Retry { reason } => { if let Some(est) = self.running_processes.remove(&id) { - LogFile::mark_complete(&est.log_file, false); - self.maybe_requeue(id, est.retries, &reason); + self.maybe_requeue(id, est.kind, est.game_created_at, &reason); } } } } } - fn maybe_requeue(&mut self, game_index: u64, retries: u32, reason: &str) { - if retries < MAX_RETRIES { - let new_retries = retries + 1; - warn!( - "Re-queuing game {} for retry {}/{} ({})", - game_index, new_retries, MAX_RETRIES, reason - ); - self.pending_games.push_back(PendingGame { - discovered_at: Instant::now(), - game_index, - retries: new_retries, - }); - } else { - error!( - "Game {} failed after {} retries ({}), giving up.", - game_index, MAX_RETRIES, reason - ); + /// Decide what to do with a failed game based on its `AttemptKind`: + /// + /// - **Primary with retries remaining**: re-queue to `pending_games` with linear backoff. + /// - **Primary with retries exhausted**: mark completed (so `last_contiguous` can advance), + /// move to `background_retries` with the first exponential wait. + /// - **Background**: quadruple `last_wait` on the existing entry for the next attempt. + fn maybe_requeue( + &mut self, + game_index: u64, + kind: AttemptKind, + game_created_at: SystemTime, + reason: &str, + ) { + match kind { + // Primary retry: re-queue with linear backoff (delay * 2 * retry_number). + AttemptKind::Primary { retries } if retries < self.max_retries => { + let new_retries = retries + 1; + let delay = self.initial_game_delay * 2 * new_retries; + warn!( + "Re-queuing game {} for retry {}/{} ({}) after {:?} delay", + game_index, new_retries, self.max_retries, reason, delay + ); + self.pending_games.push_front(PendingGame { + executable_at: Instant::now() + delay, + game_index, + kind: AttemptKind::Primary { retries: new_retries }, + }); + } + AttemptKind::Primary { .. } => { + // Primary retries exhausted: mark the game completed so last_contiguous can + // advance, and move it onto the background-retry queue for low-priority + // long-tail attempts. The first background wait is + // `initial_game_delay * 2 * max_retries * 4`, continuing the primary linear + // schedule scaled by 4x. `max_retries.max(1)` guards against a 0-length wait + // when max_retries is configured to 0. + let last_wait = self.initial_game_delay * 2 * self.max_retries.max(1) * 4; + let next_attempt_at = SystemTime::now() + last_wait; + warn!( + "Game {} exhausted {} primary retries ({}); moving to background queue with \ + first attempt in {:?}", + game_index, self.max_retries, reason, last_wait + ); + self.background_retries.push_back(BackgroundRetry { + game_index, + game_created_at, + next_attempt_at, + last_wait, + attempts: 0, + }); + self.mark_game_completed(game_index); + // mark_game_completed only saves when last_contiguous advances; force a save + // here so the new background entry is persisted regardless. + self.save_progress(); + } + AttemptKind::Background { attempts } => { + // A background attempt failed. Update the existing entry with a 4x-longer wait + // and bump the attempt counter. The game is already in the sequence tracker + // from the original primary-retry exhaustion, so we do not call + // mark_game_completed again. + let Some(entry) = + self.background_retries.iter_mut().find(|bg| bg.game_index == game_index) + else { + warn!( + "Background attempt {} for game {} failed ({}) but no background \ + record was found; dropping", + attempts, game_index, reason + ); + return; + }; + entry.last_wait *= 4; + entry.next_attempt_at = SystemTime::now() + entry.last_wait; + entry.attempts = attempts; + warn!( + "Background attempt {} for game {} failed ({}); next attempt in {:?}", + attempts, game_index, reason, entry.last_wait + ); + self.save_progress(); + } } } + /// Returns true if there are spare execution slots for new processes. fn can_spawn_new(&self, max_concurrent: usize) -> bool { self.running_processes.len() < max_concurrent } + /// Evict any background-retry entries whose game age (relative to L1 game creation time) + /// exceeds `background_retry_max_age`. Currently-running entries are left in place; they + /// will be considered on the next sweep after they finish. Returns the number evicted. + fn evict_aged_background_retries(&mut self) -> usize { + let now = SystemTime::now(); + let max_age = self.background_retry_max_age; + let running = &self.running_processes; + let before = self.background_retries.len(); + self.background_retries.retain(|bg| { + // Don't evict an entry whose process is currently running; let it finish. + if running.contains_key(&bg.game_index) { + return true; + } + let age = now.duration_since(bg.game_created_at).unwrap_or(Duration::ZERO); + if age > max_age { + warn!( + "Evicting background retry for game {} after {:?} (max age {:?}, \ + {} attempts)", + bg.game_index, age, max_age, bg.attempts + ); + false + } else { + true + } + }); + let evicted = before - self.background_retries.len(); + if evicted > 0 { + self.save_progress(); + } + evicted + } + + /// Graceful shutdown: kill all running cost-estimator processes and delete their log files + /// (incomplete logs are not useful for analysis). fn shutdown(&mut self) { info!("Shutting down: killing {} running processes", self.running_processes.len()); for (id, mut est) in self.running_processes.drain() { if let Err(e) = est.process.kill() { warn!("Failed to kill process for game {}: {}", id, e); } - if let Err(e) = fs::remove_file(&est.log_file) { - warn!("Failed to delete log file {}: {}", est.log_file.display(), e); + if let Err(e) = fs::remove_file(&est.log_file.path) { + warn!("Failed to delete log file {}: {}", est.log_file.path.display(), e); } } } + + /// Fetch game data, spawn a cost estimator, and insert into `running_processes`. + /// Queue-specific cleanup (on both `WrongGameType` and successful spawn) is performed + /// internally based on `kind`, so the call sites only have to pick a candidate and react + /// to the returned outcome. + async fn spawn_game( + &mut self, + game_index: u64, + kind: AttemptKind, + ctx: &SpawnContext<'_, P>, + ) -> Result { + let game_data = + match fetch_game_data(game_index, ctx.factory, ctx.l1_provider.clone()).await { + Ok(data) => data, + Err(FetchGameError::WrongGameType { game_index, game_type, expected }) => { + debug!( + "Skipping game at index {} (type {} != {})", + game_index, game_type, expected + ); + self.handle_wrong_game_type(game_index, kind); + return Ok(SpawnOutcome::WrongGameType); + } + Err(e) => { + warn!("Failed to fetch game data for index {}: {:#}. Retrying", game_index, e); + return Ok(SpawnOutcome::FetchFailed); + } + }; + + let log_file = + LogFile::new(ctx.logs_dir, game_data.game_index, game_data.game_address, kind); + let child = spawn_cost_estimator( + ctx.cost_estimator_binary_path, + ctx.batch_size, + ctx.env_file, + &log_file, + &game_data, + )?; + + let kind_descr = match kind { + AttemptKind::Primary { retries: 0 } => "primary".to_string(), + AttemptKind::Primary { retries } => format!("primary retry {}", retries), + AttemptKind::Background { attempts } => format!("background attempt {}", attempts), + }; + info!( + "Starting cost estimator [{}] for game at index {} address {} (blocks {}-{}, \ + created_at {:?})", + kind_descr, + game_data.game_index, + game_data.game_address, + game_data.start_block, + game_data.end_block, + game_data.created_at, + ); + + self.running_processes.insert( + game_data.game_index, + RunningEstimator { + started_at: Instant::now(), + process: child, + log_file, + block_range: game_data.block_range(), + kind, + game_created_at: game_data.created_at, + }, + ); + + // On-success queue-specific cleanup. Background entries stay in `background_retries` + // while running so `maybe_requeue` can update them on failure. + if matches!(kind, AttemptKind::Primary { .. }) { + self.pending_games.retain(|p| p.game_index != game_data.game_index); + } + + Ok(SpawnOutcome::Spawned) + } + + /// WrongGameType cleanup, dispatched by `kind`. + fn handle_wrong_game_type(&mut self, game_index: u64, kind: AttemptKind) { + match kind { + AttemptKind::Primary { .. } => { + self.pending_games.retain(|p| p.game_index != game_index); + self.mark_game_completed(game_index); + } + AttemptKind::Background { .. } => { + self.background_retries.retain(|bg| bg.game_index != game_index); + self.save_progress(); + } + } + } +} + +/// Write `data` to `path` atomically by writing to a temporary sibling file and renaming. +fn atomic_write(path: &Path, data: &[u8]) -> std::io::Result<()> { + let tmp_path = match path.file_name() { + Some(name) => { + let mut tmp_name = name.to_os_string(); + tmp_name.push(".tmp"); + path.with_file_name(tmp_name) + } + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "path has no file name", + )); + } + }; + fs::write(&tmp_path, data)?; + fs::rename(&tmp_path, path) } -/// Compute the median of a slice of f64 values. Returns if the length of the slice is below the -/// threshold. +/// Compute the median of `values`. Returns `None` if fewer than `threshold` entries are +/// available, preventing anomaly detection from triggering on insufficient data. fn median(values: &[f64], threshold: usize) -> Option { if values.len() < threshold { return None; @@ -437,13 +949,16 @@ enum FetchGameError { Other(#[from] anyhow::Error), } +/// Fetch on-chain metadata for a single game: type, address, L2 block range, creation time. +/// +/// Returns [`FetchGameError::WrongGameType`] if the game's type doesn't match [`GAME_TYPE`], +/// allowing callers to skip non-matching games without treating it as a transient failure. async fn fetch_game_data( - pending: &PendingGame, + game_index: u64, factory: &DisputeGameFactoryInstance

, l1_provider: P, ) -> Result { - let game_index = pending.game_index; - + // Look up game metadata from the factory contract. let game_info = factory .gameAtIndex(U256::from(game_index)) .call() @@ -457,6 +972,9 @@ async fn fetch_game_data( let game_address = game_info.proxy; + let created_at_secs = U256::from(game_info.timestamp).to::(); + let created_at = UNIX_EPOCH + Duration::from_secs(created_at_secs); + let game = OPSuccinctFaultDisputeGame::new(game_address, l1_provider); let l2_block_number = @@ -469,30 +987,39 @@ async fn fetch_game_data( .context("failed to get starting block number")? .to::(); - Ok(GameData { game_index, game_address, start_block, end_block: l2_block_number }) + Ok(GameData { game_index, game_address, start_block, end_block: l2_block_number, created_at }) } +/// Spawn a `cost-estimator` child process for the given game. +/// +/// The log file is pre-seeded with a header block containing the exact command and environment +/// variables, so that a failed run can be replayed manually via `rerun-cost-estimator.sh`. +/// Both stdout and stderr of the child are redirected into the log file. fn spawn_cost_estimator( - cost_estimator_binary_path: &PathBuf, + cost_estimator_binary_path: &Path, + batch_size: u64, env_file: &Path, - log_file: &PathBuf, + log_file: &LogFile, game_data: &GameData, ) -> Result { + // Cap batch size to the game's block range so small games run as a single chunk. + let effective_batch_size = std::cmp::min(batch_size, game_data.block_range()).to_string(); let args = [ "--start", &game_data.start_block.to_string(), "--end", &game_data.end_block.to_string(), "--batch-size", - &game_data.block_range().to_string(), + &effective_batch_size, "--env-file", env_file.to_str().unwrap(), + "--log-only", ]; let cmd = format!("{} {}", cost_estimator_binary_path.display(), args.join(" ")); // Write command and env to log file to facilitate easy re-running of the command. - let mut log_file_handle = File::create(log_file)?; + let mut log_file_handle = File::create(&log_file.path)?; writeln!(log_file_handle, "=== Cost Estimator Command ===")?; writeln!(log_file_handle, "{}", cmd)?; writeln!(log_file_handle, "=== Cost Estimator ENV ===")?; @@ -519,7 +1046,7 @@ fn spawn_cost_estimator( let stderr_file = log_file_handle.try_clone()?; info!("Running cost estimator: {}", cmd); - info!("Logging to: {}", log_file.display()); + info!("Logging to: {}", log_file.path.display()); let child = Command::new(cost_estimator_binary_path) .args(args) @@ -531,20 +1058,38 @@ fn spawn_cost_estimator( Ok(child) } -struct LogFile; +/// Manages the lifecycle of a per-game log file. +/// +/// Log files follow the naming convention: +/// `cost-estimator--

[-retry][-bg-retry].log` +/// +/// On completion, [`mark_complete`](Self::mark_complete) appends `-success` or `-failure` +/// before the `.log` extension. +struct LogFile { + path: PathBuf, +} impl LogFile { - fn path(logs_dir: &Path, game_index: u64, game_address: Address, retries: u32) -> PathBuf { - if retries > 0 { - logs_dir.join(format!( + /// Build the log file path from the game index, address, and attempt kind. + fn new(logs_dir: &Path, game_index: u64, game_address: Address, kind: AttemptKind) -> Self { + let path = match kind { + AttemptKind::Primary { retries: 0 } => { + logs_dir.join(format!("cost-estimator-{}-{}.log", game_index, game_address)) + } + AttemptKind::Primary { retries } => logs_dir.join(format!( "cost-estimator-{}-{}-retry{}.log", game_index, game_address, retries - )) - } else { - logs_dir.join(format!("cost-estimator-{}-{}.log", game_index, game_address)) - } + )), + AttemptKind::Background { attempts } => logs_dir.join(format!( + "cost-estimator-{}-{}-bg-retry{}.log", + game_index, game_address, attempts + )), + }; + Self { path } } + /// Parse the game index from a log filename. Used by log-space enforcement to identify + /// which game a log belongs to (so logs for running games are not deleted). fn extract_game_index(path: &Path) -> Option { let filename = path.file_name()?.to_str()?; let stripped = filename.strip_prefix("cost-estimator-")?; @@ -552,19 +1097,23 @@ impl LogFile { stripped[..dash_pos].parse().ok() } - fn mark_complete(path: &Path, success: bool) { - let Some(filename) = path.file_name().and_then(|f| f.to_str()) else { + /// Rename the log file to include a `-success` or `-failure` suffix before `.log`. + fn mark_complete(&mut self, success: bool) { + let Some(filename) = self.path.file_name().and_then(|f| f.to_str()) else { return; }; let Some(stem) = filename.strip_suffix(".log") else { return; }; let suffix = if success { "success" } else { "failure" }; - let new_path = path.with_file_name(format!("{}-{}.log", stem, suffix)); - if let Err(e) = fs::rename(path, &new_path) { - warn!("Failed to rename log {} to {}: {}", path.display(), new_path.display(), e); + let new_path = self.path.with_file_name(format!("{}-{}.log", stem, suffix)); + if let Err(e) = fs::rename(&self.path, &new_path) { + warn!("Failed to rename log {} to {}: {}", self.path.display(), new_path.display(), e); + } else { + self.path = new_path; } } + /// List all log files with their sizes and game indexes, for log-space enforcement. fn sizes(logs_dir: &Path) -> Result> { let mut log_files: Vec<(PathBuf, u64, u64)> = Vec::new(); for entry in fs::read_dir(logs_dir)? { @@ -583,6 +1132,8 @@ impl LogFile { } } +/// Delete the oldest log files (by game index) until the total log directory size is within +/// `max_size_bytes`. Logs for currently running games are never deleted. fn enforce_log_space_limit( max_size_bytes: u64, running_game_indices: &HashMap, @@ -638,10 +1189,27 @@ fn enforce_log_space_limit( } } +/// Entry point. Parses the top-level subcommand and dispatches to the matching handler. +/// +/// Logging setup is deferred to each subcommand: the daemon configures `sp1_sdk`'s logger so +/// that operational logs go to stderr, while one-shot tools like `gen-background-retry` +/// deliberately leave logging unconfigured to keep stdout clean for machine-readable output. #[tokio::main] async fn main() -> Result<()> { - let args = GameMonitorArgs::parse(); + let cli = Cli::parse(); + match cli.command { + CliCommand::Run(args) => run(args).await, + CliCommand::GenBackgroundRetry(args) => gen_background_retry(args).await, + } +} +/// Run the long-running game monitor daemon. +/// +/// Loads the env file, initialises providers, restores any persisted progress from disk, then +/// enters the poll loop until SIGINT or SIGTERM is received. On shutdown all in-flight +/// cost-estimator processes are killed (their logs are also removed since a partial run is +/// not useful for analysis). +async fn run(args: RunArgs) -> Result<()> { // Load environment variables dotenv::from_path(&args.env_file).ok(); sp1_sdk::utils::setup_logger(); @@ -669,35 +1237,71 @@ async fn main() -> Result<()> { let factory = DisputeGameFactoryInstance::new(dispute_game_factory_address, l1_provider.clone()); - // If start_index is unset start from the most recent game, or game at index 0 if there are no - // games. Otherwise use the start_index. - let next_game_index = match args.start_index { - Some(index) => index, - None => { - let initial_game_count = factory.gameCount().call().await?.to::(); - match initial_game_count { - 0 => 0, - n => n - 1, - } + let progress_file = + args.progress_file.clone().unwrap_or_else(|| args.logs_dir.join("progress.json")); + + let persisted_progress = MonitorState::load_progress(&progress_file); + + // Determine starting game index: explicit flag > persisted progress > latest on-chain. + let next_game_index = if let Some(index) = args.start_index { + index + } else if let Some(progress) = persisted_progress.as_ref() { + let next = progress.last_contiguous + 1; + info!("Resuming from persisted last contiguous {}", progress.last_contiguous); + next + } else { + let initial_game_count = + factory.gameCount().call().block(BlockId::finalized()).await?.to::(); + match initial_game_count { + 0 => 0, + n => n - 1, } }; + + let background_retries: VecDeque = + persisted_progress.map(|p| p.background_retries.into_iter().collect()).unwrap_or_default(); + info!("Loaded {} persisted background retries", background_retries.len()); + + let delay = Duration::from_secs(args.delay); + let history_file = args.history_file.clone().unwrap_or_else(|| args.logs_dir.join("completion_history.json")); let mut state = MonitorState::new( next_game_index, + delay, args.max_process_duration_secs, args.max_history_length, + args.cost_estimator_retries, history_file, + progress_file, + background_retries, + Duration::from_secs(args.background_retry_max_age_secs), ); + // Run an eviction sweep up front to discard anything that aged out while the process was + // down. + state.evict_aged_background_retries(); + let poll_interval = Duration::from_secs(args.poll_interval); - let delay = Duration::from_secs(args.delay); let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?; let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?; + let spawn_ctx = SpawnContext { + factory: &factory, + l1_provider: &l1_provider, + cost_estimator_binary_path: args.cost_estimator_binary_path.as_path(), + batch_size: args.batch_size, + env_file: args.env_file.as_path(), + logs_dir: args.logs_dir.as_path(), + }; + + // ── Main poll loop ────────────────────────────────────────────────────── + // Each iteration: cleanup -> evict -> enforce log limits -> spawn primary -> spawn + // background -> discover new games. The loop breaks on SIGTERM/SIGINT for graceful + // shutdown. 'outer: loop { tokio::select! { _ = sleep(poll_interval) => {} @@ -711,8 +1315,13 @@ async fn main() -> Result<()> { } } + // Phase 1: handle completed/failed/runaway processes. state.cleanup_finished_processes(); + // Phase 2: drop background retries that have exceeded their max age. + state.evict_aged_background_retries(); + + // Phase 3: delete oldest log files if the directory exceeds the size limit. if args.max_logs_size_mb > 0 { enforce_log_space_limit( args.max_logs_size_mb * 1024 * 1024, @@ -722,82 +1331,62 @@ async fn main() -> Result<()> { } info!( - "Running: {}/{}, Pending: {}", + "Running: {}/{}, Pending: {}, Background pending: {}", state.running_processes.len(), args.max_concurrent, - state.pending_games.len() + state.pending_games.len(), + state.background_retries.len(), ); - // Process pending games whose delay has elapsed: fetch game info and spawn. - while let Some(pending) = state.pending_games.front() { - if !state.can_spawn_new(args.max_concurrent) { - break; - } - // Retries skip the discovery delay. - if pending.retries == 0 && pending.discovered_at.elapsed() < delay { - break; + // Phase 4: spawn pending primary games whose delay has elapsed, up to max_concurrent. + let current_time = Instant::now(); + while state.can_spawn_new(args.max_concurrent) { + let Some((game_index, kind)) = state + .pending_games + .iter() + .find(|p| p.executable_at <= current_time) + .map(|p| (p.game_index, p.kind)) + else { + break; // nothing ready in the queue + }; + if state.spawn_game(game_index, kind, &spawn_ctx).await? == SpawnOutcome::FetchFailed { + continue 'outer; } + } - let game_data = match fetch_game_data(pending, &factory, l1_provider.clone()).await { - Ok(data) => data, - Err(FetchGameError::WrongGameType { game_index, game_type, expected }) => { - debug!( - "Skipping game at index {} (type {} != {})", - game_index, game_type, expected - ); - state.pending_games.pop_front(); - continue; - } - Err(e) => { - warn!( - "Failed to fetch game data for index {}: {:#}. Retrying", - pending.game_index, e - ); + // Phase 5: spawn background retries using spare slots. Only runs when no primary + // entries are ready, so primary scheduling always wins on contention. Entries stay in + // background_retries while running so maybe_requeue can update them on failure; + // cleanup_finished_processes removes them on success. + let primary_has_ready = + state.pending_games.iter().any(|p| p.executable_at <= Instant::now()); + if !primary_has_ready { + while state.can_spawn_new(args.max_concurrent) { + let now_sys = SystemTime::now(); + let Some((game_index, kind)) = state + .background_retries + .iter() + .find(|bg| { + bg.next_attempt_at <= now_sys && + !state.running_processes.contains_key(&bg.game_index) + }) + .map(|bg| { + (bg.game_index, AttemptKind::Background { attempts: bg.attempts + 1 }) + }) + else { + break; + }; + if state.spawn_game(game_index, kind, &spawn_ctx).await? == + SpawnOutcome::FetchFailed + { continue 'outer; } - }; - - let pending = state.pending_games.pop_front().unwrap(); - - info!( - "Game {} covers L2 blocks {} to {}", - game_data.game_address, game_data.start_block, game_data.end_block - ); - - let log_file = LogFile::path( - &args.logs_dir, - game_data.game_index, - game_data.game_address, - pending.retries, - ); - - let child = spawn_cost_estimator( - &args.cost_estimator_binary_path, - &args.env_file, - &log_file, - &game_data, - )?; - info!( - "Started cost estimator for game {} at index {} (blocks {}-{})", - game_data.game_address, - game_data.game_index, - game_data.start_block, - game_data.end_block - ); - state.running_processes.insert( - game_data.game_index, - RunningEstimator { - started_at: Instant::now(), - process: child, - log_file, - block_range: game_data.block_range(), - retries: pending.retries, - }, - ); + } } - // Discover new game indices and queue them for deferred processing. - let current_game_count = match factory.gameCount().call().await { + // Phase 6: discover new game indexes from the factory and queue them with a delay. + let current_game_count = match factory.gameCount().call().block(BlockId::finalized()).await + { Ok(count) => count.to::(), Err(e) => { warn!( @@ -813,12 +1402,12 @@ async fn main() -> Result<()> { info!( "Discovered new game at index {}, queuing for processing after {:?} delay", - game_index, delay + game_index, state.initial_game_delay ); - state.pending_games.push_back(PendingGame { - discovered_at: Instant::now(), + state.pending_games.push_front(PendingGame { + executable_at: Instant::now() + state.initial_game_delay, game_index, - retries: 0, + kind: AttemptKind::Primary { retries: 0 }, }); } } @@ -826,3 +1415,72 @@ async fn main() -> Result<()> { state.shutdown(); Ok(()) } + +/// Build a [`BackgroundRetry`] for `game_index` by fetching its on-chain creation timestamp. +/// +/// Used by the [`gen_background_retry`] subcommand. Errors propagate from +/// [`fetch_game_data`] including the `WrongGameType` case, so that operators see a clear +/// failure if any of the requested indexes does not correspond to an +/// `OPSuccinctFaultDisputeGame` of the expected `GAME_TYPE`. +/// +/// `next_attempt_at` is set to `UNIX_EPOCH` so the entry is immediately eligible the next +/// time the daemon starts. `attempts` is initialised to zero so the daemon's scheduling +/// treats the first run as background attempt 1. +async fn make_background_retry( + game_index: u64, + last_wait: Duration, + factory: &DisputeGameFactoryInstance

, + l1_provider: P, +) -> Result { + let game = fetch_game_data(game_index, factory, l1_provider) + .await + .with_context(|| format!("failed to fetch game data for index {}", game_index))?; + Ok(BackgroundRetry { + game_index, + game_created_at: game.created_at, + next_attempt_at: UNIX_EPOCH, + last_wait, + attempts: 0, + }) +} + +/// Load an existing `progress.json`, generate [`BackgroundRetry`] entries for the supplied +/// game indexes, append them to the existing state, and print the complete [`ProgressState`] +/// as pretty-printed JSON on stdout. +/// +/// Example usage (daemon must be stopped first): +/// +/// ```sh +/// game-monitor gen-background-retry \ +/// --progress-file /logs/progress.json \ +/// --last-wait-secs 4800 \ +/// 12345 12346 > /logs/progress.json.new +/// # inspect the output, then replace the original: +/// mv /logs/progress.json.new /logs/progress.json +/// ``` +/// +/// All log output is suppressed (no logger is initialised) so that stdout contains only the +/// JSON payload. Errors are returned to `main` and printed via `anyhow` on stderr. +async fn gen_background_retry(args: GenBackgroundRetryArgs) -> Result<()> { + let data = fs::read_to_string(&args.progress_file) + .with_context(|| format!("failed to read {}", args.progress_file.display()))?; + let mut state: ProgressState = serde_json::from_str(&data) + .with_context(|| format!("failed to parse {}", args.progress_file.display()))?; + + let l1_url = args.l1_rpc.parse().context("invalid L1 RPC URL")?; + let l1_provider = ProviderBuilder::new().connect_http(l1_url); + let factory = + DisputeGameFactoryInstance::new(args.dispute_game_factory_address, l1_provider.clone()); + let last_wait = Duration::from_secs(args.last_wait_secs); + + for game_index in args.game_indexes { + let entry = + make_background_retry(game_index, last_wait, &factory, l1_provider.clone()).await?; + state.background_retries.push(entry); + } + + let json = serde_json::to_string_pretty(&state) + .context("failed to serialize progress state to JSON")?; + println!("{}", json); + Ok(()) +} diff --git a/scripts/utils/fetch-game-logs.sh b/scripts/utils/fetch-game-logs.sh new file mode 100755 index 00000000..66256b83 --- /dev/null +++ b/scripts/utils/fetch-game-logs.sh @@ -0,0 +1,60 @@ +#!/bin/sh +# fetch-game-logs.sh -- tar matching log files for the given game indexes to +# stdout. Intended to be run via kubectl exec inside the game-monitor pod and +# piped into tar on the local machine. +# +# Usage: fetch-game-logs.sh GAME_INDEX [GAME_INDEX ...] +# +# Example: +# kubectl exec succinct-game-monitor-0 -- ./fetch-game-logs.sh 25933 25934 \ +# | tar xf - -C ./fetched-logs +# +# Override LOGS_DIR (default /logs) when running outside the pod. + +set -eu + +LOGS_DIR=${LOGS_DIR:-/logs} + +if [ $# -eq 0 ]; then + echo "fetch-game-logs: missing required GAME_INDEX argument(s)" >&2 + echo "usage: fetch-game-logs.sh GAME_INDEX [GAME_INDEX ...]" >&2 + exit 2 +fi + +if [ ! -d "$LOGS_DIR" ]; then + echo "fetch-game-logs: logs directory not found: $LOGS_DIR" >&2 + exit 1 +fi + +files="" + +for GAME_INDEX in "$@"; do + # Reject non-numeric indices. + case "$GAME_INDEX" in + *[!0-9]*) + echo "fetch-game-logs: GAME_INDEX must be numeric, got: $GAME_INDEX" >&2 + exit 2 + ;; + esac + + # Use a glob to find matching files, avoiding ls | grep (SC2010). + found=0 + for path in "$LOGS_DIR"/cost-estimator-"${GAME_INDEX}"-*.log; do + [ -f "$path" ] || continue + files="$files $(basename "$path")" + found=1 + done + + if [ "$found" -eq 0 ]; then + echo "fetch-game-logs: no logs found for game $GAME_INDEX" >&2 + continue + fi +done + +if [ -z "$files" ]; then + echo "fetch-game-logs: no log files found for any of the given indexes" >&2 + exit 1 +fi + +# shellcheck disable=SC2086 +exec tar cf - -C "$LOGS_DIR" $files diff --git a/scripts/utils/rerun-cost-estimator.sh b/scripts/utils/rerun-cost-estimator.sh new file mode 100755 index 00000000..3e98ccd1 --- /dev/null +++ b/scripts/utils/rerun-cost-estimator.sh @@ -0,0 +1,128 @@ +#!/bin/sh +# rerun-cost-estimator.sh -- replay a cost-estimator run from the header +# game-monitor records at the top of every cost-estimator-*.log file initiated +# by the game-monitor. Intended for use within the game-monitor pod. +# +# Usage: rerun-cost-estimator.sh GAME_INDEX +# Picks the most recent cost-estimator--*.log under $LOGS_DIR. +# +# Locates `cost-estimator` via `which`, writes the env block verbatim to +# .env_ in $PWD, and re-execs the binary with the recorded args +# (with --env-file rewritten to .env_). +# +# Override LOGS_DIR (default /logs) when running outside the pod. + +set -eu +umask 077 + +LOGS_DIR=${LOGS_DIR:-/logs} +GAME_INDEX=${1:-} + +if [ -z "$GAME_INDEX" ]; then + echo "rerun-cost-estimator: missing required GAME_INDEX argument" >&2 + echo "usage: rerun-cost-estimator.sh GAME_INDEX" >&2 + exit 2 +fi + +# Reject non-numeric indices so they cannot smuggle regex/filename metachars. +case "$GAME_INDEX" in + *[!0-9]*) + echo "rerun-cost-estimator: GAME_INDEX must be numeric, got: $GAME_INDEX" >&2 + exit 2 + ;; +esac + +if [ ! -d "$LOGS_DIR" ]; then + echo "rerun-cost-estimator: logs directory not found: $LOGS_DIR" >&2 + exit 1 +fi + +# Anchor the index as a complete token so 312 cannot match 31200. +pattern="^cost-estimator-${GAME_INDEX}-.*\\.log\$" + +# `ls -t` lists newest first, so retryN logs naturally win over the initial +# attempt for the same game index. +LOG_FILE=$( + ls -1t -- "$LOGS_DIR" 2>/dev/null \ + | grep -E "$pattern" \ + | head -n 1 \ + || true +) + +if [ -z "$LOG_FILE" ]; then + echo "rerun-cost-estimator: no cost-estimator-${GAME_INDEX}-*.log under $LOGS_DIR" >&2 + exit 1 +fi + +LOG_PATH="$LOGS_DIR/$LOG_FILE" + +echo "rerun-cost-estimator: log = $LOG_PATH" >&2 +echo "rerun-cost-estimator: index = $GAME_INDEX" >&2 + +BIN=$(which cost-estimator 2>/dev/null || true) +if [ -z "$BIN" ] || [ ! -x "$BIN" ]; then + echo "rerun-cost-estimator: cost-estimator binary not found in PATH" >&2 + exit 1 +fi +echo "rerun-cost-estimator: bin = $BIN" >&2 + +# Header layout written by game_monitor.rs: +# === Cost Estimator Command === +# +# === Cost Estimator ENV === +# KEY=VALUE... +# === Output === +# + +CMD_LINE=$( + awk ' + /^=== Cost Estimator Command ===[[:space:]]*$/ { in_cmd = 1; next } + /^=== Cost Estimator ENV ===[[:space:]]*$/ { in_cmd = 0 } + in_cmd && NF { print; exit } + ' "$LOG_PATH" +) + +if [ -z "$CMD_LINE" ]; then + echo "rerun-cost-estimator: no Cost Estimator Command block in $LOG_PATH" >&2 + exit 1 +fi + +ENV_FILE=".env_${GAME_INDEX}" + +awk ' + /^=== Cost Estimator ENV ===[[:space:]]*$/ { in_env = 1; next } + /^=== Output ===[[:space:]]*$/ { in_env = 0; exit } + in_env { print } +' "$LOG_PATH" > "$ENV_FILE" + +if [ ! -s "$ENV_FILE" ]; then + echo "rerun-cost-estimator: Cost Estimator ENV block was empty in $LOG_PATH" >&2 + rm -f "$ENV_FILE" + exit 1 +fi + +echo "rerun-cost-estimator: env = $ENV_FILE" >&2 + +# Drop the leading binary token (we use $BIN from `which`), then retarget +# --env-file at the new env file. +ARGS=$( + printf '%s\n' "$CMD_LINE" \ + | sed \ + -e 's/^[[:space:]]\{1,\}//' \ + -e 's/^[^[:space:]]\{1,\}[[:space:]]\{1,\}//' \ + -e 's|--env-file[[:space:]]\{1,\}[^[:space:]][^[:space:]]*|--env-file '"$ENV_FILE"'|' +) + +# Defensive: if the recorded command somehow had no --env-file, append one. +case " $ARGS " in + *" --env-file "*) ;; + *) ARGS="$ARGS --env-file $ENV_FILE" ;; +esac + +echo "rerun-cost-estimator: exec = $BIN $ARGS" >&2 + +# Word-splitting on $ARGS is deliberate -- the recorded args are space- +# separated tokens with no quoting (game_monitor.rs uses args.join(" ")). +# Do NOT quote $ARGS or the script breaks. +# shellcheck disable=SC2086 +exec "$BIN" $ARGS diff --git a/utils/common/Cargo.toml b/utils/common/Cargo.toml new file mode 100644 index 00000000..fae0b696 --- /dev/null +++ b/utils/common/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "op-succinct-common" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true diff --git a/utils/common/src/lib.rs b/utils/common/src/lib.rs new file mode 100644 index 00000000..ec7565e0 --- /dev/null +++ b/utils/common/src/lib.rs @@ -0,0 +1,3 @@ +mod sequence_tracker; + +pub use sequence_tracker::SequenceTracker; diff --git a/utils/common/src/sequence_tracker.rs b/utils/common/src/sequence_tracker.rs new file mode 100644 index 00000000..34e5fe01 --- /dev/null +++ b/utils/common/src/sequence_tracker.rs @@ -0,0 +1,125 @@ +use std::collections::HashSet; + +/// Tracks the end of a contiguous sequence of indices. +/// +/// As indices are added (potentially out of order), the tracker eagerly advances +/// its `end` value through any contiguous run starting from `end + 1`. +pub struct SequenceTracker { + end: u64, + pending: HashSet, +} + +impl SequenceTracker { + /// Creates a new tracker with `end` set to the given value. + pub fn new(end: u64) -> Self { + Self { end, pending: HashSet::new() } + } + + /// Returns the current end of the contiguous sequence. + pub fn end(&self) -> u64 { + self.end + } + + /// Adds an index and eagerly advances `end` through any contiguous run. + pub fn add(&mut self, index: u64) { + if index <= self.end { + return; + } + self.pending.insert(index); + let mut check_from = self.end + 1; + while self.pending.remove(&check_from) { + self.end = check_from; + check_from += 1; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_no_additions() { + let tracker = SequenceTracker::new(1); + assert_eq!(tracker.end(), 1); + } + + #[test] + fn test_single_next() { + let mut tracker = SequenceTracker::new(0); + tracker.add(1); + assert_eq!(tracker.end(), 1); + } + + #[test] + fn test_single_not_next() { + let mut tracker = SequenceTracker::new(1); + tracker.add(3); + assert_eq!(tracker.end(), 1); + } + + #[test] + fn test_consecutive() { + let mut tracker = SequenceTracker::new(0); + tracker.add(1); + tracker.add(2); + tracker.add(3); + tracker.add(4); + assert_eq!(tracker.end(), 4); + } + + #[test] + fn test_out_of_order() { + let mut tracker = SequenceTracker::new(0); + tracker.add(3); + tracker.add(1); + tracker.add(2); + tracker.add(5); + assert_eq!(tracker.end(), 3); + + // Adding the missing index advances through the gap. + tracker.add(4); + assert_eq!(tracker.end(), 5); + } + + #[test] + fn test_duplicates() { + let mut tracker = SequenceTracker::new(0); + tracker.add(1); + tracker.add(1); + tracker.add(2); + tracker.add(2); + assert_eq!(tracker.end(), 2); + } + + #[test] + fn test_pending_cleared() { + let mut tracker = SequenceTracker::new(0); + tracker.add(1); + tracker.add(2); + tracker.add(4); + tracker.add(5); + assert_eq!(tracker.end(), 2); + // 4 and 5 are still pending + assert!(tracker.pending.contains(&4)); + assert!(tracker.pending.contains(&5)); + + // Fill the gap + tracker.add(3); + assert_eq!(tracker.end(), 5); + assert!(tracker.pending.is_empty()); + } + + #[test] + fn test_add_at_or_below_end() { + let mut tracker = SequenceTracker::new(5); + tracker.add(3); + tracker.add(5); + assert_eq!(tracker.end(), 5); + // Indices at or below `end` are dropped, not stashed in `pending`. + assert!(tracker.pending.is_empty()); + tracker.add(6); + assert_eq!(tracker.end(), 6); + assert!(tracker.pending.is_empty()); + } +}