From 6c5355a96f50f2c1d5bcf81fba46dfc7f416ecf2 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 19:27:12 +0000 Subject: [PATCH 1/9] turbo-tasks-backend: prevent duplicate restores with restoring/restored bits Add data_restoring/meta_restoring transient flag bits, a shared restored Event, and synchronous EventListener::wait() to coordinate concurrent restore attempts. Only one thread performs I/O per task category; others wait via the event. Batch restores in prepare_tasks_with_callback use a 4-phase pipeline (classify, I/O, apply, wait) with a fast path when no backing storage exists. Errors are deferred until all restoring bits are cleared so waiting threads are never left hanging. Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 706 +++++++++++++----- .../src/backend/storage.rs | 17 +- .../src/backend/storage_schema.rs | 33 + turbopack/crates/turbo-tasks/src/event.rs | 28 + 4 files changed, 612 insertions(+), 172 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 03111a29b0eb13..3bc3aba6d4bca9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -12,8 +12,8 @@ use std::{ sync::Arc, }; +use anyhow::{Context, bail}; use bincode::{Decode, Encode}; -use tracing::trace_span; use turbo_tasks::{ CellId, FxIndexMap, TaskExecutionReason, TaskId, TaskPriority, TurboTasksBackendApi, TurboTasksCallApi, TypedSharedReference, backend::CachedTaskType, @@ -172,58 +172,92 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { } } + fn should_check_backing_storage(&self) -> bool { + self.backend.should_restore() + } + fn restore_task_data( &self, task_id: TaskId, category: SpecificTaskDataCategory, - ) -> TaskStorage { - let mut storage = TaskStorage::default(); - if !self.backend.should_restore() { - return storage; + ) -> anyhow::Result { + if !self.should_check_backing_storage() { + // If we don't need to restore, we can just return an empty storage + return Ok(TaskStorage::default()); } - let result = self - .backend + let mut storage = TaskStorage::default(); + self.backend .backing_storage - .lookup_data(task_id, category, &mut storage); - - match result { - Ok(()) => storage, - Err(e) => { - panic!( - "Failed to restore task data (corrupted database or bug): {:?}", - e.context(format!("{category:?} for {task_id})")) - ) - } - } + .lookup_data(task_id, category, &mut storage) + .with_context(|| format!("Failed to restore {category:?} for {task_id}"))?; + Ok(storage) } fn restore_task_data_batch( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Option> { - debug_assert!( - task_ids.len() > 1, - "Use restore_task_data_typed for single task" - ); - if !self.backend.should_restore() { - return None; + ) -> anyhow::Result>> { + debug_assert!(task_ids.len() > 1, "Use restore_task_data for single task"); + if !self.should_check_backing_storage() { + // If we don't need to restore, we return None + return Ok(None); } let result = self .backend .backing_storage - .batch_lookup_data(task_ids, category); - match result { - Ok(result) => Some(result), - Err(e) => { - panic!( - "Failed to restore task data (corrupted database or bug): {:?}", - e.context(format!( - "{category:?} for batch of {} tasks", - task_ids.len() - )) + .batch_lookup_data(task_ids, category) + .with_context(|| { + format!( + "Failed to restore {category:?} for batch of {} tasks", + task_ids.len() ) + })?; + Ok(Some(result)) + } + + /// Waits for another thread's in-progress restore of a task to complete. + /// + /// Precondition: the caller must have observed `is_restoring()` == true for + /// `task_id`+`category` and must have dropped the task lock before calling this. + /// + /// Returns `Ok(())` when the task is restored (the restoring bits are cleared and the + /// restored bits are set), or `Err` if the restoring thread failed (restoring was cleared + /// without setting restored). + fn wait_for_restoring_task( + &self, + task_id: TaskId, + category: SpecificTaskDataCategory, + ) -> anyhow::Result<()> { + let task_category = TaskDataCategory::from(category); + loop { + // Register a listener BEFORE checking the bits (avoids a lost-wakeup race). + let listener = self.backend.storage.restored.listen(); + + let task = self.backend.storage.access_mut(task_id); + let is_restoring = task.flags.is_restoring(task_category); + let is_restored = task.flags.is_restored(task_category); + drop(task); + + if is_restored { + // The restoring thread finished successfully; we're done waiting. + return Ok(()); + } + if !is_restoring { + // The restoring bit was cleared without setting the restored bit. + // This means the restoring thread encountered an error. + bail!("restoring failed"); } + + // Still restoring; block until notified, then loop to re-check. + listener.wait(); + } + } + + /// Panics if waiting for another thread's restore of `task_id`+`category` fails. + fn wait_for_restore_or_panic(&self, task_id: TaskId, category: SpecificTaskDataCategory) { + if let Err(e) = self.wait_for_restoring_task(task_id, category) { + panic!("Restore of {category:?} for task {task_id} failed in another thread: {e:?}"); } } @@ -231,7 +265,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &mut self, task_ids: impl IntoIterator, call_prepared_task_callback_for_transient_tasks: bool, - reason: &'static str, + _reason: &'static str, mut prepared_task_callback: impl FnMut( &mut Self, TaskId, @@ -239,13 +273,28 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { StorageWriteGuard<'e>, ), ) { - let span = trace_span!( - "prepare_tasks_with_callback", - reason, - requested_data = tracing::field::Empty, - requested_meta = tracing::field::Empty, - ); - let _guard = span.enter(); + // Fast path: no backing storage to restore from — mark all tasks as restored + // and invoke callbacks directly, skipping the I/O pipeline. + if !self.should_check_backing_storage() { + for (task_id, category) in task_ids { + self.task_lock_counter.acquire(); + let mut task = self.backend.storage.access_mut(task_id); + if !task.flags.is_restored(category) { + task.flags.set_restored(if task_id.is_transient() { + TaskDataCategory::All + } else { + category + }); + } + self.task_lock_counter.release(); + if !task_id.is_transient() || call_prepared_task_callback_for_transient_tasks { + prepared_task_callback(self, task_id, category, task); + } + // else: task guard is dropped here + } + return; + } + let mut data_count = 0; let mut meta_count = 0; let mut all_count = 0; @@ -253,10 +302,11 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { .into_iter() .filter(|&(id, category)| { if id.is_transient() { - // Transient tasks don't need DB restoration (restored flags are - // set at allocation time), so just invoke the callback directly. if call_prepared_task_callback_for_transient_tasks { - let task = self.backend.storage.access_mut(id); + let mut task = self.backend.storage.access_mut(id); + if !task.flags.is_restored(category) { + task.flags.set_restored(TaskDataCategory::All); + } prepared_task_callback(self, id, category, task); } false @@ -269,127 +319,339 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { TaskDataCategory::Meta => meta_count += 1, TaskDataCategory::All => all_count += 1, }) - .map(|(id, category)| (id, category, None, None)) + .map(|(id, category)| TaskRestoreEntry { + task_id: id, + category, + data_restore_result: None, + meta_restore_result: None, + wait_data: false, + wait_meta: false, + task_type: None, + self_restored: false, + }) .collect::>(); data_count += all_count; meta_count += all_count; - span.record("requested_data", data_count); - span.record("requested_meta", meta_count); let mut tasks_to_restore_for_data = Vec::with_capacity(data_count); - let mut tasks_to_restore_for_data_indicies = Vec::with_capacity(data_count); + let mut tasks_to_restore_for_data_indices = Vec::with_capacity(data_count); let mut tasks_to_restore_for_meta = Vec::with_capacity(meta_count); - let mut tasks_to_restore_for_meta_indicies = Vec::with_capacity(meta_count); - for (i, &(task_id, category, _, _)) in tasks.iter().enumerate() { + let mut tasks_to_restore_for_meta_indices = Vec::with_capacity(meta_count); + + // --- Phase 1a: Classify tasks under lock --- + // For each task, determine whether we will restore it ourselves or wait for another thread. + let mut any_waiting = false; + for (i, entry) in tasks.iter_mut().enumerate() { + let task_id = entry.task_id; + let category = entry.category; self.task_lock_counter.acquire(); - - let task = self.backend.storage.access_mut(task_id); + let mut task = self.backend.storage.access_mut(task_id); let mut ready = true; - if matches!(category, TaskDataCategory::Data | TaskDataCategory::All) - && !task.flags.is_restored(TaskDataCategory::Data) - { - tasks_to_restore_for_data.push(task_id); - tasks_to_restore_for_data_indicies.push(i); - ready = false; + + if category.includes_data() && !task.flags.data_restored() { + if task.flags.data_restoring() { + // Another thread is restoring data; we'll wait in Phase 3 + entry.wait_data = true; + any_waiting = true; + ready = false; + } else { + // We claim responsibility for restoring data + task.flags.set_data_restoring(true); + tasks_to_restore_for_data.push(task_id); + tasks_to_restore_for_data_indices.push(i); + ready = false; + } } - if matches!(category, TaskDataCategory::Meta | TaskDataCategory::All) - && !task.flags.is_restored(TaskDataCategory::Meta) - { - tasks_to_restore_for_meta.push(task_id); - tasks_to_restore_for_meta_indicies.push(i); - ready = false; + + if category.includes_meta() && !task.flags.meta_restored() { + if task.flags.meta_restoring() { + // Another thread is restoring meta; we'll wait in Phase 3 + entry.wait_meta = true; + any_waiting = true; + ready = false; + } else { + // We claim responsibility for restoring meta + task.flags.set_meta_restoring(true); + tasks_to_restore_for_meta.push(task_id); + tasks_to_restore_for_meta_indices.push(i); + ready = false; + } } + self.task_lock_counter.release(); if ready { prepared_task_callback(self, task_id, category, task); } + // else: task guard is dropped here } - if tasks_to_restore_for_meta.is_empty() && tasks_to_restore_for_data.is_empty() { + + if tasks_to_restore_for_data.is_empty() + && tasks_to_restore_for_meta.is_empty() + && !any_waiting + { return; } + // --- Phase 1b: Batch I/O for tasks we claimed --- + + // Data I/O match tasks_to_restore_for_data.len() { 0 => {} 1 => { let task_id = tasks_to_restore_for_data[0]; - let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Data); - let idx = tasks_to_restore_for_data_indicies[0]; - tasks[idx].2 = Some(data); + let idx = tasks_to_restore_for_data_indices[0]; + tasks[idx].data_restore_result = + Some(self.restore_task_data(task_id, SpecificTaskDataCategory::Data)); } _ => { - if let Some(data) = self.restore_task_data_batch( + match self.restore_task_data_batch( &tasks_to_restore_for_data, SpecificTaskDataCategory::Data, ) { - data.into_iter() - .zip(tasks_to_restore_for_data_indicies) - .for_each(|(item, idx)| { - tasks[idx].2 = Some(item); - }); - } else { - for idx in tasks_to_restore_for_data_indicies { - tasks[idx].2 = Some(TaskStorage::default()); + Ok(Some(data)) => { + for (item, &idx) in data.into_iter().zip(&tasks_to_restore_for_data_indices) + { + tasks[idx].data_restore_result = Some(Ok(item)); + } + } + Ok(None) => { + // should_check_backing_storage() was false; treat as empty + for &idx in &tasks_to_restore_for_data_indices { + tasks[idx].data_restore_result = Some(Ok(TaskStorage::default())); + } + } + Err(e) => { + // Batch failure: distribute the error to each affected task + let msg = format!("{e:?}"); + for &idx in &tasks_to_restore_for_data_indices { + tasks[idx].data_restore_result = + Some(Err(anyhow::anyhow!("Batch data restore failed: {msg}"))); + } } } } } + + // Meta I/O match tasks_to_restore_for_meta.len() { 0 => {} 1 => { let task_id = tasks_to_restore_for_meta[0]; - let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Meta); - let idx = tasks_to_restore_for_meta_indicies[0]; - tasks[idx].3 = Some(data); + let idx = tasks_to_restore_for_meta_indices[0]; + tasks[idx].meta_restore_result = + Some(self.restore_task_data(task_id, SpecificTaskDataCategory::Meta)); } _ => { - if let Some(data) = self.restore_task_data_batch( + match self.restore_task_data_batch( &tasks_to_restore_for_meta, SpecificTaskDataCategory::Meta, ) { - data.into_iter() - .zip(tasks_to_restore_for_meta_indicies) - .for_each(|(item, idx)| { - tasks[idx].3 = Some(item); - }); - } else { - for idx in tasks_to_restore_for_meta_indicies { - tasks[idx].3 = Some(TaskStorage::new()); + Ok(Some(data)) => { + for (item, &idx) in data.into_iter().zip(&tasks_to_restore_for_meta_indices) + { + tasks[idx].meta_restore_result = Some(Ok(item)); + } + } + Ok(None) => { + // should_check_backing_storage() was false; treat as empty + for &idx in &tasks_to_restore_for_meta_indices { + tasks[idx].meta_restore_result = Some(Ok(TaskStorage::default())); + } + } + Err(e) => { + let msg = format!("{e:?}"); + for &idx in &tasks_to_restore_for_meta_indices { + tasks[idx].meta_restore_result = + Some(Err(anyhow::anyhow!("Batch meta restore failed: {msg}"))); + } } } } } - for (task_id, category, storage_for_data, storage_for_meta) in tasks { - if storage_for_data.is_none() && storage_for_meta.is_none() { + // --- Phase 1c: Apply I/O results for tasks we restored --- + // (callbacks are deferred to Phase 2 so we finish restoring — and notify waiters — + // as early as possible) + // Errors are collected rather than panicking immediately so that all tasks' restoring + // bits are cleared first. Otherwise other threads waiting on those bits would hang. + let mut any_self_restored = false; + let mut restore_errors: Vec<(TaskId, &str, anyhow::Error)> = Vec::new(); + for entry in &mut tasks { + if entry.data_restore_result.is_none() && entry.meta_restore_result.is_none() { continue; } - self.task_lock_counter.acquire(); + entry.self_restored = true; + any_self_restored = true; + let task_id = entry.task_id; - let mut task_type = None; + self.task_lock_counter.acquire(); let mut task = self.backend.storage.access_mut(task_id); - if let Some(storage) = storage_for_data - && !task.flags.is_restored(TaskDataCategory::Data) - { - task.restore_from(storage, TaskDataCategory::Data); - task.flags.set_restored(TaskDataCategory::Data); - task_type = task.get_persistent_task_type().cloned() + + if let Some(result) = entry.data_restore_result.take() { + if let Some(e) = + apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data) + { + restore_errors.push((task_id, "data", e)); + } else { + // Since we claimed this restore (data_restored() was false under the lock), + // the task type is always fresh here. + entry.task_type = task.get_persistent_task_type().cloned(); + } } - if let Some(storage) = storage_for_meta - && !task.flags.is_restored(TaskDataCategory::Meta) + + if let Some(result) = entry.meta_restore_result.take() + && let Some(e) = + apply_restore_result(&mut task, result, SpecificTaskDataCategory::Meta) { - task.restore_from(storage, TaskDataCategory::Meta); - task.flags.set_restored(TaskDataCategory::Meta); + restore_errors.push((task_id, "meta", e)); } + + // Drop the lock before notifying so woken threads don't + // immediately contend on the same DashMap shard. + drop(task); self.task_lock_counter.release(); - prepared_task_callback(self, task_id, category, task); - if let Some(task_type) = task_type { + } + + // Notify all waiting threads once, after all tasks have been restored + // (or had their restoring bits cleared on error). + if any_self_restored { + self.backend.storage.restored.notify(usize::MAX); + } + + if !restore_errors.is_empty() { + let msgs: Vec = restore_errors + .iter() + .map(|(id, cat, e)| format!("Failed to restore {cat} for task {id}: {e:?}")) + .collect(); + panic!("Restore failures:\n{}", msgs.join("\n")); + } + + // --- Phase 2: Callbacks for tasks we restored ourselves --- + // Separated from Phase 1c so that other threads are unblocked as early as possible. + for entry in &tasks { + if !entry.self_restored { + continue; + } + if let Some(task_type) = entry.task_type.clone() { // Insert into the task cache to avoid future lookups - self.backend.task_cache.entry(task_type).or_insert(task_id); + self.backend + .task_cache + .entry(task_type) + .or_insert(entry.task_id); + } + // Only call the callback if no category is still being restored by another thread. + // If so, Phase 3 calls the callback after all categories are fully restored. + if !entry.wait_data && !entry.wait_meta { + let task = self.backend.storage.access_mut(entry.task_id); + prepared_task_callback(self, entry.task_id, entry.category, task); + } + } + + // --- Phase 3: Wait for tasks being restored by other threads --- + if any_waiting { + // Use a single listener per iteration so all waiting tasks share one wakeup. + loop { + // Register listener BEFORE re-checking bits to avoid a lost-wakeup race. + let listener = self.backend.storage.restored.listen(); + let mut still_waiting = false; + for entry in &tasks { + if entry.wait_data { + let task = self.backend.storage.access_mut(entry.task_id); + let restoring = task.flags.data_restoring(); + let restored = task.flags.data_restored(); + drop(task); + if !restored { + if !restoring { + panic!( + "Restore of Data for task {} failed in another thread", + entry.task_id + ); + } + still_waiting = true; + } + } + if entry.wait_meta { + let task = self.backend.storage.access_mut(entry.task_id); + let restoring = task.flags.meta_restoring(); + let restored = task.flags.meta_restored(); + drop(task); + if !restored { + if !restoring { + panic!( + "Restore of Meta for task {} failed in another thread", + entry.task_id + ); + } + still_waiting = true; + } + } + } + if !still_waiting { + break; + } + listener.wait(); + } + // All waited tasks are now restored; call their callbacks. + for entry in &tasks { + if entry.wait_data || entry.wait_meta { + self.task_lock_counter.acquire(); + let task = self.backend.storage.access_mut(entry.task_id); + self.task_lock_counter.release(); + prepared_task_callback(self, entry.task_id, entry.category, task); + } } } } } +/// Per-task state threaded through the phases of `prepare_tasks_with_callback`. +struct TaskRestoreEntry { + task_id: TaskId, + category: TaskDataCategory, + /// Result of restoring the data category (set in Phase 1b, consumed in Phase 1c). + data_restore_result: Option>, + /// Result of restoring the meta category (set in Phase 1b, consumed in Phase 1c). + meta_restore_result: Option>, + /// Another thread claimed the data restore; we must wait in Phase 3. + wait_data: bool, + /// Another thread claimed the meta restore; we must wait in Phase 3. + wait_meta: bool, + /// Task type discovered during Phase 1c data restore (used to update task cache in Phase 2). + task_type: Option>, + /// This thread performed the restore for at least one category (set in Phase 1c). + self_restored: bool, +} + +/// Applies a restore I/O result to a task's in-memory state. +/// +/// Clears the `*_restoring` flag for `category` unconditionally (success or error). +/// On success, merges `storage` into the task if not already marked restored, then sets +/// the restored flag. On error, returns the error so the caller can drop the task lock, +/// notify waiters, and panic. +fn apply_restore_result( + task: &mut StorageWriteGuard<'_>, + result: anyhow::Result, + category: SpecificTaskDataCategory, +) -> Option { + let task_category = TaskDataCategory::from(category); + match result { + Ok(storage) => { + debug_assert!( + !task.flags.is_restored(task_category), + "apply_restore_result called for already-restored {task_category:?}" + ); + task.restore_from(storage, task_category); + task.flags.set_restored(task_category); + task.flags.set_restoring(task_category, false); + None + } + Err(e) => { + task.flags.set_restoring(task_category, false); + Some(e) + } + } +} + impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { type TaskGuardImpl = TaskGuardImpl<'e>; @@ -408,43 +670,74 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { let mut task = self.backend.storage.access_mut(task_id); if !task.flags.is_restored(category) { - // New tasks (transient and persistent) have restored flags set at allocation - // time, so this path is only hit for persistent tasks being restored from DB. - debug_assert!( - !task.flags.new_task() && !task_id.is_transient(), - "new or transient task should already be marked restored" - ); - - // Collect which categories need restoring while we have the lock - let needs_data = - category.includes_data() && !task.flags.is_restored(TaskDataCategory::Data); - let needs_meta = - category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta); + if task_id.is_transient() { + task.flags.set_restored(TaskDataCategory::All); + } else { + // Collect which categories need restoring while we have the lock + let needs_data = + category.includes_data() && !task.flags.is_restored(TaskDataCategory::Data); + let needs_meta = + category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta); + + // Check whether another thread is currently restoring each category. + let data_restoring = needs_data && task.flags.data_restoring(); + let meta_restoring = needs_meta && task.flags.meta_restoring(); + + // Claim categories no one else is restoring. + let do_data = needs_data && !data_restoring; + let do_meta = needs_meta && !meta_restoring; + if do_data { + task.flags.set_data_restoring(true); + } + if do_meta { + task.flags.set_meta_restoring(true); + } - if needs_data || needs_meta { - // Avoid holding the lock too long since this can also affect other tasks - // Drop lock once, do all I/O, then re-acquire once - drop(task); + if do_data || do_meta || data_restoring || meta_restoring { + // Drop lock while doing I/O or waiting. + drop(task); - let storage_data = needs_data - .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data)); - let storage_meta = needs_meta - .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta)); + // Wait for categories claimed by another thread. + if data_restoring { + self.wait_for_restore_or_panic(task_id, SpecificTaskDataCategory::Data); + } + if meta_restoring { + self.wait_for_restore_or_panic(task_id, SpecificTaskDataCategory::Meta); + } - task = self.backend.storage.access_mut(task_id); + // Perform I/O for categories we claimed. + let storage_data = do_data + .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data)); + let storage_meta = do_meta + .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta)); + + task = self.backend.storage.access_mut(task_id); + + // Apply results and clear restoring bits. + if let Some(result) = storage_data + && let Some(e) = + apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data) + { + drop(task); + self.backend.storage.restored.notify(usize::MAX); + panic!("Failed to restore data for task {task_id}: {e:?}"); + } + if let Some(result) = storage_meta + && let Some(e) = + apply_restore_result(&mut task, result, SpecificTaskDataCategory::Meta) + { + drop(task); + self.backend.storage.restored.notify(usize::MAX); + panic!("Failed to restore meta for task {task_id}: {e:?}"); + } - // Handle race conditions and merge - if let Some(storage) = storage_data - && !task.flags.is_restored(TaskDataCategory::Data) - { - task.restore_from(storage, TaskDataCategory::Data); - task.flags.set_restored(TaskDataCategory::Data); - } - if let Some(storage) = storage_meta - && !task.flags.is_restored(TaskDataCategory::Meta) - { - task.restore_from(storage, TaskDataCategory::Meta); - task.flags.set_restored(TaskDataCategory::Meta); + if do_data || do_meta { + // Drop the lock before notifying so woken threads don't + // immediately contend on the same DashMap shard. + drop(task); + self.backend.storage.restored.notify(usize::MAX); + task = self.backend.storage.access_mut(task_id); + } } } } @@ -504,7 +797,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { let (mut task1, mut task2) = self.backend.storage.access_pair_mut(task_id1, task_id2); - // Collect what needs restoring for each task + // Collect what needs restoring for each task. let needs_data1 = category.includes_data() && !task1.flags.is_restored(TaskDataCategory::Data); let needs_meta1 = @@ -514,51 +807,122 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { let needs_meta2 = category.includes_meta() && !task2.flags.is_restored(TaskDataCategory::Meta); - if needs_data1 || needs_meta1 || needs_data2 || needs_meta2 { - // Avoid holding the lock too long since this can also affect other tasks - // Drop locks once, do all I/O, then re-acquire once + // Check whether another thread is restoring each category. + let data1_restoring = needs_data1 && task1.flags.data_restoring(); + let meta1_restoring = needs_meta1 && task1.flags.meta_restoring(); + let data2_restoring = needs_data2 && task2.flags.data_restoring(); + let meta2_restoring = needs_meta2 && task2.flags.meta_restoring(); + + // Claim categories no one else is restoring. + let do_data1 = needs_data1 && !data1_restoring; + let do_meta1 = needs_meta1 && !meta1_restoring; + let do_data2 = needs_data2 && !data2_restoring; + let do_meta2 = needs_meta2 && !meta2_restoring; + if do_data1 { + task1.flags.set_data_restoring(true); + } + if do_meta1 { + task1.flags.set_meta_restoring(true); + } + if do_data2 { + task2.flags.set_data_restoring(true); + } + if do_meta2 { + task2.flags.set_meta_restoring(true); + } + + if do_data1 + || do_meta1 + || do_data2 + || do_meta2 + || data1_restoring + || meta1_restoring + || data2_restoring + || meta2_restoring + { + // Drop both locks while doing I/O or waiting. drop(task1); drop(task2); - let storage_data1 = needs_data1 - .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data)); - let storage_meta1 = needs_meta1 - .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta)); - let storage_data2 = needs_data2 - .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data)); - let storage_meta2 = needs_meta2 - .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); + // Wait for categories claimed by another thread. + if data1_restoring { + self.wait_for_restore_or_panic(task_id1, SpecificTaskDataCategory::Data); + } + if meta1_restoring { + self.wait_for_restore_or_panic(task_id1, SpecificTaskDataCategory::Meta); + } + if data2_restoring { + self.wait_for_restore_or_panic(task_id2, SpecificTaskDataCategory::Data); + } + if meta2_restoring { + self.wait_for_restore_or_panic(task_id2, SpecificTaskDataCategory::Meta); + } + + // Perform I/O for categories we claimed. + let storage_data1 = + do_data1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data)); + let storage_meta1 = + do_meta1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta)); + let storage_data2 = + do_data2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data)); + let storage_meta2 = + do_meta2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2); task1 = t1; task2 = t2; - // Merge results, handling race conditions - if let Some(storage) = storage_data1 - && !task1.flags.is_restored(TaskDataCategory::Data) + // Apply results and clear restoring bits. + // On error: drop both locks, notify waiters, then panic. + if let Some(result) = storage_data1 + && let Some(e) = + apply_restore_result(&mut task1, result, SpecificTaskDataCategory::Data) { - task1.restore_from(storage, TaskDataCategory::Data); - task1.flags.set_restored(TaskDataCategory::Data); + drop(task1); + drop(task2); + self.backend.storage.restored.notify(usize::MAX); + panic!("Failed to restore data for task {task_id1}: {e:?}"); } - if let Some(storage) = storage_meta1 - && !task1.flags.is_restored(TaskDataCategory::Meta) + if let Some(result) = storage_meta1 + && let Some(e) = + apply_restore_result(&mut task1, result, SpecificTaskDataCategory::Meta) { - task1.restore_from(storage, TaskDataCategory::Meta); - task1.flags.set_restored(TaskDataCategory::Meta); + drop(task1); + drop(task2); + self.backend.storage.restored.notify(usize::MAX); + panic!("Failed to restore meta for task {task_id1}: {e:?}"); } - if let Some(storage) = storage_data2 - && !task2.flags.is_restored(TaskDataCategory::Data) + if let Some(result) = storage_data2 + && let Some(e) = + apply_restore_result(&mut task2, result, SpecificTaskDataCategory::Data) { - task2.restore_from(storage, TaskDataCategory::Data); - task2.flags.set_restored(TaskDataCategory::Data); + drop(task1); + drop(task2); + self.backend.storage.restored.notify(usize::MAX); + panic!("Failed to restore data for task {task_id2}: {e:?}"); } - if let Some(storage) = storage_meta2 - && !task2.flags.is_restored(TaskDataCategory::Meta) + if let Some(result) = storage_meta2 + && let Some(e) = + apply_restore_result(&mut task2, result, SpecificTaskDataCategory::Meta) { - task2.restore_from(storage, TaskDataCategory::Meta); - task2.flags.set_restored(TaskDataCategory::Meta); + drop(task1); + drop(task2); + self.backend.storage.restored.notify(usize::MAX); + panic!("Failed to restore meta for task {task_id2}: {e:?}"); + } + + if do_data1 || do_meta1 || do_data2 || do_meta2 { + // Drop both locks before notifying so woken threads don't + // immediately contend on the same DashMap shards. + drop(task1); + drop(task2); + self.backend.storage.restored.notify(usize::MAX); + let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2); + task1 = t1; + task2 = t2; } } + ( TaskGuardImpl { task: task1, @@ -622,7 +986,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { } fn task_by_type(&mut self, task_type: &CachedTaskType) -> Option { - if !self.backend.should_restore() { + if !self.should_check_backing_storage() { return None; } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index a19e75b1e09148..f2b8c67cadf07e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -10,7 +10,7 @@ use std::{ use thread_local::ThreadLocal; use turbo_bincode::TurboBincodeBuffer; -use turbo_tasks::{FxDashMap, TaskId, parallel}; +use turbo_tasks::{FxDashMap, TaskId, event::Event, parallel}; use crate::{ backend::storage_schema::TaskStorage, @@ -53,6 +53,15 @@ pub enum SpecificTaskDataCategory { Data, } +impl From for TaskDataCategory { + fn from(category: SpecificTaskDataCategory) -> Self { + match category { + SpecificTaskDataCategory::Meta => TaskDataCategory::Meta, + SpecificTaskDataCategory::Data => TaskDataCategory::Data, + } + } +} + impl SpecificTaskDataCategory { /// Returns the KeySpace for storing data of this category pub fn key_space(self) -> KeySpace { @@ -83,6 +92,11 @@ pub struct Storage { /// be marked as modified at the beginning of the next snapshot cycle. snapshots: FxDashMap>>, map: FxDashMap>, + /// A shared event notified whenever any task finishes restoring (successfully or not). + /// + /// Threads waiting for another thread's in-progress restore subscribe to this event, + /// then re-check the specific task's `restoring`/`restored` bits after waking. + pub(crate) restored: Event, } impl Storage { @@ -115,6 +129,7 @@ impl Storage { shard_amount, ), map, + restored: Event::new(|| || "Storage::restored".to_string()), } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs index e12a5309dd9d43..31f811945981e9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs @@ -161,6 +161,14 @@ struct TaskStorageSchema { #[field(storage = "flag", category = "transient")] data_restored: bool, + /// Whether meta data restoration is currently in progress by another thread. + #[field(storage = "flag", category = "transient")] + meta_restoring: bool, + + /// Whether data restoration is currently in progress by another thread. + #[field(storage = "flag", category = "transient")] + data_restoring: bool, + /// Whether meta was modified before snapshot mode was entered. #[field(storage = "flag", category = "transient")] meta_modified: bool, @@ -348,6 +356,31 @@ impl TaskFlags { } } + /// Check if the category's restoration is currently in progress by another thread + pub fn is_restoring(&self, category: TaskDataCategory) -> bool { + match category { + TaskDataCategory::Meta => self.meta_restoring(), + TaskDataCategory::Data => self.data_restoring(), + TaskDataCategory::All => self.meta_restoring() || self.data_restoring(), + } + } + + /// Set or clear the restoring bits for the given category + pub fn set_restoring(&mut self, category: TaskDataCategory, value: bool) { + match category { + TaskDataCategory::Meta => { + self.set_meta_restoring(value); + } + TaskDataCategory::Data => { + self.set_data_restoring(value); + } + TaskDataCategory::All => { + self.set_meta_restoring(value); + self.set_data_restoring(value); + } + } + } + /// Check if any snapshot flag is set pub fn any_modified_during_snapshot(&self) -> bool { self.meta_modified_during_snapshot() || self.data_modified_during_snapshot() diff --git a/turbopack/crates/turbo-tasks/src/event.rs b/turbopack/crates/turbo-tasks/src/event.rs index 8ed124cc1a19c3..370665ec4c0bc8 100644 --- a/turbopack/crates/turbo-tasks/src/event.rs +++ b/turbopack/crates/turbo-tasks/src/event.rs @@ -11,6 +11,7 @@ use std::{ time::Duration, }; +use event_listener::Listener as _; #[cfg(feature = "hanging_detection")] use tokio::time::{Timeout, timeout}; @@ -201,6 +202,17 @@ impl Future for EventListener { } } +#[cfg(not(feature = "hanging_detection"))] +impl EventListener { + /// Blocks the current thread until the event is notified. + /// + /// This is the synchronous equivalent of `.await`-ing the `EventListener`. + /// Only valid in synchronous contexts (e.g. backend operations). + pub fn wait(self) { + self.listener.wait(); + } +} + #[cfg(feature = "hanging_detection")] pub struct EventListener { description: Arc String + Sync + Send>, @@ -273,6 +285,22 @@ impl Future for EventListener { } } +#[cfg(feature = "hanging_detection")] +impl EventListener { + /// Blocks the current thread until the event is notified. + /// + /// Note: In `hanging_detection` builds, timeout warnings are not emitted + /// for sync waits (only for async `.await` usage). + pub fn wait(mut self) { + if let Some(future) = self.future.take() { + // SAFETY: EventListener is Unpin, so it's safe to move out of the Pin. + unsafe { std::pin::Pin::into_inner_unchecked(future) } + .into_inner() + .wait(); + } + } +} + #[cfg(all(test, not(feature = "hanging_detection")))] mod tests { use std::hint::black_box; From 489e10edb813b661bb049d0125d910efb0f60b3f Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 20:24:20 +0000 Subject: [PATCH 2/9] fix: handle initialize_new_task race in apply_restore_result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a new task is created, task_cache.insert() releases the cache lock before initialize_new_task() marks the task as restored. A concurrent thread can see data_restored=false in this window, claim the restoring bit, do I/O, then call apply_restore_result after initialize_new_task has already set data_restored=true — causing the debug_assert to fire and the process to panic, which in turn caused waiting threads to hang. Fix by silently discarding the redundant result (and clearing the restoring bit so waiters unblock) instead of panicking. Co-Authored-By: Claude --- .../turbo-tasks-backend/src/backend/operation/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 3bc3aba6d4bca9..655b3949e32fc2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -636,10 +636,13 @@ fn apply_restore_result( let task_category = TaskDataCategory::from(category); match result { Ok(storage) => { - debug_assert!( - !task.flags.is_restored(task_category), - "apply_restore_result called for already-restored {task_category:?}" - ); + if task.flags.is_restored(task_category) { + // Already restored by another path (e.g., initialize_new_task racing + // with our I/O). Just clear the restoring bit so waiting threads + // unblock; our result is redundant. + task.flags.set_restoring(task_category, false); + return None; + } task.restore_from(storage, task_category); task.flags.set_restored(task_category); task.flags.set_restoring(task_category, false); From 3444aa92e3500846880b067e9f9e124b8f808ec8 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 21:37:24 +0000 Subject: [PATCH 3/9] inline should_check_backing_storage() per review Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 655b3949e32fc2..a1013b028e1440 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -172,16 +172,12 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { } } - fn should_check_backing_storage(&self) -> bool { - self.backend.should_restore() - } - fn restore_task_data( &self, task_id: TaskId, category: SpecificTaskDataCategory, ) -> anyhow::Result { - if !self.should_check_backing_storage() { + if !self.backend.should_restore() { // If we don't need to restore, we can just return an empty storage return Ok(TaskStorage::default()); } @@ -199,7 +195,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { category: SpecificTaskDataCategory, ) -> anyhow::Result>> { debug_assert!(task_ids.len() > 1, "Use restore_task_data for single task"); - if !self.should_check_backing_storage() { + if !self.backend.should_restore() { // If we don't need to restore, we return None return Ok(None); } @@ -275,7 +271,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { ) { // Fast path: no backing storage to restore from — mark all tasks as restored // and invoke callbacks directly, skipping the I/O pipeline. - if !self.should_check_backing_storage() { + if !self.backend.should_restore() { for (task_id, category) in task_ids { self.task_lock_counter.acquire(); let mut task = self.backend.storage.access_mut(task_id); @@ -415,7 +411,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { } } Ok(None) => { - // should_check_backing_storage() was false; treat as empty + // should_restore() was false; treat as empty for &idx in &tasks_to_restore_for_data_indices { tasks[idx].data_restore_result = Some(Ok(TaskStorage::default())); } @@ -453,7 +449,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { } } Ok(None) => { - // should_check_backing_storage() was false; treat as empty + // should_restore() was false; treat as empty for &idx in &tasks_to_restore_for_meta_indices { tasks[idx].meta_restore_result = Some(Ok(TaskStorage::default())); } @@ -989,7 +985,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { } fn task_by_type(&mut self, task_type: &CachedTaskType) -> Option { - if !self.should_check_backing_storage() { + if !self.backend.should_restore() { return None; } From 2b8f55705bc2f4c481403fbf7e1167b9ef8f2db5 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 22:42:04 +0000 Subject: [PATCH 4/9] address review comments on restore concurrency PR - Import `anyhow::Result` to avoid `anyhow::` prefix on return types - Change `apply_restore_result` to return `Result<()>` instead of `Option` - Add `StorageReadGuard` and `access_read` for shared-lock flag checks while waiting - Use shared read lock in `wait_for_restoring_task` to reduce shard contention - `wait_for_restore_or_panic` now returns the write guard to avoid re-acquisition - Accept `TaskDataCategory` in wait helpers to collapse data+meta waits into one call - Move `ready = false` to the top of each unrestored-category block - Rewrite Phase 3 as a single per-task loop: wait then callback immediately Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 196 +++++++++--------- .../src/backend/storage.rs | 25 +++ 2 files changed, 119 insertions(+), 102 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index a1013b028e1440..f01170e60906ca 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -12,7 +12,7 @@ use std::{ sync::Arc, }; -use anyhow::{Context, bail}; +use anyhow::{Context, Result, bail}; use bincode::{Decode, Encode}; use turbo_tasks::{ CellId, FxIndexMap, TaskExecutionReason, TaskId, TaskPriority, TurboTasksBackendApi, @@ -176,7 +176,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &self, task_id: TaskId, category: SpecificTaskDataCategory, - ) -> anyhow::Result { + ) -> Result { if !self.backend.should_restore() { // If we don't need to restore, we can just return an empty storage return Ok(TaskStorage::default()); @@ -193,7 +193,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> anyhow::Result>> { + ) -> Result>> { debug_assert!(task_ids.len() > 1, "Use restore_task_data for single task"); if !self.backend.should_restore() { // If we don't need to restore, we return None @@ -217,27 +217,33 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { /// Precondition: the caller must have observed `is_restoring()` == true for /// `task_id`+`category` and must have dropped the task lock before calling this. /// - /// Returns `Ok(())` when the task is restored (the restoring bits are cleared and the - /// restored bits are set), or `Err` if the restoring thread failed (restoring was cleared - /// without setting restored). + /// Uses a shared (read) lock while polling so multiple waiting threads don't + /// contend on the same shard. + /// + /// Returns a `StorageWriteGuard` (acquired once after the restore completes) when + /// successful, or `Err` if the restoring thread failed (restoring was cleared without + /// setting restored). fn wait_for_restoring_task( &self, task_id: TaskId, - category: SpecificTaskDataCategory, - ) -> anyhow::Result<()> { - let task_category = TaskDataCategory::from(category); + category: TaskDataCategory, + ) -> Result> { loop { // Register a listener BEFORE checking the bits (avoids a lost-wakeup race). let listener = self.backend.storage.restored.listen(); - let task = self.backend.storage.access_mut(task_id); - let is_restoring = task.flags.is_restoring(task_category); - let is_restored = task.flags.is_restored(task_category); + let task = self + .backend + .storage + .access_read(task_id) + .expect("task entry must exist when waiting for restore"); + let is_restoring = task.flags.is_restoring(category); + let is_restored = task.flags.is_restored(category); drop(task); if is_restored { - // The restoring thread finished successfully; we're done waiting. - return Ok(()); + // The restoring thread finished successfully; acquire a write guard and return. + return Ok(self.backend.storage.access_mut(task_id)); } if !is_restoring { // The restoring bit was cleared without setting the restored bit. @@ -251,9 +257,18 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { } /// Panics if waiting for another thread's restore of `task_id`+`category` fails. - fn wait_for_restore_or_panic(&self, task_id: TaskId, category: SpecificTaskDataCategory) { - if let Err(e) = self.wait_for_restoring_task(task_id, category) { - panic!("Restore of {category:?} for task {task_id} failed in another thread: {e:?}"); + /// Returns the `StorageWriteGuard` acquired at the end of the wait so callers can + /// use it directly without a second lock acquisition. + fn wait_for_restore_or_panic( + &self, + task_id: TaskId, + category: TaskDataCategory, + ) -> StorageWriteGuard<'e> { + match self.wait_for_restoring_task(task_id, category) { + Ok(guard) => guard, + Err(e) => { + panic!("Restore of {category:?} for task {task_id} failed in another thread: {e:?}") + } } } @@ -345,32 +360,30 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { let mut ready = true; if category.includes_data() && !task.flags.data_restored() { + ready = false; if task.flags.data_restoring() { // Another thread is restoring data; we'll wait in Phase 3 entry.wait_data = true; any_waiting = true; - ready = false; } else { // We claim responsibility for restoring data task.flags.set_data_restoring(true); tasks_to_restore_for_data.push(task_id); tasks_to_restore_for_data_indices.push(i); - ready = false; } } if category.includes_meta() && !task.flags.meta_restored() { + ready = false; if task.flags.meta_restoring() { // Another thread is restoring meta; we'll wait in Phase 3 entry.wait_meta = true; any_waiting = true; - ready = false; } else { // We claim responsibility for restoring meta task.flags.set_meta_restoring(true); tasks_to_restore_for_meta.push(task_id); tasks_to_restore_for_meta_indices.push(i); - ready = false; } } @@ -484,19 +497,20 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { let mut task = self.backend.storage.access_mut(task_id); if let Some(result) = entry.data_restore_result.take() { - if let Some(e) = - apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data) - { - restore_errors.push((task_id, "data", e)); - } else { - // Since we claimed this restore (data_restored() was false under the lock), - // the task type is always fresh here. - entry.task_type = task.get_persistent_task_type().cloned(); + match apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data) { + Ok(()) => { + // Since we claimed this restore (data_restored() was false under the lock), + // the task type is always fresh here. + entry.task_type = task.get_persistent_task_type().cloned(); + } + Err(e) => { + restore_errors.push((task_id, "data", e)); + } } } if let Some(result) = entry.meta_restore_result.take() - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task, result, SpecificTaskDataCategory::Meta) { restore_errors.push((task_id, "meta", e)); @@ -543,55 +557,22 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { } } - // --- Phase 3: Wait for tasks being restored by other threads --- + // --- Phase 3: Wait for tasks being restored by other threads, then call callbacks --- + // Process each waiting task individually: block until it is restored, then + // immediately call the callback with the already-acquired write guard. if any_waiting { - // Use a single listener per iteration so all waiting tasks share one wakeup. - loop { - // Register listener BEFORE re-checking bits to avoid a lost-wakeup race. - let listener = self.backend.storage.restored.listen(); - let mut still_waiting = false; - for entry in &tasks { - if entry.wait_data { - let task = self.backend.storage.access_mut(entry.task_id); - let restoring = task.flags.data_restoring(); - let restored = task.flags.data_restored(); - drop(task); - if !restored { - if !restoring { - panic!( - "Restore of Data for task {} failed in another thread", - entry.task_id - ); - } - still_waiting = true; - } - } - if entry.wait_meta { - let task = self.backend.storage.access_mut(entry.task_id); - let restoring = task.flags.meta_restoring(); - let restored = task.flags.meta_restored(); - drop(task); - if !restored { - if !restoring { - panic!( - "Restore of Meta for task {} failed in another thread", - entry.task_id - ); - } - still_waiting = true; - } - } - } - if !still_waiting { - break; - } - listener.wait(); - } - // All waited tasks are now restored; call their callbacks. for entry in &tasks { - if entry.wait_data || entry.wait_meta { + let wait_category = match (entry.wait_data, entry.wait_meta) { + (true, true) => Some(TaskDataCategory::All), + (true, false) => Some(TaskDataCategory::Data), + (false, true) => Some(TaskDataCategory::Meta), + (false, false) => None, + }; + if let Some(cat) = wait_category { + // Blocks (using shared read locks) until this task is fully restored. + // Returns the write guard so we call the callback without re-acquiring. self.task_lock_counter.acquire(); - let task = self.backend.storage.access_mut(entry.task_id); + let task = self.wait_for_restore_or_panic(entry.task_id, cat); self.task_lock_counter.release(); prepared_task_callback(self, entry.task_id, entry.category, task); } @@ -626,9 +607,9 @@ struct TaskRestoreEntry { /// notify waiters, and panic. fn apply_restore_result( task: &mut StorageWriteGuard<'_>, - result: anyhow::Result, + result: Result, category: SpecificTaskDataCategory, -) -> Option { +) -> Result<()> { let task_category = TaskDataCategory::from(category); match result { Ok(storage) => { @@ -637,16 +618,16 @@ fn apply_restore_result( // with our I/O). Just clear the restoring bit so waiting threads // unblock; our result is redundant. task.flags.set_restoring(task_category, false); - return None; + return Ok(()); } task.restore_from(storage, task_category); task.flags.set_restored(task_category); task.flags.set_restoring(task_category, false); - None + Ok(()) } Err(e) => { task.flags.set_restoring(task_category, false); - Some(e) + Err(e) } } } @@ -697,11 +678,15 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { drop(task); // Wait for categories claimed by another thread. - if data_restoring { - self.wait_for_restore_or_panic(task_id, SpecificTaskDataCategory::Data); - } - if meta_restoring { - self.wait_for_restore_or_panic(task_id, SpecificTaskDataCategory::Meta); + let wait_category = match (data_restoring, meta_restoring) { + (true, true) => Some(TaskDataCategory::All), + (true, false) => Some(TaskDataCategory::Data), + (false, true) => Some(TaskDataCategory::Meta), + (false, false) => None, + }; + if let Some(cat) = wait_category { + // Returns a write guard; drop it since we re-acquire below. + drop(self.wait_for_restore_or_panic(task_id, cat)); } // Perform I/O for categories we claimed. @@ -714,7 +699,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { // Apply results and clear restoring bits. if let Some(result) = storage_data - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data) { drop(task); @@ -722,7 +707,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { panic!("Failed to restore data for task {task_id}: {e:?}"); } if let Some(result) = storage_meta - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task, result, SpecificTaskDataCategory::Meta) { drop(task); @@ -844,17 +829,24 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { drop(task2); // Wait for categories claimed by another thread. - if data1_restoring { - self.wait_for_restore_or_panic(task_id1, SpecificTaskDataCategory::Data); - } - if meta1_restoring { - self.wait_for_restore_or_panic(task_id1, SpecificTaskDataCategory::Meta); - } - if data2_restoring { - self.wait_for_restore_or_panic(task_id2, SpecificTaskDataCategory::Data); + let wait_category1 = match (data1_restoring, meta1_restoring) { + (true, true) => Some(TaskDataCategory::All), + (true, false) => Some(TaskDataCategory::Data), + (false, true) => Some(TaskDataCategory::Meta), + (false, false) => None, + }; + let wait_category2 = match (data2_restoring, meta2_restoring) { + (true, true) => Some(TaskDataCategory::All), + (true, false) => Some(TaskDataCategory::Data), + (false, true) => Some(TaskDataCategory::Meta), + (false, false) => None, + }; + // Returns write guards; drop them since we re-acquire via access_pair_mut below. + if let Some(cat) = wait_category1 { + drop(self.wait_for_restore_or_panic(task_id1, cat)); } - if meta2_restoring { - self.wait_for_restore_or_panic(task_id2, SpecificTaskDataCategory::Meta); + if let Some(cat) = wait_category2 { + drop(self.wait_for_restore_or_panic(task_id2, cat)); } // Perform I/O for categories we claimed. @@ -874,7 +866,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { // Apply results and clear restoring bits. // On error: drop both locks, notify waiters, then panic. if let Some(result) = storage_data1 - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task1, result, SpecificTaskDataCategory::Data) { drop(task1); @@ -883,7 +875,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { panic!("Failed to restore data for task {task_id1}: {e:?}"); } if let Some(result) = storage_meta1 - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task1, result, SpecificTaskDataCategory::Meta) { drop(task1); @@ -892,7 +884,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { panic!("Failed to restore meta for task {task_id1}: {e:?}"); } if let Some(result) = storage_data2 - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task2, result, SpecificTaskDataCategory::Data) { drop(task1); @@ -901,7 +893,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { panic!("Failed to restore data for task {task_id2}: {e:?}"); } if let Some(result) = storage_meta2 - && let Some(e) = + && let Err(e) = apply_restore_result(&mut task2, result, SpecificTaskDataCategory::Meta) { drop(task1); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index f2b8c67cadf07e..01168d25e7d61b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -371,6 +371,14 @@ impl Storage { ) } + /// Returns a read-only (shared) guard for `key`, or `None` if the task has no entry yet. + /// + /// Acquires a shared shard lock, allowing multiple concurrent readers on the same shard + /// (unlike `access_mut` which acquires an exclusive write lock). + pub fn access_read(&self, key: TaskId) -> Option> { + self.map.get(&key).map(|inner| StorageReadGuard { inner }) + } + pub fn drop_contents(&self) { drop_contents(&self.map); drop_contents(&self.snapshots); @@ -482,6 +490,23 @@ impl DerefMut for StorageWriteGuard<'_> { } } +/// A read-only guard for a task's storage entry. +/// +/// Holds a shared (read) lock on the DashMap shard — multiple threads may hold +/// `StorageReadGuard`s for tasks in the same shard concurrently, unlike +/// `StorageWriteGuard` which holds an exclusive write lock. +pub struct StorageReadGuard<'a> { + inner: dashmap::mapref::one::Ref<'a, TaskId, Box>, +} + +impl Deref for StorageReadGuard<'_> { + type Target = TaskStorage; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + /// How big of a buffer to allocate initially. Based on metrics from a large /// application this should cover about 98% of values with no resizes. const SCRATCH_BUFFER_INITIAL_SIZE: usize = 4096; From e222f2835293b652fb36e429654fa085c84c8830 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 22:52:56 +0000 Subject: [PATCH 5/9] add fast-path check in wait_for_restoring_task to skip listener allocation By the time Phase 3 runs, batch I/O from Phase 1b has elapsed and the waiting task is likely already restored. Check the flags without registering a listener first; only fall into the listener-based loop if the task is still being restored. Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index f01170e60906ca..66981c4c82135a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -228,8 +228,30 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { task_id: TaskId, category: TaskDataCategory, ) -> Result> { + // Fast path: check without registering a listener first. + // By the time Phase 3 runs, batch I/O from Phase 1b has elapsed and the other + // thread has likely already finished restoring. + { + let task = self + .backend + .storage + .access_read(task_id) + .expect("task entry must exist when waiting for restore"); + let is_restoring = task.flags.is_restoring(category); + let is_restored = task.flags.is_restored(category); + drop(task); + + if is_restored { + return Ok(self.backend.storage.access_mut(task_id)); + } + if !is_restoring { + bail!("restoring failed"); + } + } + + // Slow path: register a listener and wait until the other thread signals completion. loop { - // Register a listener BEFORE checking the bits (avoids a lost-wakeup race). + // Register a listener BEFORE re-checking the bits (avoids a lost-wakeup race). let listener = self.backend.storage.restored.listen(); let task = self From 10bccf321b4b436cea9c376617d6676d2f50cdee Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 23:23:41 +0000 Subject: [PATCH 6/9] switch wait_for_restoring_task to access_mut; move task_pair wait after I/O - wait_for_restoring_task: replace access_read with access_mut throughout, returning the write guard directly when the task is already restored (no second lock acquisition needed) - task_pair: move the wait-for-other-thread calls to after our own I/O so both threads' restore work can overlap - Remove access_read and StorageReadGuard from storage.rs since they are no longer used Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 64 ++++++++----------- .../src/backend/storage.rs | 25 -------- 2 files changed, 26 insertions(+), 63 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 66981c4c82135a..6f03b0d0d6b11c 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -217,55 +217,42 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { /// Precondition: the caller must have observed `is_restoring()` == true for /// `task_id`+`category` and must have dropped the task lock before calling this. /// - /// Uses a shared (read) lock while polling so multiple waiting threads don't - /// contend on the same shard. - /// - /// Returns a `StorageWriteGuard` (acquired once after the restore completes) when - /// successful, or `Err` if the restoring thread failed (restoring was cleared without - /// setting restored). + /// Returns the `StorageWriteGuard` acquired at the end of the wait when successful, + /// or `Err` if the restoring thread failed (restoring was cleared without setting restored). fn wait_for_restoring_task( &self, task_id: TaskId, category: TaskDataCategory, ) -> Result> { - // Fast path: check without registering a listener first. - // By the time Phase 3 runs, batch I/O from Phase 1b has elapsed and the other - // thread has likely already finished restoring. + // Fast path: acquire the write guard and check flags directly. + // By the time this is called, some I/O has elapsed and the other thread has + // likely already finished restoring. { - let task = self - .backend - .storage - .access_read(task_id) - .expect("task entry must exist when waiting for restore"); + let task = self.backend.storage.access_mut(task_id); let is_restoring = task.flags.is_restoring(category); let is_restored = task.flags.is_restored(category); - drop(task); - if is_restored { - return Ok(self.backend.storage.access_mut(task_id)); + return Ok(task); } if !is_restoring { bail!("restoring failed"); } + // Still restoring — drop the write guard before waiting. + drop(task); } // Slow path: register a listener and wait until the other thread signals completion. loop { - // Register a listener BEFORE re-checking the bits (avoids a lost-wakeup race). + // Register a listener BEFORE re-acquiring the lock (avoids a lost-wakeup race). let listener = self.backend.storage.restored.listen(); - let task = self - .backend - .storage - .access_read(task_id) - .expect("task entry must exist when waiting for restore"); + let task = self.backend.storage.access_mut(task_id); let is_restoring = task.flags.is_restoring(category); let is_restored = task.flags.is_restored(category); - drop(task); if is_restored { - // The restoring thread finished successfully; acquire a write guard and return. - return Ok(self.backend.storage.access_mut(task_id)); + // The restoring thread finished successfully; return the write guard directly. + return Ok(task); } if !is_restoring { // The restoring bit was cleared without setting the restored bit. @@ -273,7 +260,8 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { bail!("restoring failed"); } - // Still restoring; block until notified, then loop to re-check. + // Still restoring; drop the lock and block until notified, then loop to re-check. + drop(task); listener.wait(); } } @@ -850,7 +838,17 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { drop(task1); drop(task2); - // Wait for categories claimed by another thread. + // Perform I/O for categories we claimed (overlaps with the other thread's restore). + let storage_data1 = + do_data1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data)); + let storage_meta1 = + do_meta1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta)); + let storage_data2 = + do_data2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data)); + let storage_meta2 = + do_meta2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); + + // Wait for categories claimed by another thread (after our I/O, so they can overlap). let wait_category1 = match (data1_restoring, meta1_restoring) { (true, true) => Some(TaskDataCategory::All), (true, false) => Some(TaskDataCategory::Data), @@ -871,16 +869,6 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { drop(self.wait_for_restore_or_panic(task_id2, cat)); } - // Perform I/O for categories we claimed. - let storage_data1 = - do_data1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data)); - let storage_meta1 = - do_meta1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta)); - let storage_data2 = - do_data2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data)); - let storage_meta2 = - do_meta2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); - let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2); task1 = t1; task2 = t2; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index 01168d25e7d61b..f2b8c67cadf07e 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -371,14 +371,6 @@ impl Storage { ) } - /// Returns a read-only (shared) guard for `key`, or `None` if the task has no entry yet. - /// - /// Acquires a shared shard lock, allowing multiple concurrent readers on the same shard - /// (unlike `access_mut` which acquires an exclusive write lock). - pub fn access_read(&self, key: TaskId) -> Option> { - self.map.get(&key).map(|inner| StorageReadGuard { inner }) - } - pub fn drop_contents(&self) { drop_contents(&self.map); drop_contents(&self.snapshots); @@ -490,23 +482,6 @@ impl DerefMut for StorageWriteGuard<'_> { } } -/// A read-only guard for a task's storage entry. -/// -/// Holds a shared (read) lock on the DashMap shard — multiple threads may hold -/// `StorageReadGuard`s for tasks in the same shard concurrently, unlike -/// `StorageWriteGuard` which holds an exclusive write lock. -pub struct StorageReadGuard<'a> { - inner: dashmap::mapref::one::Ref<'a, TaskId, Box>, -} - -impl Deref for StorageReadGuard<'_> { - type Target = TaskStorage; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - /// How big of a buffer to allocate initially. Based on metrics from a large /// application this should cover about 98% of values with no resizes. const SCRATCH_BUFFER_INITIAL_SIZE: usize = 4096; From 1255dd33ee75171d13b1c7ccc5732db21f6e63d4 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 7 Apr 2026 23:30:10 +0000 Subject: [PATCH 7/9] single-task path: perform I/O before waiting; reuse write guard Move our own I/O ahead of the wait so it can overlap with the other thread's restore work. Reuse the write guard returned by wait_for_restore_or_panic instead of dropping it and re-acquiring. Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 6f03b0d0d6b11c..d9d02ac9f741ee 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -684,28 +684,28 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { } if do_data || do_meta || data_restoring || meta_restoring { - // Drop lock while doing I/O or waiting. + // Drop lock while doing I/O (our I/O can overlap with the other thread). drop(task); - // Wait for categories claimed by another thread. - let wait_category = match (data_restoring, meta_restoring) { - (true, true) => Some(TaskDataCategory::All), - (true, false) => Some(TaskDataCategory::Data), - (false, true) => Some(TaskDataCategory::Meta), - (false, false) => None, - }; - if let Some(cat) = wait_category { - // Returns a write guard; drop it since we re-acquire below. - drop(self.wait_for_restore_or_panic(task_id, cat)); - } - // Perform I/O for categories we claimed. let storage_data = do_data .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data)); let storage_meta = do_meta .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta)); - task = self.backend.storage.access_mut(task_id); + // Wait for categories claimed by another thread (after our I/O). + // Reuse the returned write guard to avoid a second lock acquisition. + let wait_category = match (data_restoring, meta_restoring) { + (true, true) => Some(TaskDataCategory::All), + (true, false) => Some(TaskDataCategory::Data), + (false, true) => Some(TaskDataCategory::Meta), + (false, false) => None, + }; + task = if let Some(cat) = wait_category { + self.wait_for_restore_or_panic(task_id, cat) + } else { + self.backend.storage.access_mut(task_id) + }; // Apply results and clear restoring bits. if let Some(result) = storage_data From d2a92f291c821e53611649777bb7678fa1238fe2 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 8 Apr 2026 17:13:55 +0000 Subject: [PATCH 8/9] address Luke's review comments - Remove dead set_restored calls in fast path and transient filter (flags are already set at allocation time); replaced with debug_asserts - Restore trace_span for prepare_tasks_with_callback - Replace should_restore() guards in restore_task_data/batch with debug_assert (callers already check); simplify batch return type to Result> - Use imported Result alias in TaskRestoreEntry fields - Extract wait_category() helper for the repeated match pattern Co-Authored-By: Claude --- .../src/backend/operation/mod.rs | 117 +++++++----------- 1 file changed, 47 insertions(+), 70 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index d9d02ac9f741ee..cf8aac7d277237 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -14,6 +14,7 @@ use std::{ use anyhow::{Context, Result, bail}; use bincode::{Decode, Encode}; +use tracing::trace_span; use turbo_tasks::{ CellId, FxIndexMap, TaskExecutionReason, TaskId, TaskPriority, TurboTasksBackendApi, TurboTasksCallApi, TypedSharedReference, backend::CachedTaskType, @@ -177,10 +178,10 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { task_id: TaskId, category: SpecificTaskDataCategory, ) -> Result { - if !self.backend.should_restore() { - // If we don't need to restore, we can just return an empty storage - return Ok(TaskStorage::default()); - } + debug_assert!( + self.backend.should_restore(), + "restore_task_data called when should_restore() is false" + ); let mut storage = TaskStorage::default(); self.backend .backing_storage @@ -193,12 +194,12 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result>> { + ) -> Result> { debug_assert!(task_ids.len() > 1, "Use restore_task_data for single task"); - if !self.backend.should_restore() { - // If we don't need to restore, we return None - return Ok(None); - } + debug_assert!( + self.backend.should_restore(), + "restore_task_data_batch called when should_restore() is false" + ); let result = self .backend .backing_storage @@ -209,7 +210,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { task_ids.len() ) })?; - Ok(Some(result)) + Ok(result) } /// Waits for another thread's in-progress restore of a task to complete. @@ -286,7 +287,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &mut self, task_ids: impl IntoIterator, call_prepared_task_callback_for_transient_tasks: bool, - _reason: &'static str, + reason: &'static str, mut prepared_task_callback: impl FnMut( &mut Self, TaskId, @@ -294,24 +295,23 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { StorageWriteGuard<'e>, ), ) { - // Fast path: no backing storage to restore from — mark all tasks as restored - // and invoke callbacks directly, skipping the I/O pipeline. + let _span = trace_span!("prepare_tasks_with_callback", reason).entered(); + + // Fast path: no backing storage to restore from — all tasks should already + // have restored flags set at allocation time, so just invoke callbacks directly. if !self.backend.should_restore() { for (task_id, category) in task_ids { self.task_lock_counter.acquire(); - let mut task = self.backend.storage.access_mut(task_id); - if !task.flags.is_restored(category) { - task.flags.set_restored(if task_id.is_transient() { - TaskDataCategory::All - } else { - category - }); - } + let task = self.backend.storage.access_mut(task_id); + debug_assert!( + task.flags.is_restored(category), + "task {task_id} should already be marked restored when there is no backing \ + storage" + ); self.task_lock_counter.release(); if !task_id.is_transient() || call_prepared_task_callback_for_transient_tasks { prepared_task_callback(self, task_id, category, task); } - // else: task guard is dropped here } return; } @@ -323,11 +323,14 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { .into_iter() .filter(|&(id, category)| { if id.is_transient() { + // Transient tasks have restored flags set at allocation time, + // so they never need DB restoration. if call_prepared_task_callback_for_transient_tasks { - let mut task = self.backend.storage.access_mut(id); - if !task.flags.is_restored(category) { - task.flags.set_restored(TaskDataCategory::All); - } + let task = self.backend.storage.access_mut(id); + debug_assert!( + task.flags.is_restored(category), + "transient task {id} should already be marked restored" + ); prepared_task_callback(self, id, category, task); } false @@ -427,18 +430,12 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &tasks_to_restore_for_data, SpecificTaskDataCategory::Data, ) { - Ok(Some(data)) => { + Ok(data) => { for (item, &idx) in data.into_iter().zip(&tasks_to_restore_for_data_indices) { tasks[idx].data_restore_result = Some(Ok(item)); } } - Ok(None) => { - // should_restore() was false; treat as empty - for &idx in &tasks_to_restore_for_data_indices { - tasks[idx].data_restore_result = Some(Ok(TaskStorage::default())); - } - } Err(e) => { // Batch failure: distribute the error to each affected task let msg = format!("{e:?}"); @@ -465,18 +462,12 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &tasks_to_restore_for_meta, SpecificTaskDataCategory::Meta, ) { - Ok(Some(data)) => { + Ok(data) => { for (item, &idx) in data.into_iter().zip(&tasks_to_restore_for_meta_indices) { tasks[idx].meta_restore_result = Some(Ok(item)); } } - Ok(None) => { - // should_restore() was false; treat as empty - for &idx in &tasks_to_restore_for_meta_indices { - tasks[idx].meta_restore_result = Some(Ok(TaskStorage::default())); - } - } Err(e) => { let msg = format!("{e:?}"); for &idx in &tasks_to_restore_for_meta_indices { @@ -572,13 +563,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { // immediately call the callback with the already-acquired write guard. if any_waiting { for entry in &tasks { - let wait_category = match (entry.wait_data, entry.wait_meta) { - (true, true) => Some(TaskDataCategory::All), - (true, false) => Some(TaskDataCategory::Data), - (false, true) => Some(TaskDataCategory::Meta), - (false, false) => None, - }; - if let Some(cat) = wait_category { + if let Some(cat) = wait_category(entry.wait_data, entry.wait_meta) { // Blocks (using shared read locks) until this task is fully restored. // Returns the write guard so we call the callback without re-acquiring. self.task_lock_counter.acquire(); @@ -596,9 +581,9 @@ struct TaskRestoreEntry { task_id: TaskId, category: TaskDataCategory, /// Result of restoring the data category (set in Phase 1b, consumed in Phase 1c). - data_restore_result: Option>, + data_restore_result: Option>, /// Result of restoring the meta category (set in Phase 1b, consumed in Phase 1c). - meta_restore_result: Option>, + meta_restore_result: Option>, /// Another thread claimed the data restore; we must wait in Phase 3. wait_data: bool, /// Another thread claimed the meta restore; we must wait in Phase 3. @@ -609,6 +594,16 @@ struct TaskRestoreEntry { self_restored: bool, } +/// Combines per-category booleans into a single `TaskDataCategory` for waiting. +fn wait_category(wait_data: bool, wait_meta: bool) -> Option { + match (wait_data, wait_meta) { + (true, true) => Some(TaskDataCategory::All), + (true, false) => Some(TaskDataCategory::Data), + (false, true) => Some(TaskDataCategory::Meta), + (false, false) => None, + } +} + /// Applies a restore I/O result to a task's in-memory state. /// /// Clears the `*_restoring` flag for `category` unconditionally (success or error). @@ -695,13 +690,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { // Wait for categories claimed by another thread (after our I/O). // Reuse the returned write guard to avoid a second lock acquisition. - let wait_category = match (data_restoring, meta_restoring) { - (true, true) => Some(TaskDataCategory::All), - (true, false) => Some(TaskDataCategory::Data), - (false, true) => Some(TaskDataCategory::Meta), - (false, false) => None, - }; - task = if let Some(cat) = wait_category { + task = if let Some(cat) = wait_category(data_restoring, meta_restoring) { self.wait_for_restore_or_panic(task_id, cat) } else { self.backend.storage.access_mut(task_id) @@ -849,23 +838,11 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { do_meta2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); // Wait for categories claimed by another thread (after our I/O, so they can overlap). - let wait_category1 = match (data1_restoring, meta1_restoring) { - (true, true) => Some(TaskDataCategory::All), - (true, false) => Some(TaskDataCategory::Data), - (false, true) => Some(TaskDataCategory::Meta), - (false, false) => None, - }; - let wait_category2 = match (data2_restoring, meta2_restoring) { - (true, true) => Some(TaskDataCategory::All), - (true, false) => Some(TaskDataCategory::Data), - (false, true) => Some(TaskDataCategory::Meta), - (false, false) => None, - }; // Returns write guards; drop them since we re-acquire via access_pair_mut below. - if let Some(cat) = wait_category1 { + if let Some(cat) = wait_category(data1_restoring, meta1_restoring) { drop(self.wait_for_restore_or_panic(task_id1, cat)); } - if let Some(cat) = wait_category2 { + if let Some(cat) = wait_category(data2_restoring, meta2_restoring) { drop(self.wait_for_restore_or_panic(task_id2, cat)); } From 8e75ddbefe3ee84a237722d68c05baa2e6265b60 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 8 Apr 2026 19:40:26 +0000 Subject: [PATCH 9/9] fix: initialize task storage before inserting into task cache When a new task was created, the task_id was inserted into task_cache before initialize_new_task was called. This created a race window where another thread could see the task_id in the cache and call task() on it, finding storage with restored flags unset, triggering a restore attempt even when should_restore() is false. Fix by calling initialize_new_task before e.insert in both get_or_create_persistent_task and get_or_create_transient_task, so that any thread reading the task_id from the cache is guaranteed to see the storage entry already initialized with restored=All. Co-Authored-By: Claude --- .../crates/turbo-tasks-backend/src/backend/mod.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 56cbc185bf5171..807576da7ebc32 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1563,10 +1563,11 @@ impl TurboTasksBackendInner { // We're creating a new task. let task_type = Arc::new(task_type); let task_id = self.persisted_task_id_factory.get(); - e.insert(task_type.clone(), task_id); - // Mark the task as new in storage. - // Do this after e.insert so we aren't holding the task_cache lock + // Initialize storage BEFORE making task_id visible in the cache. + // This ensures any thread that reads task_id from the cache sees + // the storage entry already initialized (restored flags set). self.storage.initialize_new_task(task_id); + e.insert(task_type.clone(), task_id); // insert() consumes e, releasing the lock self.track_cache_miss(&task_type); is_new = true; @@ -1638,8 +1639,9 @@ impl TurboTasksBackendInner { RawEntry::Vacant(e) => { let task_type = Arc::new(task_type); let task_id = self.transient_task_id_factory.get(); - e.insert(task_type.clone(), task_id); + // Initialize storage BEFORE making task_id visible in the cache. self.storage.initialize_new_task(task_id); + e.insert(task_type.clone(), task_id); self.track_cache_miss(&task_type); if is_root {