diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index 56cbc185bf517..807576da7ebc3 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -1563,10 +1563,11 @@ impl TurboTasksBackendInner {
                 // We're creating a new task.
                 let task_type = Arc::new(task_type);
                 let task_id = self.persisted_task_id_factory.get();
-                e.insert(task_type.clone(), task_id);
-                // Mark the task as new in storage.
-                // Do this after e.insert so we aren't holding the task_cache lock
+                // Initialize storage BEFORE making task_id visible in the cache.
+                // This ensures any thread that reads task_id from the cache sees
+                // the storage entry already initialized (restored flags set).
                 self.storage.initialize_new_task(task_id);
+                e.insert(task_type.clone(), task_id); // insert() consumes e, releasing the lock
                 self.track_cache_miss(&task_type);
                 is_new = true;
@@ -1638,8 +1639,9 @@ impl TurboTasksBackendInner {
             RawEntry::Vacant(e) => {
                 let task_type = Arc::new(task_type);
                 let task_id = self.transient_task_id_factory.get();
-                e.insert(task_type.clone(), task_id);
+                // Initialize storage BEFORE making task_id visible in the cache.
                 self.storage.initialize_new_task(task_id);
+                e.insert(task_type.clone(), task_id);
                 self.track_cache_miss(&task_type);
 
                 if is_root {
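The reordering matters because inserting into `task_cache` publishes `task_id` to every other thread. A minimal standalone illustration of the init-before-publish invariant, using hypothetical stand-in types rather than the backend's real ones:

use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

// Hypothetical stand-ins for the backend's task cache and storage.
struct Backend {
    cache: Mutex<HashMap<String, u32>>,        // task type -> task id
    storage: RwLock<HashMap<u32, TaskState>>,  // task id -> state
}

struct TaskState {
    restored: bool,
}

impl Backend {
    fn create_task(&self, ty: String, id: u32) {
        // 1. Initialize storage first: the entry exists and is marked restored...
        self.storage
            .write()
            .unwrap()
            .insert(id, TaskState { restored: true });
        // 2. ...and only then publish the id. Any thread that finds `id` in the
        //    cache is now guaranteed to find an initialized storage entry.
        self.cache.lock().unwrap().insert(ty, id);
    }
}

fn main() {
    let b = Backend {
        cache: Mutex::new(HashMap::new()),
        storage: RwLock::new(HashMap::new()),
    };
    b.create_task("my_task".to_string(), 1);
    assert!(b.storage.read().unwrap()[&1].restored);
}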
+ "Failed to restore {category:?} for batch of {} tasks", + task_ids.len() ) + })?; + Ok(result) + } + + /// Waits for another thread's in-progress restore of a task to complete. + /// + /// Precondition: the caller must have observed `is_restoring()` == true for + /// `task_id`+`category` and must have dropped the task lock before calling this. + /// + /// Returns the `StorageWriteGuard` acquired at the end of the wait when successful, + /// or `Err` if the restoring thread failed (restoring was cleared without setting restored). + fn wait_for_restoring_task( + &self, + task_id: TaskId, + category: TaskDataCategory, + ) -> Result> { + // Fast path: acquire the write guard and check flags directly. + // By the time this is called, some I/O has elapsed and the other thread has + // likely already finished restoring. + { + let task = self.backend.storage.access_mut(task_id); + let is_restoring = task.flags.is_restoring(category); + let is_restored = task.flags.is_restored(category); + if is_restored { + return Ok(task); + } + if !is_restoring { + bail!("restoring failed"); + } + // Still restoring — drop the write guard before waiting. + drop(task); + } + + // Slow path: register a listener and wait until the other thread signals completion. + loop { + // Register a listener BEFORE re-acquiring the lock (avoids a lost-wakeup race). + let listener = self.backend.storage.restored.listen(); + + let task = self.backend.storage.access_mut(task_id); + let is_restoring = task.flags.is_restoring(category); + let is_restored = task.flags.is_restored(category); + + if is_restored { + // The restoring thread finished successfully; return the write guard directly. + return Ok(task); + } + if !is_restoring { + // The restoring bit was cleared without setting the restored bit. + // This means the restoring thread encountered an error. + bail!("restoring failed"); + } + + // Still restoring; drop the lock and block until notified, then loop to re-check. + drop(task); + listener.wait(); + } + } + + /// Panics if waiting for another thread's restore of `task_id`+`category` fails. + /// Returns the `StorageWriteGuard` acquired at the end of the wait so callers can + /// use it directly without a second lock acquisition. + fn wait_for_restore_or_panic( + &self, + task_id: TaskId, + category: TaskDataCategory, + ) -> StorageWriteGuard<'e> { + match self.wait_for_restoring_task(task_id, category) { + Ok(guard) => guard, + Err(e) => { + panic!("Restore of {category:?} for task {task_id} failed in another thread: {e:?}") } } } @@ -239,13 +295,27 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { StorageWriteGuard<'e>, ), ) { - let span = trace_span!( - "prepare_tasks_with_callback", - reason, - requested_data = tracing::field::Empty, - requested_meta = tracing::field::Empty, - ); - let _guard = span.enter(); + let _span = trace_span!("prepare_tasks_with_callback", reason).entered(); + + // Fast path: no backing storage to restore from — all tasks should already + // have restored flags set at allocation time, so just invoke callbacks directly. 
@@ -239,13 +295,27 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> {
             StorageWriteGuard<'e>,
         ),
     ) {
-        let span = trace_span!(
-            "prepare_tasks_with_callback",
-            reason,
-            requested_data = tracing::field::Empty,
-            requested_meta = tracing::field::Empty,
-        );
-        let _guard = span.enter();
+        let _span = trace_span!("prepare_tasks_with_callback", reason).entered();
+
+        // Fast path: no backing storage to restore from — all tasks should already
+        // have restored flags set at allocation time, so just invoke callbacks directly.
+        if !self.backend.should_restore() {
+            for (task_id, category) in task_ids {
+                self.task_lock_counter.acquire();
+                let task = self.backend.storage.access_mut(task_id);
+                debug_assert!(
+                    task.flags.is_restored(category),
+                    "task {task_id} should already be marked restored when there is no backing \
+                     storage"
+                );
+                self.task_lock_counter.release();
+                if !task_id.is_transient() || call_prepared_task_callback_for_transient_tasks {
+                    prepared_task_callback(self, task_id, category, task);
+                }
+            }
+            return;
+        }
+
         let mut data_count = 0;
         let mut meta_count = 0;
         let mut all_count = 0;
@@ -253,10 +323,14 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> {
             .into_iter()
             .filter(|&(id, category)| {
                 if id.is_transient() {
-                    // Transient tasks don't need DB restoration (restored flags are
-                    // set at allocation time), so just invoke the callback directly.
+                    // Transient tasks have restored flags set at allocation time,
+                    // so they never need DB restoration.
                     if call_prepared_task_callback_for_transient_tasks {
                         let task = self.backend.storage.access_mut(id);
+                        debug_assert!(
+                            task.flags.is_restored(category),
+                            "transient task {id} should already be marked restored"
+                        );
                         prepared_task_callback(self, id, category, task);
                     }
                     false
@@ -269,127 +343,300 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
                 TaskDataCategory::Meta => meta_count += 1,
                 TaskDataCategory::All => all_count += 1,
             })
-            .map(|(id, category)| (id, category, None, None))
+            .map(|(id, category)| TaskRestoreEntry {
+                task_id: id,
+                category,
+                data_restore_result: None,
+                meta_restore_result: None,
+                wait_data: false,
+                wait_meta: false,
+                task_type: None,
+                self_restored: false,
+            })
             .collect::<Vec<_>>();
         data_count += all_count;
         meta_count += all_count;
-        span.record("requested_data", data_count);
-        span.record("requested_meta", meta_count);
         let mut tasks_to_restore_for_data = Vec::with_capacity(data_count);
-        let mut tasks_to_restore_for_data_indicies = Vec::with_capacity(data_count);
+        let mut tasks_to_restore_for_data_indices = Vec::with_capacity(data_count);
         let mut tasks_to_restore_for_meta = Vec::with_capacity(meta_count);
-        let mut tasks_to_restore_for_meta_indicies = Vec::with_capacity(meta_count);
-        for (i, &(task_id, category, _, _)) in tasks.iter().enumerate() {
+        let mut tasks_to_restore_for_meta_indices = Vec::with_capacity(meta_count);
+
+        // --- Phase 1a: Classify tasks under lock ---
+        // For each task, determine whether we will restore it ourselves or wait for
+        // another thread.
+        let mut any_waiting = false;
+        for (i, entry) in tasks.iter_mut().enumerate() {
+            let task_id = entry.task_id;
+            let category = entry.category;
             self.task_lock_counter.acquire();
-
-            let task = self.backend.storage.access_mut(task_id);
+            let mut task = self.backend.storage.access_mut(task_id);
             let mut ready = true;
-            if matches!(category, TaskDataCategory::Data | TaskDataCategory::All)
-                && !task.flags.is_restored(TaskDataCategory::Data)
-            {
-                tasks_to_restore_for_data.push(task_id);
-                tasks_to_restore_for_data_indicies.push(i);
+
+            if category.includes_data() && !task.flags.data_restored() {
                 ready = false;
+                if task.flags.data_restoring() {
+                    // Another thread is restoring data; we'll wait in Phase 3
+                    entry.wait_data = true;
+                    any_waiting = true;
+                } else {
+                    // We claim responsibility for restoring data
+                    task.flags.set_data_restoring(true);
+                    tasks_to_restore_for_data.push(task_id);
+                    tasks_to_restore_for_data_indices.push(i);
+                }
             }
-            if matches!(category, TaskDataCategory::Meta | TaskDataCategory::All)
-                && !task.flags.is_restored(TaskDataCategory::Meta)
-            {
-                tasks_to_restore_for_meta.push(task_id);
-                tasks_to_restore_for_meta_indicies.push(i);
+
+            if category.includes_meta() && !task.flags.meta_restored() {
                 ready = false;
+                if task.flags.meta_restoring() {
+                    // Another thread is restoring meta; we'll wait in Phase 3
+                    entry.wait_meta = true;
+                    any_waiting = true;
+                } else {
+                    // We claim responsibility for restoring meta
+                    task.flags.set_meta_restoring(true);
+                    tasks_to_restore_for_meta.push(task_id);
+                    tasks_to_restore_for_meta_indices.push(i);
+                }
             }
+
             self.task_lock_counter.release();
             if ready {
                 prepared_task_callback(self, task_id, category, task);
             }
+            // else: task guard is dropped here
         }
-        if tasks_to_restore_for_meta.is_empty() && tasks_to_restore_for_data.is_empty() {
+
+        if tasks_to_restore_for_data.is_empty()
+            && tasks_to_restore_for_meta.is_empty()
+            && !any_waiting
+        {
             return;
         }
 
+        // --- Phase 1b: Batch I/O for tasks we claimed ---
+
+        // Data I/O
         match tasks_to_restore_for_data.len() {
             0 => {}
             1 => {
                 let task_id = tasks_to_restore_for_data[0];
-                let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Data);
-                let idx = tasks_to_restore_for_data_indicies[0];
-                tasks[idx].2 = Some(data);
+                let idx = tasks_to_restore_for_data_indices[0];
+                tasks[idx].data_restore_result =
+                    Some(self.restore_task_data(task_id, SpecificTaskDataCategory::Data));
             }
             _ => {
-                if let Some(data) = self.restore_task_data_batch(
+                match self.restore_task_data_batch(
                     &tasks_to_restore_for_data,
                     SpecificTaskDataCategory::Data,
                 ) {
-                    data.into_iter()
-                        .zip(tasks_to_restore_for_data_indicies)
-                        .for_each(|(item, idx)| {
-                            tasks[idx].2 = Some(item);
-                        });
-                } else {
-                    for idx in tasks_to_restore_for_data_indicies {
-                        tasks[idx].2 = Some(TaskStorage::default());
+                    Ok(data) => {
+                        for (item, &idx) in
+                            data.into_iter().zip(&tasks_to_restore_for_data_indices)
+                        {
+                            tasks[idx].data_restore_result = Some(Ok(item));
+                        }
+                    }
+                    Err(e) => {
+                        // Batch failure: distribute the error to each affected task
+                        let msg = format!("{e:?}");
+                        for &idx in &tasks_to_restore_for_data_indices {
+                            tasks[idx].data_restore_result =
+                                Some(Err(anyhow::anyhow!("Batch data restore failed: {msg}")));
+                        }
                     }
                 }
             }
         }
+
+        // Meta I/O
         match tasks_to_restore_for_meta.len() {
             0 => {}
             1 => {
                 let task_id = tasks_to_restore_for_meta[0];
-                let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Meta);
-                let idx = tasks_to_restore_for_meta_indicies[0];
-                tasks[idx].3 = Some(data);
+                let idx = tasks_to_restore_for_meta_indices[0];
+                tasks[idx].meta_restore_result =
+                    Some(self.restore_task_data(task_id, SpecificTaskDataCategory::Meta));
             }
             _ => {
-                if let Some(data) = self.restore_task_data_batch(
+                match self.restore_task_data_batch(
                     &tasks_to_restore_for_meta,
                     SpecificTaskDataCategory::Meta,
                 ) {
-                    data.into_iter()
-                        .zip(tasks_to_restore_for_meta_indicies)
-                        .for_each(|(item, idx)| {
-                            tasks[idx].3 = Some(item);
-                        });
-                } else {
-                    for idx in tasks_to_restore_for_meta_indicies {
-                        tasks[idx].3 = Some(TaskStorage::new());
+                    Ok(data) => {
+                        for (item, &idx) in
+                            data.into_iter().zip(&tasks_to_restore_for_meta_indices)
+                        {
+                            tasks[idx].meta_restore_result = Some(Ok(item));
+                        }
+                    }
+                    Err(e) => {
+                        let msg = format!("{e:?}");
+                        for &idx in &tasks_to_restore_for_meta_indices {
+                            tasks[idx].meta_restore_result =
+                                Some(Err(anyhow::anyhow!("Batch meta restore failed: {msg}")));
+                        }
                     }
                 }
             }
         }
 
+        // --- Phase 1c: Apply I/O results for tasks we restored ---
+        // (callbacks are deferred to Phase 2 so we finish restoring — and notify
+        // waiters — as early as possible)
+        // Errors are collected rather than panicking immediately so that all tasks'
+        // restoring bits are cleared first. Otherwise other threads waiting on those
+        // bits would hang.
+        let mut any_self_restored = false;
+        let mut restore_errors: Vec<(TaskId, &str, anyhow::Error)> = Vec::new();
-        for (task_id, category, storage_for_data, storage_for_meta) in tasks {
-            if storage_for_data.is_none() && storage_for_meta.is_none() {
+        for entry in &mut tasks {
+            if entry.data_restore_result.is_none() && entry.meta_restore_result.is_none() {
                 continue;
             }
+            entry.self_restored = true;
+            any_self_restored = true;
+            let task_id = entry.task_id;
 
             self.task_lock_counter.acquire();
-            let mut task_type = None;
             let mut task = self.backend.storage.access_mut(task_id);
-            if let Some(storage) = storage_for_data
-                && !task.flags.is_restored(TaskDataCategory::Data)
-            {
-                task.restore_from(storage, TaskDataCategory::Data);
-                task.flags.set_restored(TaskDataCategory::Data);
-                task_type = task.get_persistent_task_type().cloned()
+
+            if let Some(result) = entry.data_restore_result.take() {
+                match apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data) {
+                    Ok(()) => {
+                        // Since we claimed this restore (data_restored() was false under
+                        // the lock), the task type is always fresh here.
+                        entry.task_type = task.get_persistent_task_type().cloned();
+                    }
+                    Err(e) => {
+                        restore_errors.push((task_id, "data", e));
+                    }
+                }
             }
-            if let Some(storage) = storage_for_meta
-                && !task.flags.is_restored(TaskDataCategory::Meta)
+
+            if let Some(result) = entry.meta_restore_result.take()
+                && let Err(e) =
+                    apply_restore_result(&mut task, result, SpecificTaskDataCategory::Meta)
             {
-                task.restore_from(storage, TaskDataCategory::Meta);
-                task.flags.set_restored(TaskDataCategory::Meta);
+                restore_errors.push((task_id, "meta", e));
             }
+
+            // Drop the lock before notifying so woken threads don't
+            // immediately contend on the same DashMap shard.
+            drop(task);
             self.task_lock_counter.release();
-            prepared_task_callback(self, task_id, category, task);
-            if let Some(task_type) = task_type {
+        }
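Two details above are easy to miss: `anyhow::Error` is not `Clone`, so a batch failure is fanned out by formatting the error once and re-wrapping the message per task; and Phase 1c collects errors instead of panicking mid-loop so every claimed `*_restoring` bit is cleared before the panic. A self-contained sketch of the fan-out pattern (names are illustrative, not from the patch):

use anyhow::{Result, anyhow};

// Fan a single non-Clone error out to N per-item result slots by
// formatting it once and re-wrapping the message.
fn fan_out_error(slots: &mut [Option<Result<u32>>], indices: &[usize], err: anyhow::Error) {
    let msg = format!("{err:?}"); // captures message + source chain once
    for &idx in indices {
        slots[idx] = Some(Err(anyhow!("batch lookup failed: {msg}")));
    }
}

fn main() {
    let mut slots: Vec<Option<Result<u32>>> = vec![None, None, None];
    fan_out_error(&mut slots, &[0, 2], anyhow!("disk read error"));
    assert!(slots[0].as_ref().unwrap().is_err());
    assert!(slots[1].is_none());
}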
 
+        // Notify all waiting threads once, after all tasks have been restored
+        // (or had their restoring bits cleared on error).
+        if any_self_restored {
+            self.backend.storage.restored.notify(usize::MAX);
+        }
+
+        if !restore_errors.is_empty() {
+            let msgs: Vec<String> = restore_errors
+                .iter()
+                .map(|(id, cat, e)| format!("Failed to restore {cat} for task {id}: {e:?}"))
+                .collect();
+            panic!("Restore failures:\n{}", msgs.join("\n"));
+        }
+
+        // --- Phase 2: Callbacks for tasks we restored ourselves ---
+        // Separated from Phase 1c so that other threads are unblocked as early as
+        // possible.
+        for entry in &tasks {
+            if !entry.self_restored {
+                continue;
+            }
+            if let Some(task_type) = entry.task_type.clone() {
                 // Insert into the task cache to avoid future lookups
-                self.backend.task_cache.entry(task_type).or_insert(task_id);
+                self.backend
+                    .task_cache
+                    .entry(task_type)
+                    .or_insert(entry.task_id);
+            }
+            // Only call the callback if no category is still being restored by another
+            // thread. If so, Phase 3 calls the callback after all categories are fully
+            // restored.
+            if !entry.wait_data && !entry.wait_meta {
+                let task = self.backend.storage.access_mut(entry.task_id);
+                prepared_task_callback(self, entry.task_id, entry.category, task);
+            }
+        }
+
+        // --- Phase 3: Wait for tasks being restored by other threads, then call
+        // callbacks ---
+        // Process each waiting task individually: block until it is restored, then
+        // immediately call the callback with the already-acquired write guard.
+        if any_waiting {
+            for entry in &tasks {
+                if let Some(cat) = wait_category(entry.wait_data, entry.wait_meta) {
+                    // Blocks (via the shared `restored` event) until this task is fully
+                    // restored. Returns the write guard so we call the callback without
+                    // re-acquiring.
+                    self.task_lock_counter.acquire();
+                    let task = self.wait_for_restore_or_panic(entry.task_id, cat);
+                    self.task_lock_counter.release();
+                    prepared_task_callback(self, entry.task_id, entry.category, task);
+                }
+            }
+        }
     }
 }
 
+/// Per-task state threaded through the phases of `prepare_tasks_with_callback`.
+struct TaskRestoreEntry {
+    task_id: TaskId,
+    category: TaskDataCategory,
+    /// Result of restoring the data category (set in Phase 1b, consumed in Phase 1c).
+    data_restore_result: Option<Result<TaskStorage>>,
+    /// Result of restoring the meta category (set in Phase 1b, consumed in Phase 1c).
+    meta_restore_result: Option<Result<TaskStorage>>,
+    /// Another thread claimed the data restore; we must wait in Phase 3.
+    wait_data: bool,
+    /// Another thread claimed the meta restore; we must wait in Phase 3.
+    wait_meta: bool,
+    /// Task type discovered during Phase 1c data restore (used to update the task
+    /// cache in Phase 2).
+    task_type: Option<Arc<CachedTaskType>>,
+    /// This thread performed the restore for at least one category (set in Phase 1c).
+    self_restored: bool,
+}
+
+/// Combines per-category booleans into a single `TaskDataCategory` for waiting.
+fn wait_category(wait_data: bool, wait_meta: bool) -> Option<TaskDataCategory> {
+    match (wait_data, wait_meta) {
+        (true, true) => Some(TaskDataCategory::All),
+        (true, false) => Some(TaskDataCategory::Data),
+        (false, true) => Some(TaskDataCategory::Meta),
+        (false, false) => None,
+    }
+}
+
+/// Applies a restore I/O result to a task's in-memory state.
+///
+/// Clears the `*_restoring` flag for `category` unconditionally (success or error).
+/// On success, merges `storage` into the task if not already marked restored, then sets
+/// the restored flag. On error, returns the error so the caller can drop the task lock,
+/// notify waiters, and panic.
+fn apply_restore_result(
+    task: &mut StorageWriteGuard<'_>,
+    result: Result<TaskStorage>,
+    category: SpecificTaskDataCategory,
+) -> Result<()> {
+    let task_category = TaskDataCategory::from(category);
+    match result {
+        Ok(storage) => {
+            if task.flags.is_restored(task_category) {
+                // Already restored by another path (e.g., initialize_new_task racing
+                // with our I/O). Just clear the restoring bit so waiting threads
+                // unblock; our result is redundant.
+                task.flags.set_restoring(task_category, false);
+                return Ok(());
+            }
+            task.restore_from(storage, task_category);
+            task.flags.set_restored(task_category);
+            task.flags.set_restoring(task_category, false);
+            Ok(())
+        }
+        Err(e) => {
+            task.flags.set_restoring(task_category, false);
+            Err(e)
+        }
+    }
+}
+
 impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
     type TaskGuardImpl = TaskGuardImpl<'e>;
 
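`apply_restore_result` upholds one invariant on every path: the `*_restoring` bit is cleared whether the I/O succeeded or failed, because a bit left set would strand waiters in `wait_for_restoring_task` forever. The same invariant can also be expressed as a drop guard; a minimal sketch with hypothetical types (the patch itself uses the explicit match instead):

use std::sync::Mutex;

// Minimal drop-guard sketch: the cleanup runs on every exit path,
// including panics, so waiters can never be left blocked on the bit.
struct ClearOnDrop<'a> {
    restoring: &'a Mutex<bool>,
}

impl Drop for ClearOnDrop<'_> {
    fn drop(&mut self) {
        *self.restoring.lock().unwrap() = false;
    }
}

fn restore(restoring: &Mutex<bool>) -> Result<(), String> {
    *restoring.lock().unwrap() = true;
    let _guard = ClearOnDrop { restoring };
    // ... perform fallible I/O here; the bit is cleared no matter how we return.
    Err("simulated I/O failure".to_string())
}

fn main() {
    let flag = Mutex::new(false);
    let _ = restore(&flag);
    assert!(!*flag.lock().unwrap());
}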
@@ -408,43 +655,72 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
         let mut task = self.backend.storage.access_mut(task_id);
 
         if !task.flags.is_restored(category) {
-            // New tasks (transient and persistent) have restored flags set at allocation
-            // time, so this path is only hit for persistent tasks being restored from DB.
-            debug_assert!(
-                !task.flags.new_task() && !task_id.is_transient(),
-                "new or transient task should already be marked restored"
-            );
-
-            // Collect which categories need restoring while we have the lock
-            let needs_data =
-                category.includes_data() && !task.flags.is_restored(TaskDataCategory::Data);
-            let needs_meta =
-                category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta);
-
-            if needs_data || needs_meta {
-                // Avoid holding the lock too long since this can also affect other tasks
-                // Drop lock once, do all I/O, then re-acquire once
-                drop(task);
-
-                let storage_data = needs_data
-                    .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data));
-                let storage_meta = needs_meta
-                    .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta));
-
-                task = self.backend.storage.access_mut(task_id);
-
-                // Handle race conditions and merge
-                if let Some(storage) = storage_data
-                    && !task.flags.is_restored(TaskDataCategory::Data)
-                {
-                    task.restore_from(storage, TaskDataCategory::Data);
-                    task.flags.set_restored(TaskDataCategory::Data);
+            if task_id.is_transient() {
+                task.flags.set_restored(TaskDataCategory::All);
+            } else {
+                // Collect which categories need restoring while we have the lock
+                let needs_data =
+                    category.includes_data() && !task.flags.is_restored(TaskDataCategory::Data);
+                let needs_meta =
+                    category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta);
+
+                // Check whether another thread is currently restoring each category.
+                let data_restoring = needs_data && task.flags.data_restoring();
+                let meta_restoring = needs_meta && task.flags.meta_restoring();
+
+                // Claim categories no one else is restoring.
+                let do_data = needs_data && !data_restoring;
+                let do_meta = needs_meta && !meta_restoring;
+                if do_data {
+                    task.flags.set_data_restoring(true);
                 }
-                if let Some(storage) = storage_meta
-                    && !task.flags.is_restored(TaskDataCategory::Meta)
-                {
-                    task.restore_from(storage, TaskDataCategory::Meta);
-                    task.flags.set_restored(TaskDataCategory::Meta);
+                if do_meta {
+                    task.flags.set_meta_restoring(true);
+                }
+
+                if do_data || do_meta || data_restoring || meta_restoring {
+                    // Drop lock while doing I/O (our I/O can overlap with the other
+                    // thread).
+                    drop(task);
+
+                    // Perform I/O for categories we claimed.
+                    let storage_data = do_data
+                        .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data));
+                    let storage_meta = do_meta
+                        .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta));
+
+                    // Wait for categories claimed by another thread (after our I/O).
+                    // Reuse the returned write guard to avoid a second lock acquisition.
+                    task = if let Some(cat) = wait_category(data_restoring, meta_restoring) {
+                        self.wait_for_restore_or_panic(task_id, cat)
+                    } else {
+                        self.backend.storage.access_mut(task_id)
+                    };
+
+                    // Apply results and clear restoring bits.
+                    if let Some(result) = storage_data
+                        && let Err(e) =
+                            apply_restore_result(&mut task, result, SpecificTaskDataCategory::Data)
+                    {
+                        drop(task);
+                        self.backend.storage.restored.notify(usize::MAX);
+                        panic!("Failed to restore data for task {task_id}: {e:?}");
+                    }
+                    if let Some(result) = storage_meta
+                        && let Err(e) =
+                            apply_restore_result(&mut task, result, SpecificTaskDataCategory::Meta)
+                    {
+                        drop(task);
+                        self.backend.storage.restored.notify(usize::MAX);
+                        panic!("Failed to restore meta for task {task_id}: {e:?}");
+                    }
+
+                    if do_data || do_meta {
+                        // Drop the lock before notifying so woken threads don't
+                        // immediately contend on the same DashMap shard.
+                        drop(task);
+                        self.backend.storage.restored.notify(usize::MAX);
+                        task = self.backend.storage.access_mut(task_id);
+                    }
+                }
             }
         }
@@ -504,7 +780,7 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
         let (mut task1, mut task2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
 
-        // Collect what needs restoring for each task
+        // Collect what needs restoring for each task.
         let needs_data1 =
             category.includes_data() && !task1.flags.is_restored(TaskDataCategory::Data);
         let needs_meta1 =
@@ -514,51 +790,117 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
             category.includes_meta() && !task1.flags.is_restored(TaskDataCategory::Meta);
         let needs_data2 =
             category.includes_data() && !task2.flags.is_restored(TaskDataCategory::Data);
         let needs_meta2 =
             category.includes_meta() && !task2.flags.is_restored(TaskDataCategory::Meta);
 
-        if needs_data1 || needs_meta1 || needs_data2 || needs_meta2 {
-            // Avoid holding the lock too long since this can also affect other tasks
-            // Drop locks once, do all I/O, then re-acquire once
+        // Check whether another thread is restoring each category.
+        let data1_restoring = needs_data1 && task1.flags.data_restoring();
+        let meta1_restoring = needs_meta1 && task1.flags.meta_restoring();
+        let data2_restoring = needs_data2 && task2.flags.data_restoring();
+        let meta2_restoring = needs_meta2 && task2.flags.meta_restoring();
+
+        // Claim categories no one else is restoring.
+        let do_data1 = needs_data1 && !data1_restoring;
+        let do_meta1 = needs_meta1 && !meta1_restoring;
+        let do_data2 = needs_data2 && !data2_restoring;
+        let do_meta2 = needs_meta2 && !meta2_restoring;
+        if do_data1 {
+            task1.flags.set_data_restoring(true);
+        }
+        if do_meta1 {
+            task1.flags.set_meta_restoring(true);
+        }
+        if do_data2 {
+            task2.flags.set_data_restoring(true);
+        }
+        if do_meta2 {
+            task2.flags.set_meta_restoring(true);
+        }
+
+        if do_data1
+            || do_meta1
+            || do_data2
+            || do_meta2
+            || data1_restoring
+            || meta1_restoring
+            || data2_restoring
+            || meta2_restoring
+        {
+            // Drop both locks while doing I/O or waiting.
             drop(task1);
             drop(task2);
 
-            let storage_data1 = needs_data1
-                .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data));
-            let storage_meta1 = needs_meta1
-                .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta));
-            let storage_data2 = needs_data2
-                .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data));
-            let storage_meta2 = needs_meta2
-                .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta));
+            // Perform I/O for categories we claimed (overlaps with the other thread's
+            // restore).
+            let storage_data1 =
+                do_data1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data));
+            let storage_meta1 =
+                do_meta1.then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta));
+            let storage_data2 =
+                do_data2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data));
+            let storage_meta2 =
+                do_meta2.then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta));
+
+            // Wait for categories claimed by another thread (after our I/O, so they can
+            // overlap). Returns write guards; drop them since we re-acquire via
+            // access_pair_mut below.
+            if let Some(cat) = wait_category(data1_restoring, meta1_restoring) {
+                drop(self.wait_for_restore_or_panic(task_id1, cat));
+            }
+            if let Some(cat) = wait_category(data2_restoring, meta2_restoring) {
+                drop(self.wait_for_restore_or_panic(task_id2, cat));
+            }
 
             let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
             task1 = t1;
             task2 = t2;
 
-            // Merge results, handling race conditions
-            if let Some(storage) = storage_data1
-                && !task1.flags.is_restored(TaskDataCategory::Data)
+            // Apply results and clear restoring bits.
+            // On error: drop both locks, notify waiters, then panic.
+            if let Some(result) = storage_data1
+                && let Err(e) =
+                    apply_restore_result(&mut task1, result, SpecificTaskDataCategory::Data)
             {
-                task1.restore_from(storage, TaskDataCategory::Data);
-                task1.flags.set_restored(TaskDataCategory::Data);
+                drop(task1);
+                drop(task2);
+                self.backend.storage.restored.notify(usize::MAX);
+                panic!("Failed to restore data for task {task_id1}: {e:?}");
             }
-            if let Some(storage) = storage_meta1
-                && !task1.flags.is_restored(TaskDataCategory::Meta)
+            if let Some(result) = storage_meta1
+                && let Err(e) =
+                    apply_restore_result(&mut task1, result, SpecificTaskDataCategory::Meta)
             {
-                task1.restore_from(storage, TaskDataCategory::Meta);
-                task1.flags.set_restored(TaskDataCategory::Meta);
+                drop(task1);
+                drop(task2);
+                self.backend.storage.restored.notify(usize::MAX);
+                panic!("Failed to restore meta for task {task_id1}: {e:?}");
             }
-            if let Some(storage) = storage_data2
-                && !task2.flags.is_restored(TaskDataCategory::Data)
+            if let Some(result) = storage_data2
+                && let Err(e) =
+                    apply_restore_result(&mut task2, result, SpecificTaskDataCategory::Data)
             {
-                task2.restore_from(storage, TaskDataCategory::Data);
-                task2.flags.set_restored(TaskDataCategory::Data);
+                drop(task1);
+                drop(task2);
+                self.backend.storage.restored.notify(usize::MAX);
+                panic!("Failed to restore data for task {task_id2}: {e:?}");
            }
-            if let Some(storage) = storage_meta2
-                && !task2.flags.is_restored(TaskDataCategory::Meta)
+            if let Some(result) = storage_meta2
+                && let Err(e) =
+                    apply_restore_result(&mut task2, result, SpecificTaskDataCategory::Meta)
             {
-                task2.restore_from(storage, TaskDataCategory::Meta);
-                task2.flags.set_restored(TaskDataCategory::Meta);
+                drop(task1);
+                drop(task2);
+                self.backend.storage.restored.notify(usize::MAX);
+                panic!("Failed to restore meta for task {task_id2}: {e:?}");
+            }
+
+            if do_data1 || do_meta1 || do_data2 || do_meta2 {
+                // Drop both locks before notifying so woken threads don't
+                // immediately contend on the same DashMap shards.
+                drop(task1);
+                drop(task2);
+                self.backend.storage.restored.notify(usize::MAX);
+                let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
+                task1 = t1;
+                task2 = t2;
             }
         }
+
         (
             TaskGuardImpl {
                 task: task1,
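Both restore paths lean on `access_pair_mut` taking two task locks at once; that is only deadlock-free if pairs are always acquired in a canonical order. The backend's implementation is not shown in this patch; a generic sketch of the idiom (address-based ordering, hypothetical helper):

use std::sync::{Mutex, MutexGuard};

// Acquire two locks in a canonical (address-based) order so that two threads
// locking the same pair in opposite argument order cannot deadlock.
fn lock_pair<'a, T>(
    a: &'a Mutex<T>,
    b: &'a Mutex<T>,
) -> (MutexGuard<'a, T>, MutexGuard<'a, T>) {
    assert!(!std::ptr::eq(a, b), "caller must pass distinct locks");
    if (a as *const Mutex<T>) < (b as *const Mutex<T>) {
        let ga = a.lock().unwrap();
        let gb = b.lock().unwrap();
        (ga, gb)
    } else {
        let gb = b.lock().unwrap();
        let ga = a.lock().unwrap();
        (ga, gb)
    }
}

fn main() {
    let (a, b) = (Mutex::new(1), Mutex::new(2));
    let (ga, gb) = lock_pair(&a, &b);
    assert_eq!(*ga + *gb, 3);
}

The backend presumably orders by TaskId or DashMap shard rather than by address, but the invariant is the same.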
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
index a19e75b1e0914..f2b8c67cadf07 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -10,7 +10,7 @@ use std::{
 };
 
 use thread_local::ThreadLocal;
 use turbo_bincode::TurboBincodeBuffer;
-use turbo_tasks::{FxDashMap, TaskId, parallel};
+use turbo_tasks::{FxDashMap, TaskId, event::Event, parallel};
 
 use crate::{
     backend::storage_schema::TaskStorage,
@@ -53,6 +53,15 @@ pub enum SpecificTaskDataCategory {
     Data,
 }
 
+impl From<SpecificTaskDataCategory> for TaskDataCategory {
+    fn from(category: SpecificTaskDataCategory) -> Self {
+        match category {
+            SpecificTaskDataCategory::Meta => TaskDataCategory::Meta,
+            SpecificTaskDataCategory::Data => TaskDataCategory::Data,
+        }
+    }
+}
+
 impl SpecificTaskDataCategory {
     /// Returns the KeySpace for storing data of this category
     pub fn key_space(self) -> KeySpace {
@@ -83,6 +92,11 @@ pub struct Storage {
     /// be marked as modified at the beginning of the next snapshot cycle.
     snapshots: FxDashMap>>,
     map: FxDashMap>,
+    /// A shared event notified whenever any task finishes restoring (successfully or
+    /// not).
+    ///
+    /// Threads waiting for another thread's in-progress restore subscribe to this
+    /// event, then re-check the specific task's `restoring`/`restored` bits after
+    /// waking.
+    pub(crate) restored: Event,
 }
 
 impl Storage {
@@ -115,6 +129,7 @@ impl Storage {
                 shard_amount,
             ),
             map,
+            restored: Event::new(|| "Storage::restored".to_string()),
         }
     }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs
index e12a5309dd9d4..31f811945981e 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs
@@ -161,6 +161,14 @@ struct TaskStorageSchema {
     #[field(storage = "flag", category = "transient")]
     data_restored: bool,
 
+    /// Whether restoration of the meta category is currently in progress (claimed by
+    /// some thread).
+    #[field(storage = "flag", category = "transient")]
+    meta_restoring: bool,
+
+    /// Whether restoration of the data category is currently in progress (claimed by
+    /// some thread).
+    #[field(storage = "flag", category = "transient")]
+    data_restoring: bool,
+
     /// Whether meta was modified before snapshot mode was entered.
     #[field(storage = "flag", category = "transient")]
     meta_modified: bool,
@@ -348,6 +356,31 @@ impl TaskFlags {
         }
     }
 
+    /// Check if the category's restoration is currently in progress
+    pub fn is_restoring(&self, category: TaskDataCategory) -> bool {
+        match category {
+            TaskDataCategory::Meta => self.meta_restoring(),
+            TaskDataCategory::Data => self.data_restoring(),
+            TaskDataCategory::All => self.meta_restoring() || self.data_restoring(),
+        }
+    }
+
+    /// Set or clear the restoring bits for the given category
+    pub fn set_restoring(&mut self, category: TaskDataCategory, value: bool) {
+        match category {
+            TaskDataCategory::Meta => {
+                self.set_meta_restoring(value);
+            }
+            TaskDataCategory::Data => {
+                self.set_data_restoring(value);
+            }
+            TaskDataCategory::All => {
+                self.set_meta_restoring(value);
+                self.set_data_restoring(value);
+            }
+        }
+    }
+
     /// Check if any snapshot flag is set
     pub fn any_modified_during_snapshot(&self) -> bool {
         self.meta_modified_during_snapshot() || self.data_modified_during_snapshot()
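The `#[field(storage = "flag", category = "transient")]` fields presumably expand, via the schema macro, into the bit accessors (`meta_restoring()` / `set_meta_restoring()` and so on) used throughout the patch. A hand-rolled equivalent of the two new bits, purely for illustration:

// Illustrative only: the real accessors are macro-generated from the schema.
#[derive(Default)]
struct TaskFlags(u8);

const META_RESTORING: u8 = 1 << 2; // bits 0/1 would be the *_restored flags
const DATA_RESTORING: u8 = 1 << 3;

impl TaskFlags {
    fn meta_restoring(&self) -> bool {
        self.0 & META_RESTORING != 0
    }
    fn set_meta_restoring(&mut self, value: bool) {
        if value { self.0 |= META_RESTORING } else { self.0 &= !META_RESTORING }
    }
    fn data_restoring(&self) -> bool {
        self.0 & DATA_RESTORING != 0
    }
    fn set_data_restoring(&mut self, value: bool) {
        if value { self.0 |= DATA_RESTORING } else { self.0 &= !DATA_RESTORING }
    }
    // The *_restored accessors follow the same pattern.
}

fn main() {
    let mut flags = TaskFlags::default();
    flags.set_meta_restoring(true);
    assert!(flags.meta_restoring() && !flags.data_restoring());
}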
#[field(storage = "flag", category = "transient")] meta_modified: bool, @@ -348,6 +356,31 @@ impl TaskFlags { } } + /// Check if the category's restoration is currently in progress by another thread + pub fn is_restoring(&self, category: TaskDataCategory) -> bool { + match category { + TaskDataCategory::Meta => self.meta_restoring(), + TaskDataCategory::Data => self.data_restoring(), + TaskDataCategory::All => self.meta_restoring() || self.data_restoring(), + } + } + + /// Set or clear the restoring bits for the given category + pub fn set_restoring(&mut self, category: TaskDataCategory, value: bool) { + match category { + TaskDataCategory::Meta => { + self.set_meta_restoring(value); + } + TaskDataCategory::Data => { + self.set_data_restoring(value); + } + TaskDataCategory::All => { + self.set_meta_restoring(value); + self.set_data_restoring(value); + } + } + } + /// Check if any snapshot flag is set pub fn any_modified_during_snapshot(&self) -> bool { self.meta_modified_during_snapshot() || self.data_modified_during_snapshot() diff --git a/turbopack/crates/turbo-tasks/src/event.rs b/turbopack/crates/turbo-tasks/src/event.rs index 8ed124cc1a19c..370665ec4c0bc 100644 --- a/turbopack/crates/turbo-tasks/src/event.rs +++ b/turbopack/crates/turbo-tasks/src/event.rs @@ -11,6 +11,7 @@ use std::{ time::Duration, }; +use event_listener::Listener as _; #[cfg(feature = "hanging_detection")] use tokio::time::{Timeout, timeout}; @@ -201,6 +202,17 @@ impl Future for EventListener { } } +#[cfg(not(feature = "hanging_detection"))] +impl EventListener { + /// Blocks the current thread until the event is notified. + /// + /// This is the synchronous equivalent of `.await`-ing the `EventListener`. + /// Only valid in synchronous contexts (e.g. backend operations). + pub fn wait(self) { + self.listener.wait(); + } +} + #[cfg(feature = "hanging_detection")] pub struct EventListener { description: Arc String + Sync + Send>, @@ -273,6 +285,22 @@ impl Future for EventListener { } } +#[cfg(feature = "hanging_detection")] +impl EventListener { + /// Blocks the current thread until the event is notified. + /// + /// Note: In `hanging_detection` builds, timeout warnings are not emitted + /// for sync waits (only for async `.await` usage). + pub fn wait(mut self) { + if let Some(future) = self.future.take() { + // SAFETY: EventListener is Unpin, so it's safe to move out of the Pin. + unsafe { std::pin::Pin::into_inner_unchecked(future) } + .into_inner() + .wait(); + } + } +} + #[cfg(all(test, not(feature = "hanging_detection")))] mod tests { use std::hint::black_box;