diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ba9fd0d57..7362b9e69 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -298,6 +298,22 @@ Current posture (as of Phase 9 entry, SHA `0433065c7`): The `build.rs` / proc-macro / `macro_rules!` / codegen / env-var per-class contracts, the per-crate registry, the env-var registry, the workspace inventory script (`scripts/dev/build_codegen_audit.sh`), and the per-phase decisions log live in [`docs/architecture/code-quality/build_codegen_policy.md`](docs/architecture/code-quality/build_codegen_policy.md). +## Concurrency policy + +UFFS enforces a strict task-ownership, lock-discipline, channel-backpressure, timeout-coverage, and blocking-IO posture in production async code. Three workspace Clippy lints at `deny` cover the std-side lock-across-await family (`await_holding_lock`, `await_holding_refcell_ref`, `await_holding_invalid_type`); the rest is enforced by `scripts/dev/concurrency_audit.sh` + a per-site annotation contract. + +The one-line rule: **every `tokio::spawn` declares its owner / shutdown / errors / cancellation; every async lock guard is dropped before the next `.await`; every channel is bounded with documented capacity OR unbounded with a documented producer-rate ceiling; every cross-process / cross-thread / cross-network await has a timeout OR is justified as a cooperatively-cancelled forever-loop; every `std::fs::*` / `std::thread::sleep` inside an `async fn` is wrapped in `spawn_blocking` / `block_in_place` OR is a sync helper called only from sync contexts.** + +Five dimensions, each with a taxonomy contributors quote inline: + +- **Task ownership** (`T1` named-constructor / `T2` inline-spawn / `T3` fire-and-forget / `T4` test-only) — every prod spawn site documents the four facets above. +- **Lock discipline** (`L1`-`L5` patterns; `L6` lock-across-await is forbidden) — three Clippy `await_holding_*` lints at `deny`. 
+- **Channel discipline** (`C1` bounded / `C2` broadcast / `C3` oneshot / `C4` watch / `C5` unbounded-with-ceiling; `C6` undocumented unbounded is forbidden). +- **Timeout policy** (`W1` named const / `W2` env-overridable / `W3` cooperatively-cancelled forever-loop / `W4` inline literal; `W5` unbounded cross-process await is forbidden). +- **Blocking-IO rule** (`B1` `spawn_blocking` / `B2` `block_in_place` / `B3` sync helper / `B4` startup/Drop/CLI one-shot; `B5` unbounded sync I/O on runtime worker is forbidden). + +Full taxonomy, per-site annotation templates, the workspace inventory script (`scripts/dev/concurrency_audit.sh`), per-crate posture matrix, and the per-phase audit trail live in [`docs/architecture/code-quality/concurrency_policy.md`](docs/architecture/code-quality/concurrency_policy.md). + ## Docs map - Root overview: `README.md` diff --git a/Cargo.toml b/Cargo.toml index 0a1eb06c1..197bc0dd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -337,6 +337,18 @@ multiple_unsafe_ops_per_block = "deny" # Limit unsafe operations unnecessary_safety_doc = "deny" # Accurate safety docs unnecessary_safety_comment = "deny" # Accurate safety comments +# ───── Phase 10 concurrency discipline lints — see concurrency_policy.md §1 ───── +# The `await_holding_*` family lives in clippy's `suspicious` group, which +# defaults to `warn`. We pin all three at `deny` explicitly so the policy +# doc's enforcement claim is literally true (not "effectively at deny via +# `-D warnings` in CI") and survives any future tightening of the workspace +# `--deny warnings` shape. Triggering one is **never** acceptable in prod +# code — the canonical fix is the snapshot-then-await pattern documented in +# `concurrency_policy.md §2.2`. 
+await_holding_lock = "deny" # No std::sync::Mutex/RwLock guard across .await +await_holding_refcell_ref = "deny" # No RefCell::borrow{,_mut}() guard across .await +await_holding_invalid_type = "deny" # No type listed in clippy.toml `await-holding-invalid-types` (e.g. Rc / Cell) across .await + # ───── DENY level lints — safety, correctness, discipline ───── # These MUST be clean in production code. clippy.toml relaxes test code # automatically via allow-*-in-tests settings. diff --git a/crates/uffs-client/src/lib.rs b/crates/uffs-client/src/lib.rs index 90e82accb..9dbe54009 100644 --- a/crates/uffs-client/src/lib.rs +++ b/crates/uffs-client/src/lib.rs @@ -6,6 +6,25 @@ //! All surfaces (CLI, TUI, GUI, MCP) use this crate to communicate with //! the daemon. It handles auto-start, connection, keepalive, and reconnect. //! +//! # Concurrency +//! +//! Hybrid runtime model: +//! +//! * [`connect::UffsClient`] (async; feature `async` default-on) — runs on the +//! caller's tokio runtime. Per-RPC `read_line` ceiling 300 s +//! (`Duration::from_mins(5)`; hard-coded). Notification channel is +//! `mpsc::unbounded_channel()` rate-bounded by the daemon's +//! `broadcast::Sender` capacity (Phase 10d C5 verdict). +//! * [`connect_sync::UffsClientSync`] (sync) — synchronous I/O on the caller +//! thread. Per-RPC deadline 60 s default, env-overridable via +//! `UFFS_CLIENT_TIMEOUT_SECS`; Windows path uses +//! `windows_deadline::WindowsDeadlineGuard` (a `CancelSynchronousIo` +//! watchdog, `#[cfg(windows)]`-only) to enforce the deadline on blocking pipe +//! I/O. +//! +//! See `docs/architecture/code-quality/concurrency_policy.md` for the +//! workspace contract. +//! //! # Example //! //! ```rust,ignore diff --git a/crates/uffs-daemon/src/lib.rs b/crates/uffs-daemon/src/lib.rs index 4d0413f8b..56fdd7003 100644 --- a/crates/uffs-daemon/src/lib.rs +++ b/crates/uffs-daemon/src/lib.rs @@ -7,15 +7,11 @@ //! both from the standalone `uffs-daemon` binary and from the embedded //! `uffs daemon run` subcommand in the CLI. //! -//! 
Exception: `file_size_policy` allows this file to exceed 800 LOC. -//! Rationale: `run_daemon` plus the cohesive cluster of background- -//! task spawners (`spawn_load_task`, `spawn_ipc_servers`, -//! `spawn_stats_heartbeat`, `spawn_idle_demote_controller`, -//! `spawn_journal_loops_for_warm_shards`, `spawn_pressure_subscriber`) -//! form the daemon's startup graph; splitting the controllers -//! across sibling modules fragments the shared `DaemonConfig` / -//! `EventSender` wiring and obscures the parent-task lifetime -//! relationships between them. +//! Setup helpers + shutdown coordination live in sibling modules +//! ([`log_init`], `startup`, `shutdown` — the latter two are +//! crate-private); the orchestrator [`run_daemon`] and the `spawn_*` +//! cluster live here so the parent-task lifetime relationships stay +//! cohesive in one file. //! //! # Environment //! @@ -37,6 +33,32 @@ //! | `UFFS_USN_REFRESH_INTERVAL_SECS` | `int` (seconds) | `300` (5 min) | USN journal refresh interval override (via `USN_REFRESH_INTERVAL_ENV` const indirection). INTERNAL semver class. | //! | `UFFS_SEARCH_MAX_CONCURRENCY` | `int` (search permits) | auto: `max(2, cpus × 26 / (drives × 10))` | Overrides the auto-tuned search-permit target for `(cpus, drives)` topology (via `index::DriveIndex::SEARCH_CONCURRENCY_ENV` const indirection). INTERNAL semver class. | //! | `XDG_RUNTIME_DIR` | `path` | (XDG: `/run/user/$UID`) | Linux daemon-socket location. STANDARD semver class. | +//! +//! # Concurrency +//! +//! Runs on `#[tokio::main]` (default multi-threaded runtime). The +//! daemon's startup graph spawns a fixed set of long-lived tasks via +//! the named `spawn_*` constructors above: +//! +//! * `spawn_load_task` — drive-load orchestration with per-drive timeout +//! (`IndexManager::DRIVE_LOAD_TIMEOUT`). +//! * `spawn_ipc_servers` — Unix-socket accept loop + Windows `AF_UNIX` bridge; +//! per-connection idle timeout (`IDLE_CONNECTION_SECS`). +//! 
* `spawn_stats_heartbeat` — periodic `DaemonStats` snapshot to subscribed +//! clients. +//! * `spawn_idle_demote_controller` — memory-pressure-driven shard-demote +//! signal source. +//! * `spawn_journal_loops_for_warm_shards` — per-shard USN journal loops, each +//! cooperatively cancelled via a dedicated `watch::Sender`. +//! * `spawn_pressure_subscriber` — listens to OS memory-pressure events and +//! drives the demote controller. +//! +//! All shutdown coordination flows through the daemon's top-level +//! `LifecycleHandle` (`watch::Sender` broadcast + force-exit +//! watchdog). See the crate-private `lifecycle` module for the full +//! ownership diagram and +//! `docs/architecture/code-quality/concurrency_policy.md` for the +//! workspace contract. // Enable unstable Windows Unix domain socket support (Windows 10 1803+). #![cfg_attr(windows, feature(windows_unix_domain_sockets))] @@ -91,99 +113,16 @@ mod runtime_orphans; /// Process-level memory and runtime telemetry. pub(crate) mod telemetry; -/// Default log file location: `/uffs/uffsd.log`. -/// -/// Falls back to `./uffsd.log` if the platform data directory -/// cannot be determined. -#[must_use] -pub(crate) fn default_log_file() -> PathBuf { - dirs_next::data_local_dir().map_or_else( - || PathBuf::from("uffsd.log"), - |dir| dir.join("uffs").join("uffsd.log"), - ) -} - -/// Initialise tracing for the daemon process. -/// -/// * `log_file = Some(path)` — write to that file (append mode). A path of -/// `"-"` or empty string uses `default_log_file`. -/// * `log_file = None` **and** the effective log level is `debug` or `trace` — -/// automatically write to `default_log_file` so that diagnostic output is -/// never lost to `/dev/null`. -/// * `log_file = None` with a higher level — write to stdout. -/// -/// Returns a guard that **must** be held until the daemon exits — -/// dropping it flushes the non-blocking writer. 
-#[must_use] -pub fn init_tracing( - log_spec: &str, - log_file: Option<&std::path::Path>, -) -> Option { - let filter = tracing_subscriber::EnvFilter::try_new(log_spec) - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); +/// Tracing-subscriber bootstrap (re-exports [`init_tracing`] as part +/// of the daemon library's public API). +pub mod log_init; +/// Graceful-shutdown + force-exit watchdog. +mod shutdown; +/// Pre-spawn startup helpers (panic-hook install, data-source +/// gathering, lifecycle bootstrap). +mod startup; - // Decide whether to use a file writer. - let is_verbose = { - let lower = log_spec.to_ascii_lowercase(); - lower.contains("debug") || lower.contains("trace") - }; - let effective_file: Option = match log_file { - Some(path) => { - let resolved = if path.as_os_str().is_empty() || path == std::path::Path::new("-") { - default_log_file() - } else { - path.to_path_buf() - }; - Some(resolved) - } - None if is_verbose => Some(default_log_file()), - None => None, - }; - - if let Some(resolved) = effective_file { - // Compute a *safe* parent directory. - // - // `PathBuf::from("uffsd.log").parent()` returns `Some(Path::new(""))`, - // not `None` — so the defensive `unwrap_or_else(|| Path::new("."))` - // below used to never fire for a relative file name, and - // `tracing_appender::rolling::never("", "uffsd.log")` would propagate - // the empty path through `create_dir_all("")`, which errors on - // Windows ("The system cannot find the path specified") and then - // panics via `.expect("initializing rolling file appender failed")` - // — killing the detached daemon before it ever binds IPC. - // - // Coerce both `None` and `Some("")` to the current directory so - // relative `--log-file` paths work the same everywhere. 
- let parent_dir = match resolved.parent() { - Some(parent) if !parent.as_os_str().is_empty() => parent, - _ => std::path::Path::new("."), - }; - let _mkdir_ignore = std::fs::create_dir_all(parent_dir); - - let file_appender = tracing_appender::rolling::never( - parent_dir, - resolved - .file_name() - .unwrap_or_else(|| std::ffi::OsStr::new("uffsd.log")), - ); - let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); - // `try_init` — a subscriber may already exist when invoked via - // the embedded `uffs daemon run` path. - let _ignore = tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(false) - .with_ansi(false) - .with_writer(non_blocking) - .try_init(); - Some(guard) - } else { - let _ignore = tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(false) - .try_init(); - None - } -} +pub use log_init::init_tracing; /// Configuration for [`run_daemon`]. pub struct DaemonConfig { @@ -208,40 +147,6 @@ pub struct DaemonConfig { pub log_file: Option, } -/// Bail if the daemon has nothing to serve. -fn validate_data_sources( - mft_files: &[PathBuf], - drives: &[uffs_mft::platform::DriveLetter], - lifecycle_mgr: &lifecycle::LifecycleManager, -) -> anyhow::Result<()> { - let has_data = !mft_files.is_empty() || { - #[cfg(windows)] - { - !drives.is_empty() - } - #[cfg(not(windows))] - { - // `drives` is Windows-only; no auto-discovery on macOS/Linux. - // The explicit type pins the annotation clippy expects on - // discarded bindings. - let _: &[uffs_mft::platform::DriveLetter] = drives; - false - } - }; - if !has_data { - tracing::error!( - "No data sources provided. On macOS/Linux pass --mft-file; \ - on Windows, NTFS drives are auto-discovered." - ); - lifecycle_mgr.remove_pid_file(); - anyhow::bail!( - "Daemon has no data sources to load. \ - Provide --mft-file (or --data-dir when launching via CLI)." - ); - } - Ok(()) -} - /// Run the UFFS daemon with the given configuration. 
/// /// This is the main entry point shared by both the standalone @@ -255,13 +160,13 @@ fn validate_data_sources( /// Returns an error if another daemon is already running, data sources /// are missing, or the IPC server fails to bind. pub async fn run_daemon(config: DaemonConfig) -> anyhow::Result<()> { - install_catastrophe_panic_hook(); - log_daemon_starting(&config); + startup::install_catastrophe_panic_hook(); + startup::log_daemon_starting(&config); let (event_tx, _event_rx) = events::event_channel(); - emit_daemon_starting_event(&event_tx); + startup::emit_daemon_starting_event(&event_tx); - let lifecycle_mgr = bootstrap_lifecycle_manager(&config, event_tx.clone())?; + let lifecycle_mgr = startup::bootstrap_lifecycle_manager(&config, event_tx.clone())?; // D5.0: clean up stale shmem files from previous daemon sessions. uffs_client::shmem::cleanup_stale_shmem_files(); @@ -277,7 +182,7 @@ pub async fn run_daemon(config: DaemonConfig) -> anyhow::Result<()> { // platform-default location. Factored out to keep // `run_daemon`'s cognitive complexity under the workspace's // strict-clippy ceiling. - let daemon_config = load_daemon_config()?; + let daemon_config = startup::load_daemon_config()?; // Create index manager — uses the user-supplied --data-dir for offline MFT // discovery and hot-loading (not the lifecycle directory). @@ -288,12 +193,12 @@ pub async fn run_daemon(config: DaemonConfig) -> anyhow::Result<()> { )); tracing::debug!(index_data_dir = ?idx.data_dir(), "Index manager created"); - let mft_files = gather_mft_files(&config); - let drives = resolve_drive_list(&config); + let mft_files = startup::gather_mft_files(&config); + let drives = startup::resolve_drive_list(&config); tracing::info!(mft_files = mft_files.len(), drives = ?drives, "Final data sources"); // Refuse to start with zero data sources — an empty daemon is useless. 
- validate_data_sources(&mft_files, &drives, &lifecycle_mgr)?; + startup::validate_data_sources(&mft_files, &drives, &lifecycle_mgr)?; tracing::info!("Data sources validated OK"); let load_task = spawn_load_task( @@ -328,217 +233,7 @@ pub async fn run_daemon(config: DaemonConfig) -> anyhow::Result<()> { // Run idle timer (blocks until shutdown or timeout) then tear // everything down. Returns `!` so `force_exit_with_watchdog` // covers the post-await tail. - await_shutdown_then_force_exit(lifecycle_mgr, ipc_task, load_task).await -} - -/// Emit the startup `tracing::info!` line with every config field -/// the operator might want to grep for. Extracted so the orchestrator -/// stays under clippy's `cognitive_complexity` budget. -fn log_daemon_starting(config: &DaemonConfig) { - tracing::info!( - pid = std::process::id(), - version = env!("CARGO_PKG_VERSION"), - broker_available = broker_client::broker_available(), - mft_files = ?config.mft_files, - drives = ?config.drives, - data_dir = ?config.data_dir, - no_cache = config.no_cache, - no_retire = config.no_retire, - "uffsd starting" - ); -} - -/// Publish the [`DaemonEvent::DaemonStarting`] notification so any -/// pre-IPC subscriber (e.g. the embedded MCP server) sees the -/// transition. -fn emit_daemon_starting_event(event_tx: &events::EventSender) { - event_tx.emit(events::DaemonEvent::DaemonStarting { - pid: std::process::id(), - version: env!("CARGO_PKG_VERSION").to_owned(), - }); -} - -/// Wait for the idle timer / shutdown signal, then run the graceful -/// shutdown sequence: abort the IPC task, timeout-join the load task, -/// drop the lifecycle manager (which cleans up PID + socket files), -/// and finally force-exit via the watchdog. -/// -/// Returns `!` because both legitimate exits (clean shutdown, watchdog -/// abort) terminate the process. 
-async fn await_shutdown_then_force_exit( - mut lifecycle_mgr: lifecycle::LifecycleManager, - ipc_task: tokio::task::JoinHandle<()>, - load_task: tokio::task::JoinHandle<()>, -) -> ! { - lifecycle_mgr.run_idle_timer().await; - - tracing::info!("Daemon shutting down"); - ipc_task.abort(); - // Give the load task a brief window to finish, then abandon it. - // Stuck kernel-mode I/O threads cannot be cancelled, so we don't - // wait indefinitely — process::exit at the bottom will clean up. - let shutdown_deadline = tokio::time::timeout(core::time::Duration::from_secs(3), load_task); - let _ignore = shutdown_deadline.await; - tracing::info!("Daemon stopped"); - - // Clean up PID + socket files before exiting. - drop(lifecycle_mgr); - - force_exit_with_watchdog() -} - -/// Install a panic hook that runs the existing default hook (so the -/// usual stack trace + payload still print) and then force-exits. -/// -/// Without this, a panic on any blocking I/O thread can leave the -/// daemon in a zombie state — the default hook tries to unwind through -/// kernel-mode I/O which may never return. Force-exiting with code -/// `101` matches Rust's standard panic exit code so process supervisors -/// don't see a "clean" 0-exit on a panic. -fn install_catastrophe_panic_hook() { - let default_hook = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |info| { - default_hook(info); - #[expect(clippy::exit, reason = "catastrophe safety net — force-exit on panic")] - { - std::process::exit(101); - } - })); -} - -/// Resolve the operator's `daemon.toml` from the platform-default -/// location and emit a structured `tracing::info!` event with the -/// resolved path. -/// -/// Phase 6 Commit C task 6.5 helper. A missing file is **not** an -/// error: [`config::Config::load_default`] returns the -/// Phase-3-equivalent defaults so every existing deployment boots -/// with the same observable behavior (plan task 6.8). 
A malformed -/// file propagates as a startup error so a typo doesn't silently -/// fall back to defaults — the operator gets a precise parser error -/// with line and column. -/// -/// Returned as `Arc` so the index manager and any future -/// background controller can share a single read-only view without -/// cloning the BTreeMap-bearing `[shards.per_drive]` table. -fn load_daemon_config() -> anyhow::Result> { - let cfg = config::Config::load_default() - .map_err(|err| anyhow::anyhow!("Failed to load daemon.toml from default path: {err}"))?; - tracing::info!( - daemon_config_path = ?config::Config::default_path(), - "daemon.toml resolved (or defaults used when missing)", - ); - Ok(Arc::new(cfg)) -} - -/// Build the [`LifecycleManager`], gate against another running -/// instance via the PID file, and write a fresh PID file. -/// -/// Returns the manager ready for use, or bails when another daemon is -/// already alive. -fn bootstrap_lifecycle_manager( - config: &DaemonConfig, - event_tx: events::EventSender, -) -> anyhow::Result { - // Determine data directory: - // - lifecycle_dir: always %LOCALAPPDATA%\uffs — PID/socket/lock files - // - data_dir: user-supplied --data-dir (for MFT file discovery/hot-load) - let lifecycle_dir = dirs_next::data_local_dir() - .map_or_else(|| PathBuf::from("/tmp/uffs"), |base| base.join("uffs")); - - let idle_timeout = if config.no_retire { - None - } else { - Some(core::time::Duration::from_secs(config.idle_timeout)) - }; - let mut lifecycle_mgr = - lifecycle::LifecycleManager::new(&lifecycle_dir, idle_timeout, event_tx); - - tracing::info!(data_dir = %lifecycle_mgr.data_dir().display(), "Lifecycle data directory"); - - if !lifecycle_mgr.check_stale_pid() { - tracing::error!("Another daemon instance is already running"); - anyhow::bail!("Another daemon instance is already running"); - } - - lifecycle_mgr.write_pid_file()?; - tracing::info!("PID file written"); - Ok(lifecycle_mgr) -} - -/// Merge `--mft-file` arguments with files 
discovered under -/// `--data-dir`, applying the `--drive` filter when present. -fn gather_mft_files(config: &DaemonConfig) -> Vec { - let mut mft_files = config.mft_files.clone(); - let Some(dir) = config.data_dir.as_ref() else { - return mft_files; - }; - - let discovered = uffs_mft::discovery::discover_mft_files(dir); - let filtered: Vec = if config.drives.is_empty() { - discovered - } else { - discovered - .into_iter() - .filter(|path| drive_letter_matches(path, &config.drives)) - .collect() - }; - tracing::info!( - data_dir = %dir.display(), - count = filtered.len(), - filter = ?config.drives, - "Discovered MFT files from --data-dir" - ); - mft_files.extend(filtered); - mft_files -} - -/// Returns `true` when `path`'s parent directory carries a -/// `drive_` prefix that matches one of `wanted` (case- -/// insensitive — `DriveLetter::parse` canonicalises to uppercase). -fn drive_letter_matches( - path: &std::path::Path, - wanted: &[uffs_mft::platform::DriveLetter], -) -> bool { - path.parent() - .and_then(|parent| parent.file_name()) - .and_then(|name| name.to_str()) - .and_then(|name| name.strip_prefix("drive_")) - .and_then(|suffix| suffix.chars().next()) - .and_then(|letter_ch| uffs_mft::platform::DriveLetter::parse(letter_ch).ok()) - .is_some_and(|letter| wanted.contains(&letter)) -} - -/// Resolve the drive list to scan. -/// -/// On Windows, an empty `--drive` triggers auto-discovery; non-empty -/// respects the explicit list. Always empty on non-Windows since -/// live MFT scanning is Windows-only. 
-#[cfg(windows)] -fn resolve_drive_list(config: &DaemonConfig) -> Vec<uffs_mft::platform::DriveLetter> { - let explicit = config.drives.clone(); - if explicit.is_empty() { - let auto_drives = uffs_mft::detect_ntfs_drives(); - tracing::info!( - count = auto_drives.len(), - drives = ?auto_drives, - "Auto-discovered NTFS drives (no --drive flag)" - ); - auto_drives - } else { - tracing::info!( - drives = ?explicit, - "Loading only requested drives (--drive flag)" - ); - explicit - } -} - -/// Non-Windows variant: live MFT scanning is unsupported, so the -/// drive list is always empty regardless of `config`. -#[cfg(not(windows))] -const fn resolve_drive_list(_config: &DaemonConfig) -> Vec<uffs_mft::platform::DriveLetter> { - Vec::new() + shutdown::await_shutdown_then_force_exit(lifecycle_mgr, ipc_task, load_task).await } /// Spawn the parallel load task that reads `mft_files` from disk and @@ -992,49 +687,5 @@ pub(crate) fn spawn_pressure_subscriber( }) } -/// Final shutdown: spawn a 5 s watchdog thread that calls -/// [`std::process::abort`] if `process::exit` itself hangs (kernel -/// I/O can wedge atexit handlers), then force-exit. -/// -/// Returns `!` because both arms terminate the process. -fn force_exit_with_watchdog() -> ! { - tracing::info!("Spawning shutdown watchdog (5s grace period)"); - _ = std::thread::Builder::new() - .name("shutdown-watchdog".into()) - .spawn(|| { - std::thread::sleep(core::time::Duration::from_secs(5)); - // process::exit did not complete in 5 s — threads are stuck - // in kernel I/O. Force-terminate via abort(). - // - // Use eprintln! as a last-resort — tracing may not flush - // before abort(). print_stderr is intentional here: this is - // a catastrophe path where the structured logging subsystem - // may be unavailable. 
- let msg = "Shutdown watchdog: process::exit stuck for 5s — calling abort()"; - tracing::error!("{msg}"); - #[expect( - clippy::print_stderr, - reason = "catastrophe path — tracing may be dead" - )] - let _: () = eprintln!("[CATASTROPHE] {msg}"); - std::process::abort(); - }); // best-effort; if thread spawn fails, exit may still work - - // Force-exit the process. The Windows IPC server uses - // `std::os::windows::net::UnixListener` with `spawn_blocking(accept)` - // and per-connection `std::thread::spawn` bridge threads. These - // blocking std threads cannot be cancelled by `ipc_task.abort()` and - // will keep the process alive indefinitely after the daemon logic has - // finished, turning it into a multi-GB zombie. `process::exit(0)` is - // the standard pattern for daemons with uncancellable blocking threads. - #[expect( - clippy::exit, - reason = "daemon has orphaned blocking threads that prevent normal exit" - )] - { - std::process::exit(0); - } -} - #[cfg(test)] mod tests; diff --git a/crates/uffs-daemon/src/log_init.rs b/crates/uffs-daemon/src/log_init.rs new file mode 100644 index 000000000..fe284b751 --- /dev/null +++ b/crates/uffs-daemon/src/log_init.rs @@ -0,0 +1,105 @@ +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2025-2026 SKY, LLC. + +//! Tracing-subscriber bootstrap for the daemon. +//! +//! Extracted from `lib.rs` so the daemon's startup graph + `spawn_*` +//! cluster can stay focused on lifecycle wiring without the log-file +//! parent-directory normalisation noise. No collision risk with the +//! `tracing` crate because the module is named `log_init`. + +use std::path::PathBuf; + +/// Default log file location: `/uffs/uffsd.log`. +/// +/// Falls back to `./uffsd.log` if the platform data directory +/// cannot be determined. 
+#[must_use] +pub(crate) fn default_log_file() -> PathBuf { + dirs_next::data_local_dir().map_or_else( + || PathBuf::from("uffsd.log"), + |dir| dir.join("uffs").join("uffsd.log"), + ) +} + +/// Initialise tracing for the daemon process. +/// +/// * `log_file = Some(path)` — write to that file (append mode). A path of +/// `"-"` or empty string uses `default_log_file`. +/// * `log_file = None` **and** the effective log level is `debug` or `trace` — +/// automatically write to `default_log_file` so that diagnostic output is +/// never lost to `/dev/null`. +/// * `log_file = None` with a higher level — write to stdout. +/// +/// Returns a guard that **must** be held until the daemon exits — +/// dropping it flushes the non-blocking writer. +#[must_use] +pub fn init_tracing( + log_spec: &str, + log_file: Option<&std::path::Path>, +) -> Option<tracing_appender::non_blocking::WorkerGuard> { + let filter = tracing_subscriber::EnvFilter::try_new(log_spec) + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + + // Decide whether to use a file writer. + let is_verbose = { + let lower = log_spec.to_ascii_lowercase(); + lower.contains("debug") || lower.contains("trace") + }; + let effective_file: Option<PathBuf> = match log_file { + Some(path) => { + let resolved = if path.as_os_str().is_empty() || path == std::path::Path::new("-") { + default_log_file() + } else { + path.to_path_buf() + }; + Some(resolved) + } + None if is_verbose => Some(default_log_file()), + None => None, + }; + + if let Some(resolved) = effective_file { + // Compute a *safe* parent directory. 
+ // + // `PathBuf::from("uffsd.log").parent()` returns `Some(Path::new(""))`, + // not `None` — so the defensive `unwrap_or_else(|| Path::new("."))` + // below used to never fire for a relative file name, and + // `tracing_appender::rolling::never("", "uffsd.log")` would propagate + // the empty path through `create_dir_all("")`, which errors on + // Windows ("The system cannot find the path specified") and then + // panics via `.expect("initializing rolling file appender failed")` + // — killing the detached daemon before it ever binds IPC. + // + // Coerce both `None` and `Some("")` to the current directory so + // relative `--log-file` paths work the same everywhere. + let parent_dir = match resolved.parent() { + Some(parent) if !parent.as_os_str().is_empty() => parent, + _ => std::path::Path::new("."), + }; + let _mkdir_ignore = std::fs::create_dir_all(parent_dir); + + let file_appender = tracing_appender::rolling::never( + parent_dir, + resolved + .file_name() + .unwrap_or_else(|| std::ffi::OsStr::new("uffsd.log")), + ); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + // `try_init` — a subscriber may already exist when invoked via + // the embedded `uffs daemon run` path. + let _ignore = tracing_subscriber::fmt() + .with_env_filter(filter) + .with_target(false) + .with_ansi(false) + .with_writer(non_blocking) + .try_init(); + Some(guard) + } else { + let _ignore = tracing_subscriber::fmt() + .with_env_filter(filter) + .with_target(false) + .try_init(); + None + } +} diff --git a/crates/uffs-daemon/src/shutdown.rs b/crates/uffs-daemon/src/shutdown.rs new file mode 100644 index 000000000..f64c22cfa --- /dev/null +++ b/crates/uffs-daemon/src/shutdown.rs @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2025-2026 SKY, LLC. + +//! Graceful-shutdown + force-exit watchdog for [`crate::run_daemon`]. +//! +//! Extracted from `lib.rs` so the orchestrator and the `spawn_*` +//! 
cluster stay focused on lifecycle wiring without the +//! `process::exit` / `process::abort` catastrophe-path noise. +//! Every fn here is `pub(crate)` — no external caller. + +use crate::lifecycle; + +/// Wait for the idle timer / shutdown signal, then run the graceful +/// shutdown sequence: abort the IPC task, timeout-join the load task, +/// drop the lifecycle manager (which cleans up PID + socket files), +/// and finally force-exit via the watchdog. +/// +/// Returns `!` because both legitimate exits (clean shutdown, watchdog +/// abort) terminate the process. +pub(crate) async fn await_shutdown_then_force_exit( + mut lifecycle_mgr: lifecycle::LifecycleManager, + ipc_task: tokio::task::JoinHandle<()>, + load_task: tokio::task::JoinHandle<()>, +) -> ! { + lifecycle_mgr.run_idle_timer().await; + + tracing::info!("Daemon shutting down"); + ipc_task.abort(); + // Give the load task a brief window to finish, then abandon it. + // Stuck kernel-mode I/O threads cannot be cancelled, so we don't + // wait indefinitely — process::exit at the bottom will clean up. + let shutdown_deadline = tokio::time::timeout(core::time::Duration::from_secs(3), load_task); + let _ignore = shutdown_deadline.await; + tracing::info!("Daemon stopped"); + + // Clean up PID + socket files before exiting. + drop(lifecycle_mgr); + + force_exit_with_watchdog() +} + +/// Final shutdown: spawn a 5 s watchdog thread that calls +/// [`std::process::abort`] if `process::exit` itself hangs (kernel +/// I/O can wedge atexit handlers), then force-exit. +/// +/// Returns `!` because both arms terminate the process. +fn force_exit_with_watchdog() -> ! { + tracing::info!("Spawning shutdown watchdog (5s grace period)"); + _ = std::thread::Builder::new() + .name("shutdown-watchdog".into()) + .spawn(|| { + std::thread::sleep(core::time::Duration::from_secs(5)); + // process::exit did not complete in 5 s — threads are stuck + // in kernel I/O. Force-terminate via abort(). + // + // Use eprintln! 
as a last-resort — tracing may not flush + // before abort(). print_stderr is intentional here: this is + // a catastrophe path where the structured logging subsystem + // may be unavailable. + let msg = "Shutdown watchdog: process::exit stuck for 5s — calling abort()"; + tracing::error!("{msg}"); + #[expect( + clippy::print_stderr, + reason = "catastrophe path — tracing may be dead" + )] + let _: () = eprintln!("[CATASTROPHE] {msg}"); + std::process::abort(); + }); // best-effort; if thread spawn fails, exit may still work + + // Force-exit the process. The Windows IPC server uses + // `std::os::windows::net::UnixListener` with `spawn_blocking(accept)` + // and per-connection `std::thread::spawn` bridge threads. These + // blocking std threads cannot be cancelled by `ipc_task.abort()` and + // will keep the process alive indefinitely after the daemon logic has + // finished, turning it into a multi-GB zombie. `process::exit(0)` is + // the standard pattern for daemons with uncancellable blocking threads. + #[expect( + clippy::exit, + reason = "daemon has orphaned blocking threads that prevent normal exit" + )] + { + std::process::exit(0); + } +} diff --git a/crates/uffs-daemon/src/startup.rs b/crates/uffs-daemon/src/startup.rs new file mode 100644 index 000000000..37abe3f18 --- /dev/null +++ b/crates/uffs-daemon/src/startup.rs @@ -0,0 +1,238 @@ +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2025-2026 SKY, LLC. + +//! Pre-spawn startup helpers for [`crate::run_daemon`]. +//! +//! Hosts the orchestrator's setup phase: panic-hook install, +//! structured startup logging, `daemon.toml` resolution, lifecycle- +//! manager bootstrap, data-source gathering + validation, and drive- +//! list resolution. Extracted from `lib.rs` so the orchestrator and +//! the `spawn_*` cluster stay focused on lifecycle wiring without the +//! data-source-discovery noise. Every fn here is `pub(crate)` — no +//! external caller. 
+ +use alloc::sync::Arc; +use std::path::PathBuf; + +use crate::{DaemonConfig, broker_client, config, events, lifecycle}; + +/// Bail if the daemon has nothing to serve. +pub(crate) fn validate_data_sources( + mft_files: &[PathBuf], + drives: &[uffs_mft::platform::DriveLetter], + lifecycle_mgr: &lifecycle::LifecycleManager, +) -> anyhow::Result<()> { + let has_data = !mft_files.is_empty() || { + #[cfg(windows)] + { + !drives.is_empty() + } + #[cfg(not(windows))] + { + // `drives` is Windows-only; no auto-discovery on macOS/Linux. + // The explicit type pins the annotation clippy expects on + // discarded bindings. + let _: &[uffs_mft::platform::DriveLetter] = drives; + false + } + }; + if !has_data { + tracing::error!( + "No data sources provided. On macOS/Linux pass --mft-file; \ + on Windows, NTFS drives are auto-discovered." + ); + lifecycle_mgr.remove_pid_file(); + anyhow::bail!( + "Daemon has no data sources to load. \ + Provide --mft-file (or --data-dir when launching via CLI)." + ); + } + Ok(()) +} + +/// Emit the startup `tracing::info!` line with every config field +/// the operator might want to grep for. Extracted so the orchestrator +/// stays under clippy's `cognitive_complexity` budget. +pub(crate) fn log_daemon_starting(config: &DaemonConfig) { + tracing::info!( + pid = std::process::id(), + version = env!("CARGO_PKG_VERSION"), + broker_available = broker_client::broker_available(), + mft_files = ?config.mft_files, + drives = ?config.drives, + data_dir = ?config.data_dir, + no_cache = config.no_cache, + no_retire = config.no_retire, + "uffsd starting" + ); +} + +/// Publish the [`events::DaemonEvent::DaemonStarting`] notification +/// so any pre-IPC subscriber (e.g. the embedded MCP server) sees the +/// transition. 
+pub(crate) fn emit_daemon_starting_event(event_tx: &events::EventSender) { + event_tx.emit(events::DaemonEvent::DaemonStarting { + pid: std::process::id(), + version: env!("CARGO_PKG_VERSION").to_owned(), + }); +} + +/// Install a panic hook that runs the existing default hook (so the +/// usual stack trace + payload still print) and then force-exits. +/// +/// Without this, a panic on any blocking I/O thread can leave the +/// daemon in a zombie state — the default hook tries to unwind through +/// kernel-mode I/O which may never return. Force-exiting with code +/// `101` matches Rust's standard panic exit code so process supervisors +/// don't see a "clean" 0-exit on a panic. +pub(crate) fn install_catastrophe_panic_hook() { + let default_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + default_hook(info); + #[expect(clippy::exit, reason = "catastrophe safety net — force-exit on panic")] + { + std::process::exit(101); + } + })); +} + +/// Resolve the operator's `daemon.toml` from the platform-default +/// location and emit a structured `tracing::info!` event with the +/// resolved path. +/// +/// Phase 6 Commit C task 6.5 helper. A missing file is **not** an +/// error: [`config::Config::load_default`] returns the +/// Phase-3-equivalent defaults so every existing deployment boots +/// with the same observable behavior (plan task 6.8). A malformed +/// file propagates as a startup error so a typo doesn't silently +/// fall back to defaults — the operator gets a precise parser error +/// with line and column. +/// +/// Returned as `Arc` so the index manager and any future +/// background controller can share a single read-only view without +/// cloning the BTreeMap-bearing `[shards.per_drive]` table. 
+pub(crate) fn load_daemon_config() -> anyhow::Result> { + let cfg = config::Config::load_default() + .map_err(|err| anyhow::anyhow!("Failed to load daemon.toml from default path: {err}"))?; + tracing::info!( + daemon_config_path = ?config::Config::default_path(), + "daemon.toml resolved (or defaults used when missing)", + ); + Ok(Arc::new(cfg)) +} + +/// Build the [`lifecycle::LifecycleManager`], gate against another +/// running instance via the PID file, and write a fresh PID file. +/// +/// Returns the manager ready for use, or bails when another daemon is +/// already alive. +pub(crate) fn bootstrap_lifecycle_manager( + config: &DaemonConfig, + event_tx: events::EventSender, +) -> anyhow::Result { + // Determine data directory: + // - lifecycle_dir: always %LOCALAPPDATA%\uffs — PID/socket/lock files + // - data_dir: user-supplied --data-dir (for MFT file discovery/hot-load) + let lifecycle_dir = dirs_next::data_local_dir() + .map_or_else(|| PathBuf::from("/tmp/uffs"), |base| base.join("uffs")); + + let idle_timeout = if config.no_retire { + None + } else { + Some(core::time::Duration::from_secs(config.idle_timeout)) + }; + let mut lifecycle_mgr = + lifecycle::LifecycleManager::new(&lifecycle_dir, idle_timeout, event_tx); + + tracing::info!(data_dir = %lifecycle_mgr.data_dir().display(), "Lifecycle data directory"); + + if !lifecycle_mgr.check_stale_pid() { + tracing::error!("Another daemon instance is already running"); + anyhow::bail!("Another daemon instance is already running"); + } + + lifecycle_mgr.write_pid_file()?; + tracing::info!("PID file written"); + Ok(lifecycle_mgr) +} + +/// Merge `--mft-file` arguments with files discovered under +/// `--data-dir`, applying the `--drive` filter when present. 
+pub(crate) fn gather_mft_files(config: &DaemonConfig) -> Vec { + let mut mft_files = config.mft_files.clone(); + let Some(dir) = config.data_dir.as_ref() else { + return mft_files; + }; + + let discovered = uffs_mft::discovery::discover_mft_files(dir); + let filtered: Vec = if config.drives.is_empty() { + discovered + } else { + discovered + .into_iter() + .filter(|path| drive_letter_matches(path, &config.drives)) + .collect() + }; + tracing::info!( + data_dir = %dir.display(), + count = filtered.len(), + filter = ?config.drives, + "Discovered MFT files from --data-dir" + ); + mft_files.extend(filtered); + mft_files +} + +/// Returns `true` when `path`'s parent directory carries a +/// `drive_` prefix that matches one of `wanted` (case- +/// insensitive — `DriveLetter::parse` canonicalises to uppercase). +/// +/// `pub(crate)` so the regression-pin test in +/// [`crate::tests`] can exercise the contract directly without +/// going through [`gather_mft_files`]. +pub(crate) fn drive_letter_matches( + path: &std::path::Path, + wanted: &[uffs_mft::platform::DriveLetter], +) -> bool { + path.parent() + .and_then(|parent| parent.file_name()) + .and_then(|name| name.to_str()) + .and_then(|name| name.strip_prefix("drive_")) + .and_then(|suffix| suffix.chars().next()) + .and_then(|letter_ch| uffs_mft::platform::DriveLetter::parse(letter_ch).ok()) + .is_some_and(|letter| wanted.contains(&letter)) +} + +/// Resolve the drive list to scan. +/// +/// On Windows, an empty `--drive` triggers auto-discovery; non-empty +/// respects the explicit list. Always empty on non-Windows since +/// live MFT scanning is Windows-only. 
+#[cfg(windows)] +pub(crate) fn resolve_drive_list(config: &DaemonConfig) -> Vec { + let explicit = config.drives.clone(); + if explicit.is_empty() { + let auto_drives = uffs_mft::detect_ntfs_drives(); + tracing::info!( + count = auto_drives.len(), + drives = ?auto_drives, + "Auto-discovered NTFS drives (no --drive flag)" + ); + auto_drives + } else { + tracing::info!( + drives = ?explicit, + "Loading only requested drives (--drive flag)" + ); + explicit + } +} + +/// Non-Windows variant: live MFT scanning is unsupported, so the +/// drive list is always empty regardless of `config`. +#[cfg(not(windows))] +pub(crate) const fn resolve_drive_list( + _config: &DaemonConfig, +) -> Vec { + Vec::new() +} diff --git a/crates/uffs-daemon/src/tests.rs b/crates/uffs-daemon/src/tests.rs index 0f2efa9b2..5f471241d 100644 --- a/crates/uffs-daemon/src/tests.rs +++ b/crates/uffs-daemon/src/tests.rs @@ -10,7 +10,7 @@ use std::path::Path; -use super::drive_letter_matches; +use super::startup::drive_letter_matches; /// `drive_letter_matches` keys discovered MFT files to the /// `--drive` filter by walking `path.parent().file_name()` and diff --git a/crates/uffs-mcp/src/lib.rs b/crates/uffs-mcp/src/lib.rs index d69a3bcb9..9c6aa54e6 100644 --- a/crates/uffs-mcp/src/lib.rs +++ b/crates/uffs-mcp/src/lib.rs @@ -17,6 +17,19 @@ //! It is **not** in the query data path — it merely bridges MCP framing to the //! daemon's native protocol. //! +//! # Concurrency +//! +//! Runs on `#[tokio::main]` (default multi-threaded runtime). Spawns +//! a small number of long-lived tasks: the MCP stdio dispatcher (or +//! the `streamable-http` axum server when that feature is on), the +//! daemon-bridge `UffsClient` reader-loop, and per-RPC handler tasks +//! spawned by the rmcp SDK. Per-RPC timeouts inherit the async +//! client's 300 s `read_line` ceiling; the reload pipeline in the +//! crate-private `process::mcp_reload` is intentionally sequential +//! (one-shot CLI context). See +//! 
`docs/architecture/code-quality/concurrency_policy.md` for the +//! workspace contract. +//! //! # Usage //! //! ```rust,no_run diff --git a/crates/uffs-mft/src/lib.rs b/crates/uffs-mft/src/lib.rs index 866940317..a6c3fec81 100644 --- a/crates/uffs-mft/src/lib.rs +++ b/crates/uffs-mft/src/lib.rs @@ -14,6 +14,19 @@ //! manipulation //! - **Parquet Persistence**: Save/load indexes in compressed Parquet format //! +//! # Concurrency +//! +//! Predominantly a sync library + CLI binary. Tokio is used only by +//! the daemon-embedded loaders (`commands::load`, `commands::windows`) +//! to schedule per-drive `spawn_blocking` MFT reads in parallel. All +//! `std::fs::*` calls live in CLI command handlers or sync helpers +//! invoked from sync `fn main` (Phase 10f B3/B4 verdicts). No +//! per-shard background tasks, no shared mutable state, no channels +//! at this layer — the daemon owns all of that and pulls +//! `DataFrame` snapshots from us. See +//! `docs/architecture/code-quality/concurrency_policy.md` for the +//! workspace contract. +//! //! ## Quick Start //! //! ```rust,ignore diff --git a/docs/architecture/code-quality/concurrency_policy.md b/docs/architecture/code-quality/concurrency_policy.md new file mode 100644 index 000000000..b518ce104 --- /dev/null +++ b/docs/architecture/code-quality/concurrency_policy.md @@ -0,0 +1,422 @@ +# UFFS Concurrency Policy + +UFFS enforces a **strict task-ownership, lock-discipline, channel-backpressure, timeout-coverage, and blocking-IO posture in production async code** via a combination of: + + * workspace-wide Clippy lints (the `await_holding_lock`, `await_holding_refcell_ref` and `await_holding_invalid_type` families at `deny`), + * a workspace inventory script (`scripts/dev/concurrency_audit.sh`) that emits a Markdown report per dimension at hook time and CI time, and + * a per-site annotation contract that contributors quote inline at every spawn / lock / channel / timeout / blocking-IO call site. 
+ +This document is the project's **concurrency contract**: it codifies *what shape* a `tokio::spawn` / lock / channel / timeout / `std::fs::*`-in-async site must take, and *how* a contributor justifies one inline. + +The companion docs cover the broader posture: + + * [`lint-posture.md`](lint-posture.md) — workspace lint configuration. + * [`panic_policy.md`](panic_policy.md) — when `unwrap` / `expect` / `panic!` is acceptable. + * [`allocation_policy.md`](allocation_policy.md) — clone-and-allocation discipline in hot paths. + * [`trait_policy.md`](trait_policy.md) — trait / generic / dispatch shapes. + * [`dependency_policy.md`](dependency_policy.md) — feature additivity + dep duplication. + * [`build_codegen_policy.md`](build_codegen_policy.md) — build.rs / macro / env-var justification. + +For the per-phase strategy that produced the current posture, see [`../../dev/architecture/code_clean/phase_10_async_concurrency_shared_state_implementation_plan.md`](../../dev/architecture/code_clean/phase_10_async_concurrency_shared_state_implementation_plan.md) *(local-only — internal plan)*. + +--- + +## 0 The model at a glance + +UFFS has one process where concurrency matters in production — the daemon (`uffs-daemon`). Three satellite binaries (`uffs-mft`, `uffs-mcp`, `uffs-mcp http_gateway`) carry their own `#[tokio::main]` multi-threaded runtimes but are simpler one-shot CLIs or stateless protocol bridges; `uffs-cli` is sync end-to-end. This page describes the daemon's runtime model so a new contributor can read it in five minutes; everything in §1-§9 below follows from it. + +### 0.1 Daemon task graph + +The daemon's startup graph spawns a fixed set of long-lived tasks named via dedicated `spawn_*` constructors in `crates/uffs-daemon/src/lib.rs`. Eight tasks total (six top-level + two subsystem-internal): + +```mermaid +flowchart TD + R[run_daemon orchestrator] + R --> L[spawn_load_task
drive-load + journal-loop bootstrap] + R --> I[spawn_ipc_servers
AF_UNIX accept loop +
Windows named-pipe sibling] + R --> S[spawn_stats_heartbeat
periodic DaemonStats snapshot] + R --> D[spawn_idle_demote_controller
30 s TTL demote sweep] + R --> M[spawn_mem_snapshot_task
memory telemetry] + R --> P[spawn_pressure_subscriber
OS memory-pressure watcher] + L --> J[spawn_journal_loops_for_warm_shards
per-drive USN journal loop] + J --> JA[RegistryPatchSink::spawn_with_applier
journal-event sink] + I --> C[handle_connection
fire-and-forget per accepted client] + C --> W[writer_loop / notification_loop] +``` + +Per-connection sub-tasks (`reader_loop`, `writer_loop`, `notification_loop`) are abort-on-EOF and not shown as top-level edges. The full per-site rustdoc inventory lives in §6 ("Spawn-site registry") below. + +### 0.2 Shard-state lifecycle + +The daemon's index is a per-drive collection of shards, each transitioning between six states (`crates/uffs-daemon/src/cache/shard.rs::ShardState`): + +| State | Body | Bloom | Trie | Entered by | +|---|---|---|---|---| +| `Unknown` | – | – | – | initial discovery | +| `Cold` | – | – | – | parked-compactor demote (from `Parked`) | +| `Parked` | – | ✓ | ✓ | `spawn_idle_demote_controller` after TTL OR `spawn_pressure_subscriber` cascade | +| `Warm` | mmap | ✓ | ✓ | initial load OR promote from `Parked` | +| `Hot` | mmap + prefaulted | ✓ | ✓ | recent search activity | +| `Evicting` | (in transit) | – | – | transient — demote in progress | + +Legal transitions are pinned in `ShardState::can_transition_to`. The two demote drivers are: + + * **Idle TTL** — `spawn_idle_demote_controller` runs every 30 s and demotes `Warm`/`Hot` → `Parked` after a configurable idle-since-last-access window. + * **Memory-pressure cascade** — `spawn_pressure_subscriber` listens to the OS memory-pressure watch and cascades `Warm` → `Cold` one step at a time on `Low` transitions, preempted by `High`/`Normal`. No-op on Mac/Linux (the platform `PressureSignal` never fires by design). + +### 0.3 IPC-request lifecycle + + 1. **Accept** — `spawn_ipc_servers` runs the `AF_UNIX` accept loop on Unix and an `AF_UNIX` + Windows-named-pipe sibling pair on Windows. Per-connection `MAX_CONNECTIONS = 1024` cap. + 2. **Per-connection task** — each accepted client spawns a fire-and-forget `handle_connection` task; ownership self-resets on socket EOF or `IDLE_CONNECTION_SECS = 60 s` idle timeout. + 3. 
**Reader / writer / notifier** — inside the connection task, `reader_loop` reads JSON-RPC frames and dispatches to `handler::handle_request`; `writer_loop` writes responses; `notification_loop` forwards broadcast events. Sub-tasks are `.abort()`-ed when `reader_loop` returns. + 4. **Per-RPC timeout** — long-running methods carry their own deadline: search 30 s (`SEARCH_TIMEOUT`), drive-load `IndexManager::DRIVE_LOAD_TIMEOUT`, refresh fire-and-forget (immediate 202 ack; runs to completion or process exit). + +### 0.4 Shutdown sequence + +The terminal phase lives in `crates/uffs-daemon/src/shutdown.rs::await_shutdown_then_force_exit`: + + 1. **Signal source** — idle timer expiry, `Ctrl-C`, or RPC `Shutdown` method. Any one releases `lifecycle_mgr.run_idle_timer().await`. + 2. **IPC drain** — `ipc_task.abort()` cancels the accept loop; in-flight per-connection tasks finish their current request and exit on next read EOF. + 3. **Load drain** — `tokio::time::timeout(3 s, load_task)` waits briefly; abandons on the deadline because stuck kernel-mode I/O cannot be cancelled. + 4. **PID + socket cleanup** — `drop(lifecycle_mgr)` removes the PID file and the Unix-domain socket file. + 5. **Force exit** — `force_exit_with_watchdog` spawns a 5 s watchdog thread that calls `std::process::abort` if `process::exit` itself hangs (kernel I/O can wedge atexit handlers), then calls `process::exit(0)`. + +--- + +## 1 The rule + +Stated as a one-liner contributors can quote: + +> **Every `tokio::spawn` declares its owner / shutdown / errors / cancellation. +> Every async lock guard is dropped before the next `.await`. +> Every channel is bounded with documented capacity OR unbounded with a documented producer-rate ceiling. +> Every cross-process / cross-thread / cross-network await has a timeout OR is justified as a cooperatively-cancelled forever-loop. 
+> Every `std::fs::*` / `std::thread::sleep` inside an `async fn` is either wrapped in `spawn_blocking` / `block_in_place` OR is a sync helper called only from sync contexts.** + +The rule is enforced by **three layers**: + + 1. **Clippy lints** at `deny` level in the workspace `Cargo.toml`: + + ```toml + [workspace.lints.clippy] + await_holding_lock = "deny" # No std::sync::Mutex held across .await + await_holding_refcell_ref = "deny" # No RefCell::borrow() held across .await + await_holding_invalid_type = "deny" # No Rc / Cell held across .await (Send violation) + ``` + + These three lints live in clippy's `suspicious` group (default `warn`) and are pinned at `deny` explicitly so the contract survives any future tightening of the workspace `-D warnings` shape — even if `--deny warnings` is removed from a CI gate, the named entries hold the line. + + 2. **`scripts/dev/concurrency_audit.sh`** — emits an 11-section Markdown report covering: per-crate async-surface table, `tokio::spawn` site list, async-lock site list, `.read/.write/.lock().await` candidate set, `Arc>` nesting, channel inventory, timeout coverage, blocking-IO-in-async candidate files, cancellation infrastructure, `#[tokio::test]` count. Runs as part of pre-push `lint-pre-push` and CI's `pr-fast.yml`. + + 3. **Per-site annotation contract** — every spawn / lock / channel / timeout / blocking-IO call site carries either a rustdoc `# Task ownership` / `# Concurrency` section OR an inline `// Phase 10x:` comment explaining the invariant. Audits live in `docs/dev/baseline/2026-05-19/phase_10_*.md` (local-only); the policy categories below summarise the verdicts. + +Test code is **exempt** from the spawn-ownership and timeout-coverage rules — `#[tokio::test]` harnesses spawn ephemeral tasks whose ownership is the test fixture itself, and timeouts in tests are replaced by `wait_for` polling primitives. 
Test code is NOT exempt from the lock-discipline and blocking-IO rules (a lock held across `.await` in a test is the same bug it would be in prod). + +--- + +## 2 The five dimensions + +Every prod concurrency primitive in the workspace must fit exactly one of these five dimensions, each with a specific posture, taxonomy, and required treatment. + +### 2.1 Task ownership (Phase 10c) + +Every `tokio::spawn(` and every named `spawn_*` constructor function must answer four questions in rustdoc OR an inline `// Task ownership:` comment: + + * **Parent task** — which logical task owns this spawn? Daemon main, IPC accept loop, per-shard journal loop, … + * **Shutdown mechanism** — how does the spawned task receive a stop signal? `watch::Sender`, drop-the-channel, parent-task cancellation, `CancellationToken`, `process::exit`, … + * **Error observation policy** — what happens to the task's `Result` / panics? Awaited and propagated, logged + dropped, `JoinSet`-collected, watchdog'd, … + * **Cancellation behavior** — what state does cancellation leave behind? Idempotent shutdown, partial-state OK, drains-on-cancel, fire-and-forget OK, … + +Taxonomy: + +| # | Category | Example | Required treatment | +|---|---|---|---| +| **T1** | Named constructor with explicit ownership | `spawn_per_shard_journal_loop` / `spawn_idle_demote_controller` | Rustdoc `# Task ownership` section answering all four questions | +| **T2** | Inline `tokio::spawn(` with documented inline shape | IPC connection-handler spawn | Inline `// Task ownership:` comment block above the spawn answering all four questions | +| **T3** | Fire-and-forget by design | `handle_refresh` (returns 202; task continues independently) | Inline `// Task ownership: fire-and-forget — <reason>` comment | +| **T4** | Test-only spawn | `#[tokio::test] tokio::spawn(...)` | Exempt from the contract | + +**Workspace inventory (post-Phase 10c):** 18 prod sites + 9 test-only sites. All 18 prod sites carry T1/T2/T3 documentation. 
Full per-site verdict table: `docs/dev/baseline/2026-05-19/phase_10_task_ownership_inventory.md`. + +### 2.2 Lock discipline (Phase 10b) + +The workspace bans holding any lock across an `.await`. Three lints at `deny` enforce the std::sync side mechanically; the async-lock side (`tokio::sync::{Mutex, RwLock}`) is enforced by hand-audit because Clippy cannot statically prove a `MutexGuard<…>` doesn't escape across an `.await` in tokio's `Mutex::lock().await` shape. + +Taxonomy: + +| # | Category | Example | Required treatment | +|---|---|---|---| +| **L1** | Extract-then-await | `let v = guard.field; drop(guard); other.await(v)` | Standard pattern; no annotation required | +| **L2** | Single-statement guard | `*self.counter.write() += 1` | Standard pattern; no annotation required | +| **L3** | Sync-only work inside guard | `let g = lock.read(); cpu_only(&g); /* no await */` | Standard pattern; no annotation required | +| **L4** | Explicit `drop(guard);` before await | `let g = lock.read(); let v = g.field; drop(g); v.await()` | Standard pattern; no annotation required | +| **L5** | Snapshot-then-await with rustdoc invariant | `status` RPC reading multiple fields under a single read guard | `# Concurrency` rustdoc on the enclosing fn naming the snapshot invariant | +| **L6** | **Lock held across `.await`** | **Forbidden** | **Refactor to L1/L4/L5; never suppress** | + +**Workspace inventory (post-Phase 10b):** 36 candidate sites; 34 textbook-clean (L1-L4), 2 augmented to L5 in PR #304 (`stats.rs:78` status RPC + `handler/mod.rs:256` MCP dispatch). 0 L6 sites. Full per-site verdict table: `docs/dev/baseline/2026-05-19/phase_10_lock_across_await_audit.md`. + +### 2.3 Channel discipline (Phase 10d) + +Every channel construction must declare its bound or its "by-construction bounded" rationale. 
+ +Taxonomy: + +| # | Category | Example | Required treatment | +|---|---|---|---| +| **C1** | Bounded `mpsc::channel(N)` with documented capacity | `event_channel(64)` | Rustdoc on the constructor naming N and why this capacity | +| **C2** | `broadcast::channel(N)` for fan-out | daemon notifications to multiple clients | Rustdoc naming N + the slow-consumer drop semantics | +| **C3** | `oneshot::channel()` for single-shot signal | shutdown notification | No annotation required (oneshot has no capacity question) | +| **C4** | `watch::channel(init)` for state-snapshot fan-out | per-shard cancel signal | No annotation required (watch keeps only latest) | +| **C5** | `mpsc::unbounded_channel()` with documented "by-construction bounded" rationale | `journal_sink::apply_tx` (rate-bounded by `SaveTrigger`); `client::notification_tx` (rate-bounded by daemon broadcast capacity) | Inline `// Phase 10d:` comment OR rustdoc on the field/constructor naming the upstream producer-rate ceiling + memory worst-case | +| **C6** | **Unbounded without rationale** | **Forbidden** | **Convert to C1 or document as C5; never leave undocumented** | + +**Workspace inventory (post-Phase 10d):** 2 prod unbounded channels (`journal_sink::apply_tx`, `client::notification_tx`), both C5 with documented ceilings. Full per-site verdict: `docs/dev/baseline/2026-05-19/phase_10_backpressure_audit.md`. + +### 2.4 Timeout policy (Phase 10e) + +Every long-running cross-process / cross-thread / cross-network await must have a `tokio::time::timeout` OR a documented "deliberately blocking forever" rationale. 
+ +Taxonomy: + +| # | Category | Example | Required treatment | +|---|---|---|---| +| **W1** | `tokio::time::timeout(const, fut).await` | `IDLE_CONNECTION_SECS` on IPC reader_loop | Named constant + inline `Action on expiry` comment | +| **W2** | Env-overridable deadline | `UFFS_CLIENT_TIMEOUT_SECS` on sync client | Const default + env-parse helper + `# Errors` rustdoc on `ClientError::Timeout` | +| **W3** | Cooperatively-cancelled forever-loop | per-shard journal loop (cancelled via `watch::Sender`) | Loop body documents the cancel-signal source in rustdoc | +| **W4** | Inline literal timeout (acceptable but a smell) | search dispatch 30 s | Inline `// <reason>` comment; flag for migration to W1 with a `DaemonConfig` knob in a future operator-config pass | +| **W5** | **Unbounded await on cross-process I/O** | **Forbidden** | **Wrap in `tokio::time::timeout` or convert to W3 with cancel-signal** | + +**Workspace inventory (post-Phase 10e):** 7 prod timeout sites across 4 crates. No W5 sites. Soft asymmetry: async client uses 300 s hard-coded (W4) while sync client uses W2 with `UFFS_CLIENT_TIMEOUT_SECS`. Full per-site verdict: `docs/dev/baseline/2026-05-19/phase_10_timeout_coverage_audit.md`. + +### 2.5 Blocking-IO rule (Phase 10f) + +Every `std::fs::*` / `std::thread::sleep` inside an `async fn` must be either: + + * **(a)** wrapped in `tokio::task::spawn_blocking` (preferred when the call site can give up ownership of its inputs), OR + * **(b)** wrapped in `tokio::task::block_in_place` (use when ownership can't be cheaply transferred; requires multi-threaded runtime), OR + * **(c)** inside a sync helper called only from sync contexts (CLI subcommand top-level, `Drop`, startup-once init). 
+ +Taxonomy: + +| # | Category | Example | Required treatment | +|---|---|---|---| +| **B1** | `spawn_blocking` with owned inputs | rayon-driven `search_index` | Inline comment naming the work-amplification + cost class | +| **B2** | `block_in_place` with borrowed inputs | `write_rows_to_file` (Phase 10f fix) | Inline comment explaining why `spawn_blocking` is not used (ownership / clone-cost trade-off) + runtime-multi-threaded requirement | +| **B3** | Sync helper called from sync context | `bind_unix_listener`, `LifecycleManager::write_pid_file` | Function is `fn` (not `async fn`); no special annotation | +| **B4** | Sync I/O at startup / `Drop` / one-shot CLI | `init_tracing_subscriber`'s `create_dir_all`; `UffsClient::shutdown`'s 120-byte PID-file read | Inline comment naming the "bounded one-shot" rationale OR a comment pointing to the audit doc | +| **B5** | **Unbounded sync I/O on async runtime worker** | **Forbidden** | **Convert to B1 / B2; never leave on the runtime hot path** | + +**Workspace inventory (post-Phase 10f):** 14 candidate files; 13 sites B3/B4-justified; 1 prior B5 site (`search.rs:351` `write_rows_to_file`) fixed in PR #307 to B2. 0 remaining B5 sites. Full per-site verdict: `docs/dev/baseline/2026-05-19/phase_10_blocking_io_in_async_audit.md`. + +--- + +## 3 Shutdown coordination + +A separate concern from the five dimensions above — every long-lived async task must observe a shutdown signal AND every shutdown signal must reach every task that observes it. + +Workspace contract: + + * **Single source of truth** — the daemon's top-level `LifecycleHandle` owns the canonical `watch::Sender` used to broadcast cooperative shutdown to every task that subscribes to it. + * **Fan-out via `watch::Receiver`** — every cooperatively-cancelled long-lived task `select!`s on its work future + `cancel_rx.changed().await`; tasks whose §6 "Shutdown" column reads "process exit" do not observe the watch and run until the force-exit path terminates the process. 
+ * **Force-exit watchdog** — the daemon's `force_exit_with_watchdog` spawns a watchdog thread that sleeps 5 s (`std::thread::sleep`) and then calls `std::process::abort`, while the calling thread invokes `process::exit(0)`; this guarantees a stuck kernel-mode I/O thread cannot prevent process termination even if `process::exit` itself hangs. + * **Per-shard journal loops** — each per-drive journal loop has its own `JournalLoopHandle::cancel()` API that flips its dedicated `watch::Sender` and joins the task. + +Per-task shutdown semantics live at the spawn site under § 2.1's `# Task ownership` rubric. + +--- + +## 4 Required annotation shapes + +### 4.1 Per-spawn `# Task ownership` template + +Every T1 site (named constructor) carries a rustdoc section like: + +```rust +/// Spawn the per-shard USN journal loop for `drive`. +/// +/// # Task ownership +/// +/// * **Parent task** — daemon main runtime. +/// * **Shutdown mechanism** — `JournalLoopHandle::cancel()` flips the +/// per-loop `watch::Sender`; the loop's `select!` arm sees the +/// change and returns from the `loop`. +/// * **Error observation policy** — the `JoinHandle` is held by +/// `JournalLoopHandle`; `cancel()` returns the join future so the +/// caller can `.await` it to surface panics during shutdown. +/// * **Cancellation behavior** — the loop drains any pending +/// `SaveTrigger` before exiting; partial-state OK because the +/// applier task on the other side of `apply_tx` is idempotent on +/// `ApplyMsg::Save` (next batch will re-emit any missed events). +pub(crate) fn spawn_per_shard_journal_loop(/* … */) -> JournalLoopHandle { … } +``` + +T2 sites use the same four bullets as a `// Task ownership:` comment block. T3 sites use a single-line `// Task ownership: fire-and-forget — <reason>` comment. 
+ +### 4.2 Per-lock `# Concurrency` invariant template + +L5 sites carry a rustdoc section like: + +```rust +/// # Concurrency +/// +/// Holds an `IndexManager` read guard while building the `StatusResponse` +/// payload (multiple field reads under a single guard for snapshot +/// consistency). 
The `.await` on `build_search_profile` runs AFTER the +/// guard is dropped at the `;` on line N; the borrow checker enforces +/// this because `payload` does not outlive the guard. +async fn status(&self) -> StatusResponse { … } +``` + +### 4.3 Per-channel `// Phase 10d:` rationale template + +C5 sites carry an inline comment: + +```rust +// Phase 10d: unbounded by-design — see backpressure_audit.md. +let (notification_tx, notification_rx) = tokio::sync::mpsc::unbounded_channel(); +``` + +OR a rustdoc block on the field: + +```rust +/// Notification sender — incoming daemon notifications are forwarded here. +notification_tx: tokio::sync::mpsc::UnboundedSender<…>, +``` + +with the upstream ceiling captured in a `# Backpressure` section on the wrapping struct OR a comment at the construction site. + +### 4.4 Per-timeout `// <reason>` template + +W4 sites (inline literal) carry a brief comment naming the reason: + +```rust +// 30 s — interactive RPCs require sub-minute response; deliberately +// hard-coded for now, migrate to DaemonConfig in a future pass. +let search_outcome = tokio::time::timeout( + core::time::Duration::from_secs(30), + search_handle, +).await; +``` + +### 4.5 Per-blocking-IO `// Phase 10f:` template + +B2 sites carry a multi-line comment naming: + + * the work being blocked, + * the worst-case cost class, + * why `block_in_place` is chosen over `spawn_blocking`, + * the runtime-multi-threaded requirement. + +```rust +// Phase 10f: `write_rows_to_file` does sync `File::create` + +// buffered `write_all` + `rename` on the tokio runtime thread. +// For large result sets (10⁵+ rows × ~200 bytes ≈ tens of MB), +// the write blocks for tens-to-hundreds of ms; `block_in_place` +// tells the multi-threaded runtime to move other tasks off this +// worker. Cheaper than `spawn_blocking` here because the `Err` +// arm falls through to the IPC path and reuses `filtered_rows`. 
+let write_result = tokio::task::block_in_place(|| { + Self::write_rows_to_file(&filtered_rows, output_path, &output_config) +}); +``` + +--- + +## 5 Per-crate posture + +The five dimensions above apply differently to each crate depending on its runtime model: + +| Crate | Runtime model | Primary dimensions | Notes | +|---|---|---|---| +| `uffs-daemon` | `#[tokio::main]` multi-threaded | All 5 + shutdown coordination | The most concurrency-sensitive crate; per-shard journal loops, IPC accept loop, search dispatch, idle-demote controller all live here | +| `uffs-mcp` | `#[tokio::main]` multi-threaded | T1/T2 spawn + W1/W2 timeouts | HTTP gateway + reload pipeline; CLI subcommands have B4 blocking-IO exemptions | +| `uffs-client` | both — `UffsClient` (async) + `UffsClientSync` (sync watchdog) | C5 backpressure + W2 timeouts | Async vs sync timeout asymmetry documented in `phase_10_timeout_coverage_audit.md` | +| `uffs-mft` | CLI binary, mostly sync | B4 blocking-IO exemptions | CLI command handlers are inherently sequential | +| `uffs-core` | library, no runtime | None | Algorithmic core; no spawn / lock / channel / timeout sites | +| `uffs-security` | library, no runtime | None | Same | +| `uffs-broker` | Windows-only sync service | None | Synchronous named-pipe service; not in async runtime | + +Per-crate rustdoc `# Concurrency` sections at each crate root summarize the runtime model + cross-link this policy. + +--- + +## 6 Spawn-site registry + +The complete enumeration of every prod `tokio::spawn(` call site in the workspace, with the four facets the rule mandates (owner / shutdown / errors / cancellation). 18 prod sites total; the matching 9 `#[cfg(test)]` sites are exempt per §1. 
Source-of-truth detail per site (every nuance, every commit-time verdict) lives in the local hand-audit at `docs/dev/baseline/2026-05-19/phase_10_task_ownership_inventory.md`; this table is the workspace-tracked summary so contributors can verify the contract without leaving the policy doc. + +| # | Group | Site | Constructor | Owner | Shutdown | Errors | Cancel | +|---|---|---|---|---|---|---|---| +| A1 | top-level | `daemon/src/lib.rs` | `spawn_load_task` | `run_daemon` (held) | `.abort()` in `await_shutdown_then_force_exit` | `tracing` inside | cooperative + abort fallback | +| A2 | top-level | `daemon/src/lib.rs` (`AF_UNIX`) | `spawn_ipc_servers` | `run_daemon` (held) | `.abort()` in shutdown | `tracing` | abort (accept loop is cancellation-safe) | +| A3 | top-level | `daemon/src/lib.rs` (named-pipe, win) | inline | dropped | process exit | `tracing` | none — watchdog reaps | +| A4 | top-level | `daemon/src/lib.rs` | `spawn_stats_heartbeat` | dropped (`_stats_task`) | process exit | broadcast (infallible) | runs to exit | +| A5 | top-level | `daemon/src/lib.rs` | `spawn_idle_demote_controller` | dropped | process exit | `tracing` | runs to exit | +| A6 | top-level | `daemon/src/lib.rs` | `spawn_pressure_subscriber` | dropped | watch-sender drop on `IndexManager` drop | `tracing` | **cooperative** | +| B1 | subsystem | `daemon/src/telemetry.rs` | `spawn_mem_snapshot_task` | dropped | process exit | `tracing` | runs to exit | +| B2 | subsystem | `daemon/src/cache/journal_sink.rs` | `RegistryPatchSink::spawn_with_applier` | held by sink | `Weak` upgrade `None` | `tracing` | **cooperative** | +| B3 | subsystem | `daemon/src/cache/journal_loop.rs` | `spawn_journal_loop` | `JournalLoopHandle` | `cancel_tx.send(true)` (per-shard `watch`) | `JournalLoopHandle::wait_done()` for tests | **cooperative via `select!`** | +| C1 | per-conn | `daemon/src/ipc.rs` | `spawn_unix_connection` | dropped (per-conn) | socket EOF | `tracing::debug!` | none — bounded by 
`MAX_CONNECTIONS` + idle timeout | +| C2 | per-conn | `daemon/src/ipc.rs` (named-pipe, win) | inline | dropped | pipe EOF | `tracing::debug!` | none | +| C3 | per-conn | `daemon/src/ipc/windows_unix_bridge.rs` | inline (bridge) | dropped | duplex EOF | `tracing::debug!` | none | +| D1 | sub-task | `daemon/src/ipc.rs::handle_connection` | `Self::writer_loop` (inline) | parent connection | `.abort()` when `reader_loop` returns | indirect (via reader write fail) | abort | +| D2 | sub-task | `daemon/src/ipc.rs::handle_connection` | `Self::notification_loop` (inline) | parent connection | `.abort()` when `reader_loop` returns | indirect (broadcast `Closed`/`Lagged`) | abort | +| E1 | one-shot | `daemon/src/handler.rs` | inline (`handle_refresh`) | dropped | runs to completion **or** process exit | `tracing` inside `IndexManager::refresh` | none — short-lived | +| F1 | runtime | `daemon/src/index/dispatch.rs` | inline (single-flight cleanup) | dropped | cooperative via `Shared` future | discarded (every awaiter has own clone) | none — cleanup IS the cancel-safety mechanism | +| F2 | client | `client/src/connect_keepalive.rs` | `start_keepalive` | `KeepaliveGuard` | `oneshot::Sender` drop on guard drop | `tracing::debug!` | **cooperative via oneshot drop (RAII)** | +| F3 | binary | `mft/src/main.rs` | inline (`run_until_shutdown`) | local `run_task` | `Ctrl-C` via `tokio::select!` arm | `JoinError` classified by `classify_binary_task_error` | abort on signal; cooperative on natural completion | + +**Group legend:** A = daemon top-level orchestration (`run_daemon`-spawned). B = subsystem long-lived constructors. C = per-connection IPC. D = IPC connection-internal sub-tasks. E = application one-shots. F = runtime cleanup / external-crate spawns. + +Adding a new prod `tokio::spawn(` site requires a corresponding row here in the same PR — the `concurrency_audit.sh §2` count is the gate; a new row in §2 with no matching row here fails review. 
+ +--- + +## 7 Verification + +Every PR that touches async code must surface a clean run of: + +```sh +just lint-prod # Clippy with await-holding lints at deny +bash scripts/dev/concurrency_audit.sh > /tmp/audit.md +diff <(sed -n '/## §1/,/## §2/p' /tmp/audit.md) <(sed -n '/## §1/,/## §2/p' docs/dev/baseline/<date>/concurrency_audit.md) +``` + +The audit script's per-section counts are the contract: + + * **§1 per-crate async-surface table** — column totals must not regress (new `tokio::spawn` must be accompanied by new T1/T2/T3 documentation). + * **§3 lock-across-await candidates** — every new entry needs an L1-L5 verdict in the PR description. + * **§7 missing-timeout candidates** — every new entry needs a W1-W4 verdict. + * **§8 blocking-IO-in-async candidates** — every new file in this list needs a B1-B4 verdict. + +Test code is exempt from the spawn-ownership and timeout-coverage rules; lock-discipline and blocking-IO rules apply equally to tests. + +--- + +## 8 Anti-patterns + +These shapes are **always wrong** in production code; submit a PR converting them, not suppressing them: + + * **Holding a lock across `.await`** — refactor to L1/L4 extract-then-await OR L5 snapshot-then-await. + * **`mpsc::unbounded_channel()` without a documented producer-rate ceiling** — convert to bounded OR document the ceiling as C5. + * **Cross-process `.await` without a timeout** — wrap in `tokio::time::timeout` OR add a `select!` cancel arm. + * **`std::fs::*` / `std::thread::sleep` directly inside an `async fn`** — wrap in `block_in_place` (B2) or `spawn_blocking` (B1), OR move to a sync helper (B3). + * **`tokio::spawn(...)` whose `JoinHandle` is dropped without a documented "fire-and-forget OK" reason** — convert to T3 with an inline `// Task ownership: fire-and-forget — …` comment. + * **`Arc<Mutex<Arc<RwLock<T>>>>` nesting** — flatten to a single lock OR refactor to actor-style channel ownership. 
+ * **`std::sync::Mutex` in an async context** — convert to `tokio::sync::Mutex` OR move the locked work into a `spawn_blocking` so it's clearly sync. + +--- + +## 9 Phase 10 audit trail + +The five dimensions above were closed over Phase 10 in the sub-phases below (10b–10f, one per dimension), bracketed by the audit tooling (10a) and the documentation / closeout work (10g–10h): + + * **10a** — `scripts/dev/concurrency_audit.sh` baseline tool (#303). + * **10b** — Lock-across-await audit; 2 of 36 sites refactored to L5 (#304). + * **10c** — Task ownership inventory; 18 prod sites documented (#305). + * **10d** — Backpressure audit; 2 prod unbounded channels documented as C5 (#306). + * **10e** — Timeout coverage audit; 7 prod sites inventoried (findings-only, folded here). + * **10f** — Blocking-IO-in-async audit; 1 real prod hazard fixed via B2 `block_in_place` (#307). + * **10g** — this policy doc + per-crate `# Concurrency` rustdoc + daemon `lib.rs` decomposition + §0 model + §6 spawn-site registry + named `await_holding_*` clippy entries (#308). + * **10h** — `phase_10_final_report.md` (local-only) + tracking-issue closeout (#308). + +Per-site verdict tables live in `docs/dev/baseline/2026-05-19/phase_10_*.md` (local; not in git because the directory is gitignored). The audit script can be re-run at any time to regenerate the inventory. 
diff --git a/scripts/ci/file_size_exceptions.txt b/scripts/ci/file_size_exceptions.txt index 036227201..e2b727107 100644 --- a/scripts/ci/file_size_exceptions.txt +++ b/scripts/ci/file_size_exceptions.txt @@ -22,4 +22,3 @@ crates/uffs-core/src/aggregate/integration_tests.rs|PERMANENT: Aggregate engine crates/uffs-mft/src/platform/volume.rs|PERMANENT: Volume handle + write-protect fallback handles; splitting would fragment the handle lifecycle crates/uffs-core/src/search/backend.rs|PERMANENT: Search backend facade — DisplayRow, PhaseTimings, SearchResult, FilterMode, MultiDriveBackend orchestrator; cross-cutting types for the whole search pipeline crates/uffs-daemon/src/lifecycle.rs|PERMANENT: LifecycleManager + LifecycleHandle + idle-timer state machine; splitting the cohesive run_idle_timer + load_stalled_force_retire + extended_timeout_for_activity cluster fragments the active-connection / load-stall / shutdown semantics across files -crates/uffs-daemon/src/lib.rs|PERMANENT: run_daemon orchestrator + the cohesive cluster of spawn_load_task / spawn_ipc_servers / spawn_stats_heartbeat / spawn_idle_demote_controller / spawn_usn_refresh_controller / spawn_pressure_subscriber background-task controllers; splitting the controllers across sibling modules fragments the daemon startup graph and the shared imports / DaemonConfig / EventSender wiring diff --git a/scripts/dev/concurrency_audit.sh b/scripts/dev/concurrency_audit.sh index 87b715647..9dcd4c9bc 100755 --- a/scripts/dev/concurrency_audit.sh +++ b/scripts/dev/concurrency_audit.sh @@ -194,6 +194,45 @@ count_regex() { | awk 'BEGIN{s=0} {s+=$1} END{print s+0}' } +# Bulk-collect per-crate match counts in a SINGLE `rg` pass. +# +# Usage: `bulk_per_crate <pattern> <out_array_name> [-F]` +# +# Replaces the N-crates × M-patterns nested loop that used to call +# `count_*` once per (crate, pattern) pair (~126 rg invocations for +# §1 alone, ~1.5 s of process-spawn overhead). 
This version makes +# **one** rg call per pattern across all of `crates/` and buckets +# the per-file counts into per-crate totals via `awk` + a bash +# nameref — cuts §1's wall time from ~1.5 s to ~0.3 s. +# +# The output array is keyed by crate name (e.g. `uffs-daemon`); the +# `${CRATES[@]}` enumeration is the source of truth for the eventual +# table row order. +bulk_per_crate() { + local pattern="$1" + local -n out_map=$2 # nameref — requires bash 4.3+ + local fixed_flag="${3:-}" # pass `-F` for fixed-string mode + local key value + while IFS=$'\t' read -r key value; do + [[ -z "$key" ]] && continue + out_map[$key]=$(( ${out_map[$key]:-0} + value )) + done < <( + rg "${RG_PROD_GLOBS[@]}" $fixed_flag --count-matches \ + "$pattern" crates 2>/dev/null \ + | awk -F: '{ + # Path shape: crates/<crate>/<...rs>:<count> + n = split($1, parts, "/"); + if (n >= 2 && parts[1] == "crates") { + crate = parts[2]; + sum[crate] += $2; + } + } + END { + for (c in sum) printf("%s\t%d\n", c, sum[c]); + }' + ) +} + # Count `#[tokio::test]` sites in a directory (INCLUDES tests/, since # those are precisely where #[tokio::test] lives). count_tokio_tests() { @@ -346,17 +385,31 @@ TOTAL_BOUNDED_CH=0 TOTAL_UNBOUNDED_CH=0 TOTAL_TIMEOUT=0 +# Bulk-collect all 9 per-crate counts in one rg pass per pattern. +# Replaces the prior N × M nested loop (~126 rg invocations) and +# brings §1 to roughly 1/5 of its previous wall time. 
+declare -A C_ASYNC_FN C_SPAWN C_SPAWN_BLK C_STD_LOCK C_TOKIO_LOCK \ + C_ARC_MU C_BOUNDED C_UNBOUNDED C_TIMEOUT +bulk_per_crate 'async fn|async move' C_ASYNC_FN +bulk_per_crate 'tokio::spawn(' C_SPAWN -F +bulk_per_crate 'spawn_blocking' C_SPAWN_BLK -F +bulk_per_crate 'std::sync::(Mutex|RwLock)|sync::Mutex<|sync::RwLock<' C_STD_LOCK +bulk_per_crate 'tokio::sync::(Mutex|RwLock|Semaphore)' C_TOKIO_LOCK +bulk_per_crate 'Arc<(Mutex|RwLock)<' C_ARC_MU +bulk_per_crate 'mpsc::channel\(|watch::channel\(|oneshot::channel\(' C_BOUNDED +bulk_per_crate 'unbounded_channel\(\)|broadcast::channel\(' C_UNBOUNDED +bulk_per_crate 'tokio::time::timeout\b|::timeout_at\(' C_TIMEOUT + for c in "${CRATES[@]}"; do - crate_dir="crates/$c" - async_fn=$(count_regex "$crate_dir" 'async fn|async move') - spawn=$(count_fixed "$crate_dir" 'tokio::spawn(') - spawn_blk=$(count_fixed "$crate_dir" 'spawn_blocking') - std_lock=$(count_regex "$crate_dir" 'std::sync::(Mutex|RwLock)|sync::Mutex<|sync::RwLock<') - tokio_lock=$(count_regex "$crate_dir" 'tokio::sync::(Mutex|RwLock|Semaphore)') - arc_mu=$(count_regex "$crate_dir" 'Arc<(Mutex|RwLock)<') - bounded=$(count_regex "$crate_dir" 'mpsc::channel\(|watch::channel\(|oneshot::channel\(') - unbounded=$(count_regex "$crate_dir" 'unbounded_channel\(\)|broadcast::channel\(') - timeout=$(count_regex "$crate_dir" 'tokio::time::timeout\b|::timeout_at\(') + async_fn=${C_ASYNC_FN[$c]:-0} + spawn=${C_SPAWN[$c]:-0} + spawn_blk=${C_SPAWN_BLK[$c]:-0} + std_lock=${C_STD_LOCK[$c]:-0} + tokio_lock=${C_TOKIO_LOCK[$c]:-0} + arc_mu=${C_ARC_MU[$c]:-0} + bounded=${C_BOUNDED[$c]:-0} + unbounded=${C_UNBOUNDED[$c]:-0} + timeout=${C_TIMEOUT[$c]:-0} TOTAL_ASYNC_FN=$((TOTAL_ASYNC_FN + async_fn)) TOTAL_SPAWN=$((TOTAL_SPAWN + spawn))