diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 0957abe259140f..43183881445130 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -106,6 +106,45 @@ struct TrackedStats { miss_global: std::sync::atomic::AtomicU64, } +/// A compact bitset for tracking which batch-lookup cells have been resolved. +/// +/// Uses `u64` words (1 bit per cell) rather than `Vec<bool>` (1 byte per cell) for an 8x +/// reduction in memory and better cache behaviour on large batches. +pub struct FoundBitset { + words: Box<[u64]>, + len: usize, +} + +impl FoundBitset { + pub(crate) fn new(len: usize) -> Self { + let words = vec![0u64; len.div_ceil(64)].into_boxed_slice(); + Self { words, len } + } + + #[inline] + pub(crate) fn get(&self, index: usize) -> bool { + debug_assert!(index < self.len); + (self.words[index / 64] >> (index % 64)) & 1 == 1 + } + + #[inline] + pub(crate) fn set(&mut self, index: usize) { + debug_assert!(index < self.len); + let wi = index / 64; + let bi = index % 64; + self.words[wi] |= 1 << bi; + } + + /// Returns the number of bits that are set. + pub(crate) fn count_ones(&self) -> usize { + self.words.iter().map(|w| w.count_ones()).sum::<u32>() as usize + } + + pub(crate) fn len(&self) -> usize { + self.len + } +} + /// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time /// using a single write batch. It allows for concurrent reads. pub struct TurboPersistence { @@ -1603,11 +1642,36 @@ impl TurboPersistence Ok(output) } + /// Looks up multiple keys in batch and collects results into a `Vec`. + /// + /// For large batches where memory pressure matters, prefer + /// [`batch_get_with`][Self::batch_get_with] which calls a callback per entry without + /// accumulating all decoded bytes simultaneously. 
pub fn batch_get( &self, family: usize, keys: &[K], ) -> Result>> { + let mut results = vec![None; keys.len()]; + self.batch_get_with(family, keys, |index, opt_bytes| { + results[index] = opt_bytes.map(|b| ArcBytes::from(b.to_vec().into_boxed_slice())); + Ok(()) + })?; + Ok(results) + } + + /// Looks up multiple keys in batch, calling `callback(index, Option<&[u8]>)` for each entry + /// immediately after it is resolved rather than accumulating all results into a `Vec`. + /// + /// This keeps at most one decompressed value block live at a time, significantly reducing + /// peak memory when the batch is large. The callback receives the 0-based key index and the + /// value bytes (`None` for not-found or deleted). Callback errors propagate immediately. + pub fn batch_get_with( + &self, + family: usize, + keys: &[K], + mut callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()> { debug_assert!(family < FAMILIES, "Family index out of bounds"); if self.config.family_configs[family].kind != FamilyKind::SingleValue { // This is an error in our caller so just panic @@ -1622,22 +1686,38 @@ impl TurboPersistence result_size = tracing::field::Empty ) .entered(); - let mut cells: Vec<(u64, usize, Option)> = Vec::with_capacity(keys.len()); - let mut empty_cells = keys.len(); + let mut cells: Vec<(u64, usize)> = Vec::with_capacity(keys.len()); for (index, key) in keys.iter().enumerate() { let hash = hash_key(key); - cells.push((hash, index, None)); + cells.push((hash, index)); } - cells.sort_by_key(|(hash, _, _)| *hash); + cells.sort_unstable_by_key(|(hash, _)| *hash); + let mut found = FoundBitset::new(cells.len()); let inner = self.inner.read(); + let mut not_found = 0; + let mut deleted = 0; + let mut result_size = 0; + let mut read_blob = |seq| self.read_blob(seq); + // Wrap the callback to track stats (deleted/result_size) for found keys. + // not_found is tracked separately in the post-loop over cells. 
+ let mut stats_callback = |index: usize, opt_bytes: Option<&[u8]>| -> Result<()> { + if let Some(bytes) = opt_bytes { + result_size += bytes.len(); + } else { + deleted += 1; + } + callback(index, opt_bytes) + }; for meta in inner.meta_files.iter().rev() { - let _result = meta.batch_lookup( + let (all_found, _result) = meta.batch_lookup_with( family as u32, keys, - &mut cells, - &mut empty_cells, + &cells, + &mut found, &self.key_block_cache, &self.value_block_cache, + &mut read_blob, + &mut stats_callback, )?; #[cfg(feature = "stats")] @@ -1669,49 +1749,25 @@ impl TurboPersistence } } - if empty_cells == 0 { + if all_found { break; } } - let mut deleted = 0; - let mut not_found = 0; - let mut result_size = 0; - let mut results = vec![None; keys.len()]; - for (hash, index, result) in cells { - if let Some(result) = result { + // Record accessed hashes for found keys, and fire callback for not-found keys. + for (hash, index) in cells.into_iter() { + if found.get(index) { inner.accessed_key_hashes[family].insert(hash); - let result = match result { - LookupValue::Deleted => { - #[cfg(feature = "stats")] - self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed); - deleted += 1; - None - } - LookupValue::Slice { value } => { - #[cfg(feature = "stats")] - self.stats.hits_small.fetch_add(1, Ordering::Relaxed); - result_size += value.len(); - Some(value) - } - LookupValue::Blob { sequence_number } => { - #[cfg(feature = "stats")] - self.stats.hits_blob.fetch_add(1, Ordering::Relaxed); - let blob = self.read_blob(sequence_number)?; - result_size += blob.len(); - Some(blob) - } - }; - results[index] = result; } else { #[cfg(feature = "stats")] self.stats.miss_global.fetch_add(1, Ordering::Relaxed); not_found += 1; + callback(index, None)?; } } span.record("not_found", not_found); span.record("deleted", deleted); span.record("result_size", result_size); - Ok(results) + Ok(()) } /// Returns database statistics. 
diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index 86ee8fdf5ee6d0..2832cbb08ac7db 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -14,7 +14,8 @@ use smallvec::SmallVec; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, big_endian as be}; use crate::{ - QueryKey, + ArcBytes, QueryKey, + db::FoundBitset, lookup_entry::LookupValue, mmap_helper::advise_mmap_for_persistence, static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData}, @@ -498,33 +499,44 @@ impl MetaFile { Ok(miss_result) } - pub fn batch_lookup( + pub fn batch_lookup_with( &self, key_family: u32, keys: &[K], - cells: &mut [(u64, usize, Option)], - empty_cells: &mut usize, + cells: &[(u64, usize)], + found: &mut FoundBitset, key_block_cache: &BlockCache, value_block_cache: &BlockCache, - ) -> Result { + read_blob: &mut impl FnMut(u32) -> Result, + callback: &mut impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<(bool, MetaBatchLookupResult)> { + debug_assert_eq!( + cells.len(), + found.len(), + "cells and found must have the same length" + ); if key_family != self.family { #[cfg(feature = "stats")] - return Ok(MetaBatchLookupResult { - family_miss: true, - ..Default::default() - }); + return Ok(( + false, + MetaBatchLookupResult { + family_miss: true, + ..Default::default() + }, + )); #[cfg(not(feature = "stats"))] - return Ok(MetaBatchLookupResult {}); + return Ok((false, MetaBatchLookupResult {})); } debug_assert!( - cells.is_sorted_by_key(|(hash, _, _)| *hash), + cells.is_sorted_by_key(|(hash, _)| *hash), "Cells must be sorted by key hash" ); #[allow(unused_mut, reason = "It's used when stats are enabled")] let mut lookup_result = MetaBatchLookupResult::default(); + let mut empty_cells = found.len() - found.count_ones(); for entry in self.entries.iter().rev() { - let start_index = cells - 
.binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater)) + let mut start_index = cells + .binary_search_by(|(hash, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater)) .err() .unwrap(); if start_index >= cells.len() { @@ -534,11 +546,23 @@ impl MetaFile { } continue; } - let end_index = cells - .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less)) + // Advance to the first not found cell. + while start_index < cells.len() && found.get(cells[start_index].1) { + start_index += 1; + } + if start_index >= cells.len() { + #[cfg(feature = "stats")] + { + lookup_result.range_misses += 1; + } + continue; + } + let end_index = cells[start_index..] + .binary_search_by(|(hash, _)| hash.cmp(&entry.max_hash).then(Ordering::Less)) .err() .unwrap() - .checked_sub(1); + .checked_sub(1) + .map(|i| i + start_index); let Some(end_index) = end_index else { #[cfg(feature = "stats")] { @@ -553,12 +577,13 @@ impl MetaFile { } continue; } - for (hash, index, result) in &mut cells[start_index..=end_index] { + for (hash, index) in cells[start_index..=end_index].iter() { debug_assert!( *hash >= entry.min_hash && *hash <= entry.max_hash, "Key hash out of range" ); - if result.is_some() { + if found.get(*index) { + // we already found this key in a different meta-file continue; } if !entry.amqf.contains_fingerprint(*hash) { @@ -580,14 +605,27 @@ impl MetaFile { let Some(value) = values.pop() else { unreachable!() }; - *result = Some(value); - *empty_cells -= 1; + // Resolve and callback immediately — the ArcBytes/blob is dropped before + // the next iteration so we never hold more than one decompressed block at a + // time. 
+ let blob; + let opt_bytes: Option<&[u8]> = match &value { + LookupValue::Deleted => None, + LookupValue::Slice { value } => Some(value), + LookupValue::Blob { sequence_number } => { + blob = read_blob(*sequence_number)?; + Some(&*blob) + } + }; + found.set(*index); + empty_cells -= 1; #[cfg(feature = "stats")] { lookup_result.hits += 1; } - if *empty_cells == 0 { - return Ok(lookup_result); + callback(*index, opt_bytes)?; + if empty_cells == 0 { + return Ok((true, lookup_result)); } } else { #[cfg(feature = "stats")] @@ -597,6 +635,6 @@ impl MetaFile { } } } - Ok(lookup_result) + Ok((false, lookup_result)) } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 9c89569aa3ad4a..b54bdae841df30 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -14,6 +14,7 @@ use std::{ use bincode::{Decode, Encode}; use tracing::trace_span; +use turbo_bincode::new_turbo_bincode_decoder; use turbo_tasks::{ CellId, FxIndexMap, TaskExecutionReason, TaskId, TaskPriority, TurboTasksBackendApi, TurboTasksCallApi, TypedSharedReference, backend::CachedTaskType, @@ -31,6 +32,11 @@ use crate::{ data::{ActivenessState, CollectibleRef, Dirtyness, InProgressState, TransientTask}, }; +struct TaskRestoreEntry { + task_id: TaskId, + category: TaskDataCategory, +} + pub trait Operation: Encode + Decode<()> + Default + TryFrom { fn execute(self, ctx: &mut impl ExecuteContext<'_>); } @@ -176,58 +182,20 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { self.backend.should_restore() && self.backend.local_is_partial } - fn restore_task_data( + fn lookup_task_data( &self, task_id: TaskId, category: SpecificTaskDataCategory, - ) -> TaskStorage { - if !self.should_check_backing_storage() { - // If we don't need to restore, we can just return an empty storage - return TaskStorage::default(); - } - 
let mut storage = TaskStorage::default(); - let result = self - .backend - .backing_storage - .lookup_data(task_id, category, &mut storage); - - match result { - Ok(()) => storage, - Err(e) => { - panic!( - "Failed to restore task data (corrupted database or bug): {:?}", - e.context(format!("{category:?} for {task_id})")) - ) - } - } - } - - fn restore_task_data_batch( - &self, - task_ids: &[TaskId], - category: SpecificTaskDataCategory, - ) -> Option> { - debug_assert!( - task_ids.len() > 1, - "Use restore_task_data_typed for single task" - ); + ) -> Option { if !self.should_check_backing_storage() { - // If we don't need to restore, we return None return None; } - let result = self - .backend - .backing_storage - .batch_lookup_data(task_ids, category); - match result { - Ok(result) => Some(result), + match self.backend.backing_storage.lookup_data(task_id, category) { + Ok(bytes) => bytes, Err(e) => { panic!( "Failed to restore task data (corrupted database or bug): {:?}", - e.context(format!( - "{category:?} for batch of {} tasks", - task_ids.len() - )) + e.context(format!("{category:?} for {task_id})")) ) } } @@ -255,7 +223,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { let mut data_count = 0; let mut meta_count = 0; let mut all_count = 0; - let mut tasks = task_ids + let tasks = task_ids .into_iter() .filter(|&(id, category)| { if id.is_transient() { @@ -278,18 +246,22 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { TaskDataCategory::Meta => meta_count += 1, TaskDataCategory::All => all_count += 1, }) - .map(|(id, category)| (id, category, None, None)) + .map(|(id, category)| TaskRestoreEntry { + task_id: id, + category, + }) .collect::>(); data_count += all_count; meta_count += all_count; span.record("requested_data", data_count); span.record("requested_meta", meta_count); - let mut tasks_to_restore_for_data = Vec::with_capacity(data_count); - let mut tasks_to_restore_for_data_indicies = Vec::with_capacity(data_count); - let mut 
tasks_to_restore_for_meta = Vec::with_capacity(meta_count); - let mut tasks_to_restore_for_meta_indicies = Vec::with_capacity(meta_count); - for (i, &(task_id, category, _, _)) in tasks.iter().enumerate() { + let mut tasks_to_restore_for_data: Vec = Vec::with_capacity(data_count); + let mut tasks_to_restore_for_data_indices: Vec = Vec::with_capacity(data_count); + let mut tasks_to_restore_for_meta: Vec = Vec::with_capacity(meta_count); + let mut tasks_to_restore_for_meta_indices: Vec = Vec::with_capacity(meta_count); + for (i, entry) in tasks.iter().enumerate() { + let (task_id, category) = (entry.task_id, entry.category); self.task_lock_counter.acquire(); let task = self.backend.storage.access_mut(task_id); @@ -298,14 +270,14 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { && !task.flags.is_restored(TaskDataCategory::Data) { tasks_to_restore_for_data.push(task_id); - tasks_to_restore_for_data_indicies.push(i); + tasks_to_restore_for_data_indices.push(i); ready = false; } if matches!(category, TaskDataCategory::Meta | TaskDataCategory::All) && !task.flags.is_restored(TaskDataCategory::Meta) { tasks_to_restore_for_meta.push(task_id); - tasks_to_restore_for_meta_indicies.push(i); + tasks_to_restore_for_meta_indices.push(i); ready = false; } self.task_lock_counter.release(); @@ -317,83 +289,141 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { return; } - match tasks_to_restore_for_data.len() { - 0 => {} - 1 => { - let task_id = tasks_to_restore_for_data[0]; - let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Data); - let idx = tasks_to_restore_for_data_indicies[0]; - tasks[idx].2 = Some(data); - } - _ => { - if let Some(data) = self.restore_task_data_batch( - &tasks_to_restore_for_data, - SpecificTaskDataCategory::Data, - ) { - data.into_iter() - .zip(tasks_to_restore_for_data_indicies) - .for_each(|(item, idx)| { - tasks[idx].2 = Some(item); - }); - } else { - for idx in tasks_to_restore_for_data_indicies { - tasks[idx].2 = 
Some(TaskStorage::default()); - } - } - } - } - match tasks_to_restore_for_meta.len() { - 0 => {} - 1 => { - let task_id = tasks_to_restore_for_meta[0]; - let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Meta); - let idx = tasks_to_restore_for_meta_indicies[0]; - tasks[idx].3 = Some(data); - } - _ => { - if let Some(data) = self.restore_task_data_batch( - &tasks_to_restore_for_meta, - SpecificTaskDataCategory::Meta, - ) { - data.into_iter() - .zip(tasks_to_restore_for_meta_indicies) - .for_each(|(item, idx)| { - tasks[idx].3 = Some(item); - }); - } else { - for idx in tasks_to_restore_for_meta_indicies { - tasks[idx].3 = Some(TaskStorage::new()); - } - } - } - } + // Restore data and meta batches. + // Each decoded TaskStorage is written into backend.storage immediately inside the + // batch_lookup_data callback, and prepared_task_callback is called as soon as + // task.flags.is_restored(entry.category) becomes true (i.e. all required categories + // for that entry are satisfied). The guard is passed directly to the callback while + // still held — no second pass needed. + self.restore_batch( + &tasks, + &tasks_to_restore_for_data, + &tasks_to_restore_for_data_indices, + SpecificTaskDataCategory::Data, + &mut prepared_task_callback, + ); + self.restore_batch( + &tasks, + &tasks_to_restore_for_meta, + &tasks_to_restore_for_meta_indices, + SpecificTaskDataCategory::Meta, + &mut prepared_task_callback, + ); + } - for (task_id, category, storage_for_data, storage_for_meta) in tasks { - if storage_for_data.is_none() && storage_for_meta.is_none() { - continue; - } - self.task_lock_counter.acquire(); + /// Decodes a batch of task data from backing storage, writing each `TaskStorage` immediately + /// into `backend.storage` as it arrives so it can be freed before the next one is decoded. + /// + /// After the database read completes, calls `prepared_task_callback` for each entry whose + /// `task.flags.is_restored(entry.category)` is now true — i.e. 
all required categories are + /// satisfied. For `All`-category entries this fires in the meta batch (after both categories). + fn restore_batch( + &mut self, + tasks: &[TaskRestoreEntry], + task_ids: &[TaskId], + task_indices: &[usize], + category: SpecificTaskDataCategory, + prepared_task_callback: &mut impl FnMut( + &mut Self, + TaskId, + TaskDataCategory, + StorageWriteGuard<'e>, + ), + ) { + if task_ids.is_empty() { + return; + } + // Shared per-task logic: decode `opt_bytes` into the live task (if not already restored), + // set the restored flag, and fire `prepared_task_callback` when `entry.category` is fully + // satisfied. + let process_task = |this: &mut Self, + entry: &TaskRestoreEntry, + opt_bytes: Option<&[u8]>, + prepared_task_callback: &mut dyn FnMut( + &mut Self, + TaskId, + TaskDataCategory, + StorageWriteGuard<'e>, + )| { + this.task_lock_counter.acquire(); + let mut task = this.backend.storage.access_mut(entry.task_id); + let task_data_category = match category { + SpecificTaskDataCategory::Data => TaskDataCategory::Data, + SpecificTaskDataCategory::Meta => TaskDataCategory::Meta, + }; let mut task_type = None; - let mut task = self.backend.storage.access_mut(task_id); - if let Some(storage) = storage_for_data - && !task.flags.is_restored(TaskDataCategory::Data) - { - task.restore_from(storage, TaskDataCategory::Data); - task.flags.set_restored(TaskDataCategory::Data); - task_type = task.get_persistent_task_type().cloned() + if !task.flags.is_restored(task_data_category) { + if let Some(bytes) = opt_bytes { + let mut decoder = new_turbo_bincode_decoder(bytes); + task.decode(category, &mut decoder).unwrap_or_else(|e| { + panic!( + "Failed to decode {category:?} for {:?}: {e:?}", + entry.task_id + ) + }); + } + // the else branch is simple, there was no backing data so mark as restored so we + // don't try again + task.flags.set_restored(task_data_category); + // Cache the task type mapping after restoring data (only data has task type). 
+ if category == SpecificTaskDataCategory::Data { + task_type = task.get_persistent_task_type().cloned(); + } } - if let Some(storage) = storage_for_meta - && !task.flags.is_restored(TaskDataCategory::Meta) - { - task.restore_from(storage, TaskDataCategory::Meta); - task.flags.set_restored(TaskDataCategory::Meta); + if task.flags.is_restored(entry.category) { + this.task_lock_counter.release(); + prepared_task_callback(this, entry.task_id, entry.category, task); + } else { + drop(task); + this.task_lock_counter.release(); } - self.task_lock_counter.release(); - prepared_task_callback(self, task_id, category, task); if let Some(task_type) = task_type { - // Insert into the task cache to avoid future lookups - self.backend.task_cache.entry(task_type).or_insert(task_id); + this.backend + .task_cache + .entry(task_type) + .or_insert(entry.task_id); + } + }; + + if self.backend.should_restore() && self.backend.local_is_partial { + if task_ids.len() == 1 { + // Avoid batch_lookup_data overhead for the single-key case. + let entry = &tasks[task_indices[0]]; + let opt_bytes = self + .backend + .backing_storage + .lookup_data(entry.task_id, category) + .unwrap_or_else(|e| { + panic!( + "Failed to restore task data (corrupted database or bug): {:?}", + e.context(format!("{category:?} for {:?}", entry.task_id)) + ) + }); + process_task(self, entry, opt_bytes.as_deref(), prepared_task_callback); + } else { + let result = self.backend.backing_storage.batch_lookup_data( + task_ids, + category, + &mut |index, opt_bytes| { + let entry = &tasks[task_indices[index]]; + process_task(self, entry, opt_bytes, prepared_task_callback); + }, + ); + if let Err(e) = result { + panic!( + "Failed to restore task data (corrupted database or bug): {:?}", + e.context(format!( + "{category:?} for batch of {} tasks", + task_ids.len() + )) + ) + } + } + } else { + // No backing storage — restore all tasks with empty data. 
+ for &task_index in task_indices { + process_task(self, &tasks[task_index], None, prepared_task_callback); } } } @@ -427,28 +457,39 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta); if needs_data || needs_meta { - // Avoid holding the lock too long since this can also affect other tasks - // Drop lock once, do all I/O, then re-acquire once + // Avoid holding the lock too long since this can also affect other tasks. + // Drop lock once, do all I/O (returning raw bytes), then re-acquire once + // and decode directly into the live task — no scratch TaskStorage needed. drop(task); - let storage_data = needs_data - .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data)); - let storage_meta = needs_meta - .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta)); + let bytes_data = needs_data + .then(|| self.lookup_task_data(task_id, SpecificTaskDataCategory::Data)); + let bytes_meta = needs_meta + .then(|| self.lookup_task_data(task_id, SpecificTaskDataCategory::Meta)); task = self.backend.storage.access_mut(task_id); - // Handle race conditions and merge - if let Some(storage) = storage_data - && !task.flags.is_restored(TaskDataCategory::Data) - { - task.restore_from(storage, TaskDataCategory::Data); + // Handle race conditions and decode directly into live storage. + // set_restored is called even when no bytes found (task simply has no + // persisted data for this category). 
+ if needs_data && !task.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = bytes_data.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task.decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Data for {task_id:?}: {e:?}") + }); + } task.flags.set_restored(TaskDataCategory::Data); } - if let Some(storage) = storage_meta - && !task.flags.is_restored(TaskDataCategory::Meta) - { - task.restore_from(storage, TaskDataCategory::Meta); + if needs_meta && !task.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = bytes_meta.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task.decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Meta for {task_id:?}: {e:?}") + }); + } task.flags.set_restored(TaskDataCategory::Meta); } } @@ -521,47 +562,69 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { category.includes_meta() && !task2.flags.is_restored(TaskDataCategory::Meta); if needs_data1 || needs_meta1 || needs_data2 || needs_meta2 { - // Avoid holding the lock too long since this can also affect other tasks - // Drop locks once, do all I/O, then re-acquire once + // Avoid holding the lock too long since this can also affect other tasks. + // Drop locks once, do all I/O (returning raw bytes), then re-acquire once + // and decode directly into the live tasks — no scratch TaskStorage needed. 
drop(task1); drop(task2); - let storage_data1 = needs_data1 - .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data)); - let storage_meta1 = needs_meta1 - .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta)); - let storage_data2 = needs_data2 - .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data)); - let storage_meta2 = needs_meta2 - .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); + let bytes_data1 = needs_data1 + .then(|| self.lookup_task_data(task_id1, SpecificTaskDataCategory::Data)); + let bytes_meta1 = needs_meta1 + .then(|| self.lookup_task_data(task_id1, SpecificTaskDataCategory::Meta)); + let bytes_data2 = needs_data2 + .then(|| self.lookup_task_data(task_id2, SpecificTaskDataCategory::Data)); + let bytes_meta2 = needs_meta2 + .then(|| self.lookup_task_data(task_id2, SpecificTaskDataCategory::Meta)); let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2); task1 = t1; task2 = t2; - // Merge results, handling race conditions - if let Some(storage) = storage_data1 - && !task1.flags.is_restored(TaskDataCategory::Data) - { - task1.restore_from(storage, TaskDataCategory::Data); + // Decode directly into live storage, handling race conditions. + // set_restored is called even when no bytes found. 
+ if needs_data1 && !task1.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = bytes_data1.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task1 + .decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Data for {task_id1:?}: {e:?}") + }); + } task1.flags.set_restored(TaskDataCategory::Data); } - if let Some(storage) = storage_meta1 - && !task1.flags.is_restored(TaskDataCategory::Meta) - { - task1.restore_from(storage, TaskDataCategory::Meta); + if needs_meta1 && !task1.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = bytes_meta1.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task1 + .decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Meta for {task_id1:?}: {e:?}") + }); + } task1.flags.set_restored(TaskDataCategory::Meta); } - if let Some(storage) = storage_data2 - && !task2.flags.is_restored(TaskDataCategory::Data) - { - task2.restore_from(storage, TaskDataCategory::Data); + if needs_data2 && !task2.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = bytes_data2.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task2 + .decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Data for {task_id2:?}: {e:?}") + }); + } task2.flags.set_restored(TaskDataCategory::Data); } - if let Some(storage) = storage_meta2 - && !task2.flags.is_restored(TaskDataCategory::Meta) - { - task2.restore_from(storage, TaskDataCategory::Meta); + if needs_meta2 && !task2.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = bytes_meta2.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task2 + .decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Meta for {task_id2:?}: {e:?}") + }); + } task2.flags.set_restored(TaskDataCategory::Meta); } } diff --git 
a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index 8fd957c354f409..9e3a3179cc80b2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -4,10 +4,11 @@ use anyhow::Result; use either::Either; use smallvec::SmallVec; use turbo_bincode::TurboBincodeBuffer; +use turbo_persistence::ArcBytes; use turbo_tasks::{TaskId, backend::CachedTaskType}; use crate::{ - backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage}, + backend::{AnyOperation, SpecificTaskDataCategory}, utils::chunked_vec::ChunkedVec, }; @@ -69,22 +70,33 @@ pub trait BackingStorageSealed: 'static + Send + Sync { /// The caller must verify each returned TaskId by comparing the stored task type which will /// require a second database read fn lookup_task_candidates(&self, key: &CachedTaskType) -> Result>; - /// Looks up and decodes persisted data for a single task, updating the provided storage with - /// data from the database in the given category. + /// Looks up raw persisted bytes for a single task. + /// + /// Returns `None` if the task has no persisted data for the given category. + /// The caller is responsible for decoding the returned bytes. The returned + /// [`ArcBytes`] is ref-counted and zero-copy when backed by a memory-mapped file. fn lookup_data( &self, task_id: TaskId, category: SpecificTaskDataCategory, - storage: &mut TaskStorage, - ) -> Result<()>; + ) -> Result>; - /// Batch lookup and decode data for multiple tasks directly into TypedStorage instances. - /// Returns a vector of TypedStorage, one for each task_id in the input slice. + /// Batch lookup raw bytes for multiple tasks, calling + /// `callback(index, Option<&[u8]>)` for every task in `task_ids` immediately after its + /// entry is resolved. Index corresponds to the position in `task_ids`. 
`None` means the key + /// was not found in the database (no allocation is made for missing keys). + /// + /// The caller is responsible for decoding the bytes. The byte slice is only valid for the + /// duration of the callback invocation and must not be stored. + /// + /// This avoids allocating an intermediate `TaskStorage` per entry — callers can decode + /// directly into the live in-memory storage, reducing peak memory usage for large batches. fn batch_lookup_data( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result>; + callback: &mut dyn FnMut(usize, Option<&[u8]>), + ) -> Result<()>; fn compact(&self) -> Result { Ok(false) @@ -142,17 +154,17 @@ where &self, task_id: TaskId, category: SpecificTaskDataCategory, - storage: &mut TaskStorage, - ) -> Result<()> { - either::for_both!(self, this => this.lookup_data(task_id, category, storage)) + ) -> Result> { + either::for_both!(self, this => this.lookup_data(task_id, category)) } fn batch_lookup_data( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result> { - either::for_both!(self, this => this.batch_lookup_data(task_ids, category)) + callback: &mut dyn FnMut(usize, Option<&[u8]>), + ) -> Result<()> { + either::for_both!(self, this => this.batch_lookup_data(task_ids, category, callback)) } fn compact(&self) -> Result { diff --git a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs index c5f5da9c35b3b4..8154f4415b8724 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs @@ -1,6 +1,6 @@ use anyhow::Result; use smallvec::SmallVec; -use turbo_persistence::{FamilyConfig, FamilyKind}; +use turbo_persistence::{ArcBytes, FamilyConfig, FamilyKind}; use crate::database::write_batch::ConcurrentWriteBatch; @@ -57,36 +57,26 @@ pub trait KeyValueDatabase { false } - type 
ValueBuffer<'l>: std::borrow::Borrow<[u8]> - where - Self: 'l; - - fn get(&self, key_space: KeySpace, key: &[u8]) -> Result>>; + fn get(&self, key_space: KeySpace, key: &[u8]) -> Result>; /// Looks up a key and returns all matching values. /// /// Useful for keyspaces where keys are hashes and collisions are possible (e.g., TaskCache). /// The default implementation returns at most one value (from `get`), but implementations /// that support multiple values per key should override this. - fn get_multiple( - &self, - key_space: KeySpace, - key: &[u8], - ) -> Result; 1]>> { - Ok(self.get(key_space, key)?.into_iter().collect()) - } + fn get_multiple(&self, key_space: KeySpace, key: &[u8]) -> Result>; - fn batch_get( + /// Calls `callback(index, Option<&[u8]>)` for each key immediately after it is resolved, + /// rather than accumulating all results into a `Vec`. This avoids holding every decompressed + /// value simultaneously in memory. + /// + /// The default implementation falls back to sequential `get` calls. Implementations backed by + /// [`TurboPersistence`] override this to use the native streaming path. 
+ fn batch_get_with( &self, key_space: KeySpace, keys: &[&[u8]], - ) -> Result>>> { - let mut results = Vec::with_capacity(keys.len()); - for key in keys { - let value = self.get(key_space, key)?; - results.push(value); - } - Ok(results) - } + callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()>; type ConcurrentWriteBatch<'l>: ConcurrentWriteBatch<'l> where diff --git a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs index 893b29fae04149..8cbf6d5ecca50b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use smallvec::SmallVec; +use turbo_persistence::ArcBytes; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, @@ -8,14 +10,23 @@ use crate::database::{ pub struct NoopKvDb; impl KeyValueDatabase for NoopKvDb { - type ValueBuffer<'l> - = &'l [u8] - where - Self: 'l; - - fn get(&self, _key_space: KeySpace, _key: &[u8]) -> Result>> { + fn get(&self, _key_space: KeySpace, _key: &[u8]) -> Result> { Ok(None) } + fn get_multiple(&self, _key_space: KeySpace, _key: &[u8]) -> Result> { + Ok(SmallVec::new()) + } + fn batch_get_with( + &self, + _key_space: KeySpace, + keys: &[&[u8]], + mut callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()> { + for (index, _key) in keys.iter().enumerate() { + callback(index, None)?; + } + Ok(()) + } type ConcurrentWriteBatch<'l> = NoopWriteBatch diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs index a28fb211e05af3..8b62799a692ca6 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs @@ -87,28 +87,20 @@ impl KeyValueDatabase for TurboKeyValueDatabase { self.db.is_empty() } - type ValueBuffer<'l> - = 
ArcBytes - where - Self: 'l; - - fn get(&self, key_space: KeySpace, key: &[u8]) -> Result>> { + fn get(&self, key_space: KeySpace, key: &[u8]) -> Result> { self.db.get(key_space as usize, &key) } - fn batch_get( + fn batch_get_with( &self, key_space: KeySpace, keys: &[&[u8]], - ) -> Result>>> { - self.db.batch_get(key_space as usize, keys) + callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()> { + self.db.batch_get_with(key_space as usize, keys, callback) } - fn get_multiple( - &self, - key_space: KeySpace, - key: &[u8], - ) -> Result; 1]>> { + fn get_multiple(&self, key_space: KeySpace, key: &[u8]) -> Result> { self.db.get_multiple(key_space as usize, &key) } diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 5df10a67875fcc..eff88cd005dd1f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -7,7 +7,8 @@ use std::{ use anyhow::{Context, Result}; use smallvec::SmallVec; -use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode}; +use turbo_bincode::{turbo_bincode_decode, turbo_bincode_encode}; +use turbo_persistence::ArcBytes; use turbo_tasks::{ TaskId, backend::CachedTaskType, @@ -18,7 +19,7 @@ use turbo_tasks_hash::Xxh3Hash64Hasher; use crate::{ GitVersionInfo, - backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage}, + backend::{AnyOperation, SpecificTaskDataCategory}, backing_storage::{BackingStorage, BackingStorageSealed, SnapshotItem}, database::{ db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db}, @@ -47,11 +48,6 @@ impl AsRef<[u8]> for IntKey { } } -fn as_u32(bytes: impl Borrow<[u8]>) -> Result { - let n = u32::from_le_bytes(bytes.borrow().try_into()?); - Ok(n) -} - // We want to invalidate the cache on panic for most users, but this is a band-aid to 
underlying // problems in turbo-tasks. // @@ -187,7 +183,7 @@ impl KeyValueDatabaseBackingStorageInner { fn get_infra_u32(&self, key: u32) -> Result> { self.database .get(KeySpace::Infra, IntKey::new(key).as_ref())? - .map(as_u32) + .map(|b| Ok(u32::from_le_bytes((*b).try_into()?))) .transpose() } } @@ -212,16 +208,13 @@ impl BackingStorageSealed } fn uncompleted_operations(&self) -> Result> { - fn get(database: &impl KeyValueDatabase) -> Result> { - let Some(operations) = - database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())? - else { - return Ok(Vec::new()); - }; - let operations = turbo_bincode_decode(operations.borrow())?; - Ok(operations) - } - get(&self.inner.database).context("Unable to read uncompleted operations from database") + self.inner + .database + .get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())? + .map(|b| turbo_bincode_decode(&b)) + .transpose() + .map(Option::unwrap_or_default) + .context("Unable to read uncompleted operations from database") } fn save_snapshot( @@ -298,14 +291,14 @@ impl BackingStorageSealed } fn lookup_task_candidates(&self, task_type: &CachedTaskType) -> Result> { - let inner = &*self.inner; - if inner.database.is_empty() { + if self.inner.database.is_empty() { // Checking if the database is empty is a performance optimization // to avoid computing the hash. 
return Ok(SmallVec::new()); } let hash = compute_task_type_hash(task_type); - let buffers = inner + let buffers = self + .inner .database .get_multiple(KeySpace::TaskCache, &hash.to_le_bytes()) .with_context(|| { @@ -314,7 +307,7 @@ impl BackingStorageSealed let mut task_ids = SmallVec::with_capacity(buffers.len()); for bytes in buffers { - let bytes = bytes.borrow().try_into()?; + let bytes = (*bytes).try_into()?; let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap(); task_ids.push(id); } @@ -325,54 +318,35 @@ impl BackingStorageSealed &self, task_id: TaskId, category: SpecificTaskDataCategory, - storage: &mut TaskStorage, - ) -> Result<()> { - let inner = &*self.inner; - let Some(bytes) = inner + ) -> Result> { + self.inner .database .get(category.key_space(), IntKey::new(*task_id).as_ref()) - .with_context(|| { - format!("Looking up task storage for {task_id} from database failed") - })? - else { - return Ok(()); - }; - let mut decoder = new_turbo_bincode_decoder(bytes.borrow()); - storage - .decode(category, &mut decoder) - .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}")) + .with_context(|| format!("Looking up task storage for {task_id} from database failed")) } fn batch_lookup_data( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result> { - let inner = &*self.inner; + callback: &mut dyn FnMut(usize, Option<&[u8]>), + ) -> Result<()> { let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect(); let keys = int_keys.iter().map(|k| k.as_ref()).collect::>(); - let bytes = inner + self.inner .database - .batch_get(category.key_space(), &keys) + .batch_get_with(category.key_space(), &keys, |index, opt_bytes| { + callback(index, opt_bytes); + // `opt_bytes` (and any underlying ArcBytes) is dropped here before the next + // iteration, so only one decompressed block is live at a time. 
+ Ok(()) + }) .with_context(|| { format!( "Looking up typed data for {} tasks from database failed", task_ids.len() ) - })?; - bytes - .into_iter() - .map(|opt_bytes| { - let mut storage = TaskStorage::new(); - if let Some(bytes) = opt_bytes { - let mut decoder = new_turbo_bincode_decoder(bytes.borrow()); - storage - .decode(category, &mut decoder) - .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?; - } - Ok(storage) }) - .collect::>>() } fn compact(&self) -> Result { diff --git a/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs b/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs index 247c0482c11775..cd651dbf19ea76 100644 --- a/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs @@ -662,18 +662,6 @@ fn gen_clone_inline_fields<'a>(fields: impl Iterator) -> V .collect() } -/// Generate inline field restore assignments: `self.field = source.field;` -fn gen_restore_inline_fields<'a>(fields: impl Iterator) -> Vec { - fields - .map(|field| { - let field_name = &field.field_name; - quote! { - self.#field_name = source.#field_name; - } - }) - .collect() -} - /// Generate lazy field match arms with a custom body that also receives the index. /// `LazyField::Variant(data) => { }` /// @@ -3080,14 +3068,6 @@ fn gen_clone_lazy_arms_for_category( }) } -/// Generate restore inline statements for a category. -fn gen_restore_inline_for_category( - grouped_fields: &GroupedFields, - category: Category, -) -> Vec { - gen_restore_inline_fields(grouped_fields.persistent_inline(category)) -} - /// Generate snapshot clone and restore methods for TaskStorage. 
/// /// Generates: @@ -3098,56 +3078,12 @@ fn gen_restore_inline_for_category( /// - `restore_data_from(&mut self, source)` - Restore data fields from source /// - `restore_all_from(&mut self, source)` - Restore all fields from source fn generate_snapshot_restore_methods(grouped_fields: &GroupedFields) -> TokenStream { - let has_meta_flags = grouped_fields.persisted_meta_flags().next().is_some(); - let has_data_flags = grouped_fields.persisted_data_flags().next().is_some(); - let has_any_flags = has_meta_flags || has_data_flags; - // Generate field operations by category let clone_meta_inline = gen_clone_inline_for_category(grouped_fields, Category::Meta); let clone_data_inline = gen_clone_inline_for_category(grouped_fields, Category::Data); let clone_meta_lazy_arms = gen_clone_lazy_arms_for_category(grouped_fields, Category::Meta); let clone_data_lazy_arms = gen_clone_lazy_arms_for_category(grouped_fields, Category::Data); - let restore_meta_inline = gen_restore_inline_for_category(grouped_fields, Category::Meta); - let restore_data_inline = gen_restore_inline_for_category(grouped_fields, Category::Data); - - let clone_all_flags = if has_any_flags { - quote! { - // Clone all persisted flags - snapshot.flags.set_persisted_bits(self.flags.persisted_bits()); - } - } else { - quote! {} - }; - - // Generate flags handling for restore - per category - let restore_meta_flags = if has_meta_flags { - quote! { - // Restore persisted meta flags (preserve other flags) - self.flags.set_persisted_meta_bits(source.flags.persisted_meta_bits()); - } - } else { - quote! {} - }; - - let restore_data_flags = if has_data_flags { - quote! { - // Restore persisted data flags (preserve other flags) - self.flags.set_persisted_data_bits(source.flags.persisted_data_bits()); - } - } else { - quote! {} - }; - - let restore_all_flags = if has_any_flags { - quote! 
{ - // Restore all persisted flags (preserve transient flags) - self.flags.set_persisted_bits(source.flags.persisted_bits()); - } - } else { - quote! {} - }; - quote! { #[automatically_derived] impl TaskStorage { @@ -3161,7 +3097,8 @@ fn generate_snapshot_restore_methods(grouped_fields: &GroupedFields) -> TokenStr // Clone inline data fields #(#clone_data_inline)* - #clone_all_flags + // Clone all persisted flags + snapshot.flags.set_persisted_bits(self.flags.persisted_bits()); // Pre-allocate lazy vec (upper bound - some may be transient and skipped) snapshot.lazy.reserve(self.lazy.len()); @@ -3179,103 +3116,6 @@ fn generate_snapshot_restore_methods(grouped_fields: &GroupedFields) -> TokenStr snapshot } - /// Restore persisted data from a decoded TaskStorage. - /// - /// This is used during restore operations to copy decoded persisted data - /// into the task's existing storage. It preserves transient state (flags, - /// transient fields) while restoring the persisted data. - /// - /// # Invariant - /// - /// This method assumes the target does NOT already have the persistent fields - /// being restored. This is guaranteed by the restore protocol which only calls - /// this once per category when the task is first accessed. Debug assertions - /// verify this invariant. - /// - /// The `category` parameter specifies which category of data to restore: - /// - `Meta`: Restore meta fields (aggregation_number, output, upper, dirty, etc.) - /// - `Data`: Restore data fields (output_dependent, dependencies, cell_data, etc.) 
- /// - `All`: Restore both meta and data fields - pub fn restore_from( - &mut self, - source: TaskStorage, - category: crate::backend::TaskDataCategory, - ) { - match category { - crate::backend::TaskDataCategory::Meta => self.restore_meta_from(source), - crate::backend::TaskDataCategory::Data => self.restore_data_from(source), - crate::backend::TaskDataCategory::All => self.restore_all_from(source), - } - } - - /// Restore meta category fields from source. - /// - /// Debug assertions verify that the target doesn't already have the lazy fields - /// being restored. - fn restore_meta_from(&mut self, source: TaskStorage) { - // Debug assertion: verify target doesn't already have persistent meta lazy fields - debug_assert!( - !self.lazy.iter().any(|f| f.is_persistent() && f.is_meta()), - "restore_meta_from called on storage that already has persistent meta lazy fields" - ); - - // Inline meta fields - direct assignment - #(#restore_meta_inline)* - - #restore_meta_flags - - // Extend lazy vec with persistent meta fields from source - self.lazy.extend( - source.lazy.into_iter().filter(|f| f.is_persistent() && f.is_meta()) - ); - } - - /// Restore data category fields from source. - /// - /// Debug assertions verify that the target doesn't already have the lazy fields - /// being restored. - fn restore_data_from(&mut self, source: TaskStorage) { - // Debug assertion: verify target doesn't already have persistent data lazy fields - debug_assert!( - !self.lazy.iter().any(|f| f.is_persistent() && f.is_data()), - "restore_data_from called on storage that already has persistent data lazy fields" - ); - - // Inline data fields - direct assignment - #(#restore_data_inline)* - - #restore_data_flags - - // Extend lazy vec with persistent data fields from source - self.lazy.extend( - source.lazy.into_iter().filter(|f| f.is_persistent() && f.is_data()) - ); - } - - /// Restore all fields from source (both meta and data). 
- /// - /// Debug assertions verify that the target doesn't already have the lazy fields - /// being restored. - fn restore_all_from(&mut self, source: TaskStorage) { - // Debug assertion: verify target doesn't already have any persistent lazy fields - debug_assert!( - !self.lazy.iter().any(|f| f.is_persistent()), - "restore_all_from called on storage that already has persistent lazy fields" - ); - - // Inline meta fields - direct assignment - #(#restore_meta_inline)* - - // Inline data fields - direct assignment - #(#restore_data_inline)* - - #restore_all_flags - - // Extend lazy vec with all persistent fields from source - self.lazy.extend( - source.lazy.into_iter().filter(|f| f.is_persistent()) - ); - } } } }