diff --git a/crates/prover/src/worker/builder.rs b/crates/prover/src/worker/builder.rs index 1b562f2b92..80fbb94ae1 100644 --- a/crates/prover/src/worker/builder.rs +++ b/crates/prover/src/worker/builder.rs @@ -297,6 +297,10 @@ impl SP1WorkerBuilder { /// Create a [SP1WorkerBuilder] for a CPU worker. pub fn cpu_worker_builder() -> SP1WorkerBuilder { + // Initialize the rayon thread pool before any parallel work. + // Defaults to physical cores (not logical/SMT) to avoid work-stealing contention. + slop_futures::rayon::init_global_pool(); + // Create the prover permits, setting it to having 4 provers. let prover_permits = ProverSemaphore::new(4); diff --git a/slop/crates/futures/Cargo.toml b/slop/crates/futures/Cargo.toml index 2572453bec..ff0785b4d1 100644 --- a/slop/crates/futures/Cargo.toml +++ b/slop/crates/futures/Cargo.toml @@ -14,6 +14,7 @@ rayon = { workspace = true } tokio = { workspace = true, features = ["sync", "rt"] } futures = { workspace = true } thiserror = { workspace = true } +num_cpus = { workspace = true } crossbeam = { workspace = true } pin-project = { workspace = true } diff --git a/slop/crates/futures/src/rayon.rs b/slop/crates/futures/src/rayon.rs index d01980174a..349bf19315 100644 --- a/slop/crates/futures/src/rayon.rs +++ b/slop/crates/futures/src/rayon.rs @@ -9,8 +9,34 @@ use crate::handle::TaskHandle; static GLOBAL_POOL: OnceLock<()> = OnceLock::new(); -fn init_global_pool() { - rayon::ThreadPoolBuilder::new().panic_handler(panic_handler).build_global().ok(); +/// Initialize the rayon global thread pool. +/// +/// Thread count selection (when `RAYON_NUM_THREADS` is not set): +/// - Uses `min(available_parallelism, physical_cores)` to avoid both +/// SMT oversubscription (crossbeam contention) and container overcommit. +/// - `available_parallelism` respects cgroup CPU quotas (K8s `resources.limits.cpu`, +/// `docker --cpus=N`) and affinity masks, so this works in containers. +/// - `get_physical()` caps it to avoid SMT siblings on bare metal. +/// +/// Must be called before any rayon work (par_iter, spawn, etc.) to take effect. +/// Safe to call multiple times — only the first call configures the pool. +pub fn init_global_pool() { + GLOBAL_POOL.get_or_init(|| { + let mut builder = rayon::ThreadPoolBuilder::new().panic_handler(panic_handler); + + if std::env::var("RAYON_NUM_THREADS").is_err() { + let cgroup_aware = + std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1); + let physical = num_cpus::get_physical(); + let threads = cgroup_aware.min(physical); + tracing::info!( + "rayon pool: using {threads} threads (available_parallelism={cgroup_aware}, physical={physical})" + ); + builder = builder.num_threads(threads); + } + + builder.build_global().ok(); + }); } fn panic_handler(panic_payload: Box) { @@ -40,7 +66,7 @@ where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - GLOBAL_POOL.get_or_init(init_global_pool); + init_global_pool(); let (tx, rx) = oneshot::channel(); let (abort_handle, _) = AbortHandle::new_pair(); rayon::spawn(move || { @@ -56,7 +82,7 @@ where F: FnOnce(AbortHandle) -> R + Send + 'static, R: Send + 'static, { - GLOBAL_POOL.get_or_init(init_global_pool); + init_global_pool(); let (tx, rx) = oneshot::channel(); let (abort_handle, abort_registration) = AbortHandle::new_pair(); rayon::spawn(move || {