diff --git a/crates/karva/tests/it/cancel.rs b/crates/karva/tests/it/cancel.rs new file mode 100644 index 00000000..50e5a525 --- /dev/null +++ b/crates/karva/tests/it/cancel.rs @@ -0,0 +1,117 @@ +#![cfg(unix)] + +use std::process::{Command, Stdio}; +use std::time::Duration; + +use insta::assert_snapshot; + +use crate::common::TestContext; + +#[test] +fn test_ctrlc_emits_cancellation_banner() { + // Mix of fast tests (which complete and print PASS lines) and slow + // tests (which keep workers busy when SIGINT arrives) so the snapshot + // exercises both code paths and shows non-trivial output. + let context = TestContext::with_file( + "test_mixed.py", + r" +import time + +def test_fast_a(): pass +def test_fast_b(): pass +def test_fast_c(): pass +def test_fast_d(): pass +def test_fast_e(): pass +def test_slow_a(): time.sleep(60) +def test_slow_b(): time.sleep(60) +def test_slow_c(): time.sleep(60) +def test_slow_d(): time.sleep(60) +def test_slow_e(): time.sleep(60) +", + ); + + let child = context + .command() + .args(["--num-workers", "2"]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("Failed to spawn karva"); + + let pid = child.id(); + + // Wait long enough for karva to launch its workers, run the fast + // tests, and reach the wait-for-completion loop blocked on the slow + // tests. The slow tests sleep for 60s so karva will still be running + // when we send the signal. + std::thread::sleep(Duration::from_secs(5)); + + let status = Command::new("kill") + .args(["-s", "INT", &pid.to_string()]) + .status() + .expect("Failed to invoke kill"); + assert!(status.success(), "kill -s INT {pid} failed"); + + let output = child + .wait_with_output() + .expect("Failed to wait on karva process"); + + let mut stdout = String::from_utf8_lossy(&output.stdout).into_owned(); + // Which two of the five slow tests are in flight when SIGINT arrives + // depends on partitioning and timing, so collapse the suffix to keep + // the snapshot stable across runs. + stdout = regex::Regex::new(r"test_slow_[a-e]") + .unwrap() + .replace_all(&stdout, "test_slow_X") + .into_owned(); + // Worker scheduling means PASS and SIGINT lines can appear in any + // order. Sort each block independently for a deterministic snapshot. + // The ordering of every other line (Starting / Cancelling / summary + // / error) is deterministic. + sort_block_starting_with(&mut stdout, "PASS"); + sort_block_starting_with(&mut stdout, "SIGINT"); + + assert_snapshot!(stdout, @r" + Starting 10 tests across 2 workers + PASS [TIME] test_mixed::test_fast_a + PASS [TIME] test_mixed::test_fast_b + PASS [TIME] test_mixed::test_fast_c + PASS [TIME] test_mixed::test_fast_d + PASS [TIME] test_mixed::test_fast_e + Cancelling due to interrupt: 10 tests still running + SIGINT [TIME] test_mixed::test_slow_X + SIGINT [TIME] test_mixed::test_slow_X + ──────────── + Summary [TIME] 0 tests run: 0 passed, 0 skipped + error: no tests to run + (hint: use `--no-tests` to customize) + "); +} + +/// Sort the contiguous block of lines whose first token is `label` so +/// the snapshot is deterministic. Workers run in parallel so PASS- and +/// SIGINT-line ordering is racy, but every other line is emitted by +/// the orchestrator in a fixed order. +fn sort_block_starting_with(stdout: &mut String, label: &str) { + let lines: Vec<&str> = stdout.lines().collect(); + let first = lines.iter().position(|l| l.trim_start().starts_with(label)); + let Some(start) = first else { return }; + let end = start + + lines[start..] + .iter() + .take_while(|l| l.trim_start().starts_with(label)) + .count(); + let mut sorted: Vec = lines[start..end].iter().map(ToString::to_string).collect(); + sorted.sort(); + let mut rebuilt = lines[..start].join("\n"); + if !rebuilt.is_empty() { + rebuilt.push('\n'); + } + rebuilt.push_str(&sorted.join("\n")); + rebuilt.push('\n'); + rebuilt.push_str(&lines[end..].join("\n")); + if stdout.ends_with('\n') { + rebuilt.push('\n'); + } + *stdout = rebuilt; +} diff --git a/crates/karva/tests/it/main.rs b/crates/karva/tests/it/main.rs index c267a0a2..1dde936b 100644 --- a/crates/karva/tests/it/main.rs +++ b/crates/karva/tests/it/main.rs @@ -3,6 +3,7 @@ pub(crate) mod common; mod r#async; mod basic; mod cache; +mod cancel; mod configuration; mod coverage; mod discovery; diff --git a/crates/karva_cache/src/artifact.rs b/crates/karva_cache/src/artifact.rs index a8fa670e..44cfdea3 100644 --- a/crates/karva_cache/src/artifact.rs +++ b/crates/karva_cache/src/artifact.rs @@ -32,6 +32,10 @@ pub enum CacheFile { FailFastSignal, /// Cache-root JSON: list of last-run failed test names. LastFailed, + /// Per-worker JSON: name + start time of the test currently executing, + /// or absent when the worker is between tests. Used by the orchestrator + /// to render per-test `SIGINT` lines on Ctrl+C. + CurrentTest, } impl CacheFile { @@ -46,6 +50,7 @@ impl CacheFile { Self::Coverage => "coverage.json", Self::FailFastSignal => "fail-fast", Self::LastFailed => "last-failed.json", + Self::CurrentTest => "current_test.json", } } diff --git a/crates/karva_cache/src/cache.rs b/crates/karva_cache/src/cache.rs index c413b6f7..fd8f85e9 100644 --- a/crates/karva_cache/src/cache.rs +++ b/crates/karva_cache/src/cache.rs @@ -6,10 +6,24 @@ use anyhow::Result; use camino::{Utf8Path, Utf8PathBuf}; use karva_diagnostic::{FlakyTest, TestResultStats, TestRunResult}; use ruff_db::diagnostic::{DisplayDiagnosticConfig, DisplayDiagnostics, FileResolver}; +use serde::{Deserialize, Serialize}; use crate::artifact::{CacheFile, read_json, read_text, write_json, write_json_if_nonempty}; use crate::{RUN_PREFIX, RunHash, WORKER_PREFIX, worker_folder}; +/// Snapshot of the test a worker is currently executing. +/// +/// Workers update this file at the start of each test and remove it on +/// completion; the orchestrator reads it on Ctrl+C to render per-test +/// `SIGINT` lines. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CurrentTest { + /// Fully qualified test name (`module::function[params]`). + pub name: String, + /// Wall-clock start of the test, milliseconds since the Unix epoch. + pub start_unix_ms: u64, +} + /// Aggregated test results collected from all worker processes. #[derive(Default)] pub struct AggregatedResults { @@ -67,6 +81,19 @@ impl RunCache { CacheFile::Coverage.path_in(&self.worker_dir(worker_id)) } + /// Path to the per-worker file describing the test currently executing. + pub fn current_test_file(&self, worker_id: usize) -> Utf8PathBuf { + CacheFile::CurrentTest.path_in(&self.worker_dir(worker_id)) + } + + /// Reads the snapshot of which test the worker is currently running. + /// Returns `None` if the worker is between tests or hasn't started yet. + pub fn read_current_test(&self, worker_id: usize) -> Option { + read_json::(&self.worker_dir(worker_id), CacheFile::CurrentTest) + .ok() + .flatten() + } + /// Returns paths to every per-worker coverage file that exists for this /// run, sorted by worker directory. Used to feed the coverage report. pub fn coverage_files(&self) -> Result> { diff --git a/crates/karva_cache/src/lib.rs b/crates/karva_cache/src/lib.rs index 5d4d0d0a..1d51edfa 100644 --- a/crates/karva_cache/src/lib.rs +++ b/crates/karva_cache/src/lib.rs @@ -3,8 +3,8 @@ pub(crate) mod cache; pub(crate) mod hash; pub use cache::{ - AggregatedResults, PruneResult, RunCache, clean_cache, prune_cache, read_last_failed, - read_recent_durations, write_last_failed, + AggregatedResults, CurrentTest, PruneResult, RunCache, clean_cache, prune_cache, + read_last_failed, read_recent_durations, write_last_failed, }; pub use hash::RunHash; pub use karva_diagnostic::{DisplayFlakyTests, FlakyTest}; diff --git a/crates/karva_diagnostic/src/reporter.rs b/crates/karva_diagnostic/src/reporter.rs index a5e2bd79..128f2835 100644 --- a/crates/karva_diagnostic/src/reporter.rs +++ b/crates/karva_diagnostic/src/reporter.rs @@ -1,6 +1,7 @@ use std::fmt::Write; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use camino::Utf8PathBuf; use colored::Colorize; use karva_logging::time::format_duration_bracketed; use karva_logging::{Printer, StatusLevel}; @@ -41,6 +42,21 @@ pub trait Reporter: Send + Sync { fn report_test_slow(&self, test_name: &QualifiedTestName, duration: Duration) { let _ = (test_name, duration); } + + /// Called immediately before a test starts executing. + /// + /// Used by reporters that track in-flight tests for cancellation + /// reporting; default is a no-op. + fn report_test_started(&self, test_name: &QualifiedTestName) { + let _ = test_name; + } + + /// Called when a test finishes (passed, failed, or skipped) so the + /// reporter can clear any in-flight state recorded by + /// [`Self::report_test_started`]. Default no-op. + fn report_test_finished(&self, test_name: &QualifiedTestName) { + let _ = test_name; + } } fn show_for_status_level(level: StatusLevel, kind: &IndividualTestResultKind) -> bool { @@ -77,11 +93,27 @@ impl Reporter for DummyReporter { /// A reporter that outputs test results to stdout as they complete. pub struct TestCaseReporter { printer: Printer, + /// Optional path to a JSON file describing the test currently + /// executing. The orchestrator reads this on Ctrl+C to render + /// per-test `SIGINT` lines. + progress_file: Option, } impl TestCaseReporter { pub fn new(printer: Printer) -> Self { - Self { printer } + Self { + printer, + progress_file: None, + } + } + + /// Direct the reporter to write the currently running test's name and + /// start time to `path` whenever a test begins, and remove the file + /// when it ends. + #[must_use] + pub fn with_progress_file(mut self, path: Utf8PathBuf) -> Self { + self.progress_file = Some(path); + self } } @@ -163,6 +195,51 @@ impl Reporter for TestCaseReporter { ) .ok(); } + + fn report_test_started(&self, test_name: &QualifiedTestName) { + let Some(path) = self.progress_file.as_ref() else { + return; + }; + let start_unix_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX)) + .unwrap_or(0); + // Avoid pulling in `karva_cache::CurrentTest` here (would be a + // circular dep). The cache crate deserialises the same JSON shape. + let body = format!( + "{{\"name\":{},\"start_unix_ms\":{start_unix_ms}}}", + json_string(&test_name.to_string()), + ); + let _ = std::fs::write(path, body); + } + + fn report_test_finished(&self, _test_name: &QualifiedTestName) { + if let Some(path) = self.progress_file.as_ref() { + let _ = std::fs::remove_file(path); + } + } +} + +/// Quote a string for JSON. Stays in this crate so we don't take a hard +/// dependency on `serde_json` just for one field. +fn json_string(s: &str) -> String { + let mut out = String::with_capacity(s.len() + 2); + out.push('"'); + for ch in s.chars() { + match ch { + '"' => out.push_str("\\\""), + '\\' => out.push_str("\\\\"), + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\t' => out.push_str("\\t"), + c if (c as u32) < 0x20 => { + let _ = write!(out, "\\u{:04x}", c as u32); + } + c => out.push(c), + } + } + out.push('"'); + out } /// The width that result labels (`PASS`, `FAIL`, `SKIP`, `SLOW`, `TRY N PASS`, diff --git a/crates/karva_runner/src/orchestration.rs b/crates/karva_runner/src/orchestration.rs index 593603dd..25cdb6f7 100644 --- a/crates/karva_runner/src/orchestration.rs +++ b/crates/karva_runner/src/orchestration.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use std::fmt::Write; use std::process::{Child, Stdio}; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use camino::Utf8PathBuf; @@ -16,7 +16,7 @@ use karva_cache::{ use karva_cli::SubTestCommand; use karva_collector::{CollectedPackage, CollectionSettings}; use karva_logging::Printer; -use karva_logging::time::format_duration; +use karva_logging::time::{format_duration, format_duration_bracketed}; use karva_project::Project; use crate::binary::find_karva_worker_binary; @@ -24,19 +24,40 @@ use crate::collection::ParallelCollector; use crate::partition::{Partition, partition_collected_tests}; use crate::worker_args::{WorkerSpawn, worker_command}; +/// Width that result labels (`PASS`, `FAIL`, `SIGINT`) are right-padded to so +/// columns align. Mirrors the constant in `karva_diagnostic::reporter`. +const LABEL_COLUMN_WIDTH: usize = 12; + +/// How `wait_for_completion` exited. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum WaitOutcome { + /// Every worker exited on its own. + AllCompleted, + /// Ctrl+C was received; remaining workers must be killed. + Cancelled, + /// A worker hit the fail-fast budget; remaining workers must be killed. + FailFast, +} + #[derive(Debug)] struct Worker { id: usize, child: Child, start_time: Instant, + /// Number of tests assigned to this worker. Used to give a useful count in + /// the cancellation summary; the orchestrator can't see worker progress + /// (workers only flush results to the cache on exit), so this is an upper + /// bound on the tests still running in the worker. + test_count: usize, } impl Worker { - fn new(id: usize, child: Child) -> Self { + fn new(id: usize, child: Child, test_count: usize) -> Self { Self { id, child, start_time: Instant::now(), + test_count, } } @@ -51,8 +72,8 @@ struct WorkerManager { } impl WorkerManager { - fn spawn(&mut self, worker_id: usize, child: Child) { - self.workers.push(Worker::new(worker_id, child)); + fn spawn(&mut self, worker_id: usize, child: Child, test_count: usize) { + self.workers.push(Worker::new(worker_id, child, test_count)); } /// Wait for all workers to complete. @@ -62,9 +83,9 @@ impl WorkerManager { &mut self, shutdown_rx: Option<&Receiver<()>>, cache: Option<&RunCache>, - ) { + ) -> WaitOutcome { if self.workers.is_empty() { - return; + return WaitOutcome::AllCompleted; } tracing::info!( @@ -77,7 +98,7 @@ impl WorkerManager { match rx.try_recv() { Ok(()) | Err(TryRecvError::Disconnected) => { tracing::info!("Shutdown requested — stopping remaining workers"); - break; + return WaitOutcome::Cancelled; } Err(TryRecvError::Empty) => {} } @@ -87,7 +108,7 @@ impl WorkerManager { && cache.has_fail_fast_signal() { tracing::info!("Fail-fast signal received — stopping remaining workers"); - break; + return WaitOutcome::FailFast; } self.workers @@ -118,7 +139,7 @@ impl WorkerManager { if self.workers.is_empty() { tracing::info!("All workers completed"); - break; + return WaitOutcome::AllCompleted; } std::thread::sleep(WORKER_POLL_INTERVAL); @@ -138,6 +159,93 @@ impl WorkerManager { let _ = worker.child.wait(); } } + + /// Stop remaining workers and emit nextest-style cancellation lines. + /// + /// Each worker writes a `current_test.json` file at the start of every + /// test and removes it when the test finishes. We read those files + /// *before* killing — once we kill the worker, that file may be removed + /// by an in-flight finalizer or simply lost — and remember a + /// `(worker_id, test name, test start time)` snapshot for each. + /// + /// Workers are killed and reaped before we print so any in-flight + /// `PASS`/`FAIL` lines they were writing to the inherited stdout land + /// before our banner; otherwise the cancellation block interleaves + /// with worker output. A short settle pause lets any kernel-buffered + /// writes drain. + fn cancel_and_kill(&mut self, printer: Printer, cache: &RunCache) { + if self.workers.is_empty() { + return; + } + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX)) + .unwrap_or(0); + + let in_flight: Vec<_> = self + .workers + .iter() + .map(|worker| { + let current = cache.read_current_test(worker.id); + let elapsed = current.as_ref().and_then(|c| { + now_ms + .checked_sub(c.start_unix_ms) + .map(Duration::from_millis) + }); + (worker.id, current.map(|c| c.name), elapsed) + }) + .collect(); + + let total_tests: usize = self.workers.iter().map(|w| w.test_count).sum(); + + for worker in &mut self.workers { + let _ = worker.child.kill(); + } + for worker in &mut self.workers { + let _ = worker.child.wait(); + } + std::thread::sleep(STDOUT_SETTLE); + + let mut stdout = printer.stream_for_test_result().lock(); + let cancel_label = "Cancelling".yellow().bold(); + let interrupt_label = "interrupt".yellow().bold(); + let _ = writeln!( + stdout, + " {cancel_label} due to {interrupt_label}: {total_tests} tests still running" + ); + + let label = "SIGINT".yellow().bold(); + let padding = " ".repeat(LABEL_COLUMN_WIDTH.saturating_sub("SIGINT".len())); + for (worker_id, test_name, elapsed) in in_flight { + let duration_str = format_duration_bracketed(elapsed.unwrap_or_default()); + match test_name { + Some(name) => { + let colored = format_in_flight_test(&name); + let _ = writeln!(stdout, "{padding}{label} {duration_str} {colored}"); + } + None => { + let _ = writeln!( + stdout, + "{padding}{label} {duration_str} worker {worker_id} (between tests)" + ); + } + } + } + } +} + +/// Render a `module::function[params]` test name as it was serialised by +/// the worker (`QualifiedTestName::Display`), colouring the module cyan +/// and the function blue+bold to match the per-test result line format. +fn format_in_flight_test(name: &str) -> String { + if let Some((module, rest)) = name.split_once("::") { + let module = module.cyan(); + let rest = rest.blue().bold(); + format!("{module}::{rest}") + } else { + name.blue().bold().to_string() + } } pub struct ParallelTestConfig { @@ -181,7 +289,7 @@ fn spawn_workers(spawn: &WorkerSpawn, partitions: &[Partition]) -> Result Result { + // Install the Ctrl+C handler before any potentially long-running work + // (collection, partitioning, worker spawn). Otherwise an early SIGINT + // hits the default disposition and the run terminates silently with no + // cancellation banner. + let shutdown_rx = if config.create_ctrlc_handler { + Some(shutdown_receiver()) + } else { + None + }; + let collected = collect_tests(project)?; let total_tests = collected.test_count(); @@ -325,16 +443,14 @@ pub fn run_parallel_tests( }; let mut worker_manager = spawn_workers(&spawn, &partitions)?; - let shutdown_rx = if config.create_ctrlc_handler { - Some(shutdown_receiver()) - } else { - None - }; - let max_fail_cache = project.settings().max_fail().has_limit().then_some(&cache); - worker_manager.wait_for_completion(shutdown_rx, max_fail_cache); - worker_manager.kill_remaining(); + let outcome = worker_manager.wait_for_completion(shutdown_rx, max_fail_cache); + if outcome == WaitOutcome::Cancelled { + worker_manager.cancel_and_kill(printer, &cache); + } else { + worker_manager.kill_remaining(); + } let results = cache.aggregate_results()?; @@ -356,3 +472,6 @@ pub fn run_parallel_tests( const MIN_TESTS_PER_WORKER: usize = 5; const WORKER_POLL_INTERVAL: Duration = Duration::from_millis(10); +/// Pause after killing workers to let kernel-buffered output drain to +/// stdout before we emit the cancellation banner. +const STDOUT_SETTLE: Duration = Duration::from_millis(50); diff --git a/crates/karva_test_semantic/src/context.rs b/crates/karva_test_semantic/src/context.rs index b6df8042..c182f512 100644 --- a/crates/karva_test_semantic/src/context.rs +++ b/crates/karva_test_semantic/src/context.rs @@ -72,6 +72,19 @@ impl<'a> Context<'a> { self.result.borrow().clone().into_sorted() } + /// Record the start of a test execution. Forwarded to the reporter + /// so cancellation logic can render per-test `SIGINT` lines naming + /// the in-flight test. + pub fn report_test_started(&self, test_case_name: &QualifiedTestName) { + self.reporter.report_test_started(test_case_name); + } + + /// Pair to [`Self::report_test_started`]: clears the in-flight marker + /// once the test completes (passed, failed, or skipped). + pub fn report_test_finished(&self, test_case_name: &QualifiedTestName) { + self.reporter.report_test_finished(test_case_name); + } + pub fn register_test_case_result( &self, test_case_name: &QualifiedTestName, diff --git a/crates/karva_test_semantic/src/runner/package_runner.rs b/crates/karva_test_semantic/src/runner/package_runner.rs index 9e1e777f..670f165d 100644 --- a/crates/karva_test_semantic/src/runner/package_runner.rs +++ b/crates/karva_test_semantic/src/runner/package_runner.rs @@ -545,12 +545,14 @@ impl<'ctx, 'a> PackageRunner<'ctx, 'a> { }; let configured_retries = self.context.settings().test().retry; + self.context.report_test_started(&qualified_test_name); let RetryOutcome { test_result, attempt, max_attempts, was_retried, } = self.run_with_retries(py, &qualified_test_name, configured_retries, run_test); + self.context.report_test_finished(&qualified_test_name); let report_ctx = VariantReportCtx { name: &name, diff --git a/crates/karva_worker/src/cli.rs b/crates/karva_worker/src/cli.rs index 246b6c8d..704a216e 100644 --- a/crates/karva_worker/src/cli.rs +++ b/crates/karva_worker/src/cli.rs @@ -166,10 +166,16 @@ fn run(f: impl FnOnce(Vec) -> Vec) -> anyhow::Result = if matches!(printer.status_level(), StatusLevel::None) { Box::new(DummyReporter) } else { - Box::new(TestCaseReporter::new(printer)) + Box::new(TestCaseReporter::new(printer).with_progress_file(progress_file)) }; let result = karva_test_semantic::run_tests(