diff --git a/packages/next/src/build/swc/generated-native.d.ts b/packages/next/src/build/swc/generated-native.d.ts index 9d86d75cc5961..615d6fbe6ab0d 100644 --- a/packages/next/src/build/swc/generated-native.d.ts +++ b/packages/next/src/build/swc/generated-native.d.ts @@ -35,6 +35,43 @@ export declare class ExternalObject { [K: symbol]: T } } +/** + * Called by worker after WASM instantiation to register a dispatch callback. + * + * The `ops` object must have these methods: + * - transform(programPtr, programLen, unresolvedMark, commentsProxy) → number + * - getDiag() → number + * - readBuf(ptr, len) → Buffer + * - writeBuf(ptr, data: Buffer) → void + * - alloc(size) → number + * - free(ptr, size) → number + * + * A TSFN is created targeting this worker's event loop. When Rust needs to + * call a WASM export, it posts to this TSFN, which runs the appropriate + * method on `ops` and sends the result back via a sync channel. + */ +export declare function wasmWorkerRegisterCallback( + runtimeId: number, + instanceId: number, + ops: object +): void +/** + * Called by worker when WASM hits a host function import. + * Runs the Func closure synchronously on the worker thread and returns the result. + * + * The worker provides a `memoryAccessor` object with methods to read/write WASM memory: + * - readBuf(ptr, len) → Buffer + * - writeBuf(ptr, data: Buffer) → void + * - alloc(size) → number + * - free(ptr, size) → number + */ +export declare function wasmWorkerDispatchHostFn( + runtimeId: number, + instanceId: number, + fnIndex: number, + args: Array, + memoryAccessor: object +): unknown export declare function registerWorkerScheduler( creator: (arg: NapiWorkerCreation) => any, terminator: (arg: NapiWorkerTermination) => any diff --git a/turbopack/crates/turbo-persistence/benches/mod.rs b/turbopack/crates/turbo-persistence/benches/mod.rs index d207fa5dbb4f3..dfa44cc9a23b5 100644 --- a/turbopack/crates/turbo-persistence/benches/mod.rs +++ b/turbopack/crates/turbo-persistence/benches/mod.rs @@ -576,6 +576,7 @@ fn prefill_multi_value_database( ) -> Result>> { let db_config = TpDbConfig { family_configs: [FamilyConfig { + name: "test", kind: FamilyKind::MultiValue, }], }; @@ -648,6 +649,7 @@ fn setup_prefilled_multi_value_db( fn open_multi_value_db(path: &Path) -> TurboPersistence { let db_config = TpDbConfig { family_configs: [FamilyConfig { + name: "test", kind: FamilyKind::MultiValue, }], }; @@ -914,6 +916,7 @@ fn bench_write_multi_value(c: &mut Criterion) { |(tempdir, keys, random_data)| { let db_config = TpDbConfig { family_configs: [FamilyConfig { + name: "test", kind: FamilyKind::MultiValue, }], }; @@ -1203,7 +1206,7 @@ fn bench_static_sorted_file_lookup(c: &mut Criterion) { }, |(key, hash)| { let result = sst - .lookup::<_, false>(hash, &key, key_block_cache, value_block_cache) + .lookup::<_, _, _, false>(hash, &key, key_block_cache, value_block_cache) .unwrap(); black_box(result) }, @@ -1222,7 +1225,7 @@ fn bench_static_sorted_file_lookup(c: &mut Criterion) { |i| keys[i as usize % keys.len()], |(key, hash)| { let result = sst - .lookup::<_, false>(hash, &key, key_block_cache, value_block_cache) + .lookup::<_, _, _, false>(hash, &key, key_block_cache, value_block_cache) .unwrap(); black_box(result) }, @@ -1248,7 +1251,7 @@ fn bench_static_sorted_file_lookup(c: &mut Criterion) { }, |(key, hash)| { let result = sst - .lookup::<_, false>(hash, &key, key_block_cache, value_block_cache) + .lookup::<_, _, _, false>(hash, &key, key_block_cache, value_block_cache) .unwrap(); black_box(result) }, @@ -1281,7 +1284,7 @@ fn bench_static_sorted_file_lookup(c: &mut Criterion) { }, |(key, hash)| { let result = sst - .lookup::<_, false>(*hash, &key, key_block_cache, value_block_cache) + .lookup::<_, _, _, false>(*hash, &key, key_block_cache, value_block_cache) .unwrap(); black_box(result) }, diff --git a/turbopack/crates/turbo-persistence/src/bin/sst_inspect.rs b/turbopack/crates/turbo-persistence/src/bin/sst_inspect.rs index dc23188d0109e..0a2666ba7d51b 100644 --- a/turbopack/crates/turbo-persistence/src/bin/sst_inspect.rs +++ b/turbopack/crates/turbo-persistence/src/bin/sst_inspect.rs @@ -24,6 +24,7 @@ use turbo_persistence::{ BLOCK_HEADER_SIZE, checksum_block, meta_file::MetaFile, mmap_helper::advise_mmap_for_persistence, + sst_filter::SstFilter, static_sorted_file::{ BLOCK_TYPE_FIXED_KEY_NO_HASH, BLOCK_TYPE_FIXED_KEY_WITH_HASH, BLOCK_TYPE_KEY_NO_HASH, BLOCK_TYPE_KEY_WITH_HASH, KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, @@ -205,33 +206,68 @@ fn format_bytes(bytes: u64) -> String { } } -/// Collect SST info from all meta files in the database directory +/// Collect SST info from all active meta files in the database directory, +/// mirroring the DB's own open logic: read CURRENT, filter by .del files, +/// and apply SstFilter to skip superseded entries. fn collect_sst_info(db_path: &Path) -> Result>> { - let mut meta_files: Vec = fs::read_dir(db_path)? - .filter_map(|entry| entry.ok()) - .map(|entry| entry.path()) - .filter(|path| path.extension().is_some_and(|ext| ext == "meta")) - .collect(); + // Read the CURRENT sequence number — only files with seq <= current are valid. + let current: u32 = File::open(db_path.join("CURRENT")) + .context("Failed to open CURRENT file")? + .read_u32::() + .context("Failed to read CURRENT file")?; + + // Read .del files to find sequences that were deleted but not yet cleaned up. + let mut deleted_seqs: HashSet = HashSet::new(); + for entry in fs::read_dir(db_path)? { + let path = entry?.path(); + if path.extension().and_then(|s| s.to_str()) == Some("del") { + let content = fs::read(&path)?; + let mut cursor: &[u8] = &content; + while !cursor.is_empty() { + deleted_seqs.insert(cursor.read_u32::()?); + } + } + } - meta_files.sort(); + // Collect valid meta sequence numbers. + let mut meta_seqs: Vec = fs::read_dir(db_path)? + .filter_map(|e| e.ok()) + .filter_map(|e| { + let path = e.path(); + if path.extension().and_then(|s| s.to_str()) != Some("meta") { + return None; + } + let seq: u32 = path.file_stem()?.to_str()?.parse().ok()?; + if seq > current || deleted_seqs.contains(&seq) { + return None; + } + Some(seq) + }) + .collect(); - if meta_files.is_empty() { - bail!("No .meta files found in {}", db_path.display()); + if meta_seqs.is_empty() { + bail!("No active .meta files found in {}", db_path.display()); } - let mut family_sst_info: BTreeMap> = BTreeMap::new(); - - for meta_path in &meta_files { - // Extract sequence number from filename - let filename = meta_path.file_stem().and_then(|s| s.to_str()).unwrap_or(""); - let seq_num: u32 = filename.parse().unwrap_or(0); + meta_seqs.sort_unstable(); - let meta_file = MetaFile::open(db_path, seq_num) - .with_context(|| format!("Failed to open {}", meta_path.display()))?; + let mut meta_files: Vec = meta_seqs + .iter() + .map(|&seq| { + MetaFile::open(db_path, seq).with_context(|| format!("Failed to open {seq:08}.meta")) + }) + .collect::>()?; - let family = meta_file.family(); + // Apply SstFilter (newest first) to drop entries superseded by a newer meta file. + let mut sst_filter = SstFilter::new(); + for meta in meta_files.iter_mut().rev() { + sst_filter.apply_filter(meta); + } - for entry in meta_file.entries() { + let mut family_sst_info: BTreeMap> = BTreeMap::new(); + for meta in &meta_files { + let family = meta.family(); + for entry in meta.entries() { family_sst_info.entry(family).or_default().push(SstInfo { sequence_number: entry.sequence_number(), block_count: entry.block_count(), @@ -317,7 +353,7 @@ fn read_block( /// /// Index block format: `[1B type][2B first_block][N * (8B hash + 2B block_index)]`. fn parse_key_block_indices(index_block: &[u8]) -> HashSet { - assert!(index_block.len() >= 4, "Index block too small"); + assert!(index_block.len() >= 3, "Index block too small"); let mut data = &index_block[1..]; // skip block type byte let first_block = data.read_u16::().unwrap(); let mut indices = HashSet::new(); diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 3ed8c8d9d6a0e..e029911ce91ca 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -106,6 +106,49 @@ struct TrackedStats { miss_global: std::sync::atomic::AtomicU64, } +/// A compact bitset for tracking which batch-lookup cells have been resolved. +/// +/// Uses `u64` words (1 bit per cell) rather than `Vec` (1 byte per cell) for an 8x +/// reduction in memory and better cache behaviour on large batches. +pub struct FoundBitset { + words: Box<[u64]>, + len: usize, +} + +impl FoundBitset { + pub(crate) fn new(len: usize) -> Self { + let words = vec![0u64; len.div_ceil(64)].into_boxed_slice(); + Self { words, len } + } + + #[inline] + pub(crate) fn get(&self, index: usize) -> bool { + debug_assert!(index < self.len); + (self.words[index / 64] >> (index % 64)) & 1 == 1 + } + + #[inline] + pub(crate) fn set(&mut self, index: usize) { + debug_assert!(index < self.len); + self.words[index / 64] |= 1u64 << (index % 64); + } + + /// Returns the number of bits that are NOT set (i.e. not yet found). + pub(crate) fn count_unset(&self) -> usize { + let set: usize = self.words.iter().map(|w| w.count_ones() as usize).sum(); + self.len - set + } + + pub(crate) fn len(&self) -> usize { + self.len + } + + /// Iterates over all indices, yielding `(index, is_set)`. + pub(crate) fn iter(&self) -> impl Iterator + '_ { + (0..self.len).map(|i| self.get(i)) + } +} + /// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time /// using a single write batch. It allows for concurrent reads. pub struct TurboPersistence { @@ -1561,41 +1604,81 @@ impl TurboPersistence Ok(output) } + /// Looks up multiple keys in batch and collects results into a `Vec`. + /// + /// For large batches where memory pressure matters, prefer + /// [`batch_get_with`][Self::batch_get_with] which calls a callback per entry without + /// accumulating all decoded bytes simultaneously. pub fn batch_get( &self, family: usize, keys: &[K], ) -> Result>> { + let mut results = vec![None; keys.len()]; + self.batch_get_with(family, keys, |index, opt_bytes| { + results[index] = opt_bytes.map(|b| ArcBytes::from(b.to_vec().into_boxed_slice())); + Ok(()) + })?; + Ok(results) + } + + /// Looks up multiple keys in batch, calling `callback(index, Option<&[u8]>)` for each entry + /// immediately after it is resolved rather than accumulating all results into a `Vec`. + /// + /// This keeps at most one decompressed value block live at a time, significantly reducing + /// peak memory when the batch is large. The callback receives the 0-based key index and the + /// value bytes (`None` for not-found or deleted). Callback errors propagate immediately. + pub fn batch_get_with( + &self, + family: usize, + keys: &[K], + mut callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()> { debug_assert!(family < FAMILIES, "Family index out of bounds"); if self.config.family_configs[family].kind != FamilyKind::SingleValue { - // This is an error in our caller so just panic - panic!("only single valued tables can be queried with `batch_get'") + panic!("only single valued tables can be queried with `batch_get_with'") } let span = tracing::trace_span!( "database batch read", - name = family, + name = self.config.family_configs[family].name, keys = keys.len(), not_found = tracing::field::Empty, deleted = tracing::field::Empty, result_size = tracing::field::Empty ) .entered(); - let mut cells: Vec<(u64, usize, Option)> = Vec::with_capacity(keys.len()); - let mut empty_cells = keys.len(); + let mut cells: Vec<(u64, usize)> = Vec::with_capacity(keys.len()); for (index, key) in keys.iter().enumerate() { let hash = hash_key(key); - cells.push((hash, index, None)); + cells.push((hash, index)); } - cells.sort_by_key(|(hash, _, _)| *hash); + cells.sort_by_key(|(hash, _)| *hash); + let mut found = FoundBitset::new(cells.len()); let inner = self.inner.read(); + let mut not_found = 0; + let mut deleted = 0; + let mut result_size = 0; + let mut read_blob = |seq| self.read_blob(seq); + // Wrap the callback to track stats (deleted/result_size) for found keys. + // not_found is tracked separately in the post-loop over cells. + let mut stats_callback = |index: usize, opt_bytes: Option<&[u8]>| -> Result<()> { + if let Some(bytes) = opt_bytes { + result_size += bytes.len(); + } else { + deleted += 1; + } + callback(index, opt_bytes) + }; for meta in inner.meta_files.iter().rev() { - let _result = meta.batch_lookup( + let (all_found, _result) = meta.batch_lookup_with( family as u32, keys, - &mut cells, - &mut empty_cells, + &cells, + &mut found, &self.key_block_cache, &self.value_block_cache, + &mut read_blob, + &mut stats_callback, )?; #[cfg(feature = "stats")] @@ -1627,49 +1710,25 @@ impl TurboPersistence } } - if empty_cells == 0 { + if all_found { break; } } - let mut deleted = 0; - let mut not_found = 0; - let mut result_size = 0; - let mut results = vec![None; keys.len()]; - for (hash, index, result) in cells { - if let Some(result) = result { - inner.accessed_key_hashes[family].insert(hash); - let result = match result { - LookupValue::Deleted => { - #[cfg(feature = "stats")] - self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed); - deleted += 1; - None - } - LookupValue::Slice { value } => { - #[cfg(feature = "stats")] - self.stats.hits_small.fetch_add(1, Ordering::Relaxed); - result_size += value.len(); - Some(value) - } - LookupValue::Blob { sequence_number } => { - #[cfg(feature = "stats")] - self.stats.hits_blob.fetch_add(1, Ordering::Relaxed); - let blob = self.read_blob(sequence_number)?; - result_size += blob.len(); - Some(blob) - } - }; - results[index] = result; + // Record accessed hashes for found keys, and fire callback for not-found keys. + for ((hash, index), is_found) in cells.iter().zip(found.iter()) { + if is_found { + inner.accessed_key_hashes[family].insert(*hash); } else { #[cfg(feature = "stats")] self.stats.miss_global.fetch_add(1, Ordering::Relaxed); not_found += 1; + callback(*index, None)?; } } span.record("not_found", not_found); span.record("deleted", deleted); span.record("result_size", result_size); - Ok(results) + Ok(()) } /// Returns database statistics. diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs index 3a877ae5ee7f9..9212c573bee65 100644 --- a/turbopack/crates/turbo-persistence/src/lib.rs +++ b/turbopack/crates/turbo-persistence/src/lib.rs @@ -18,7 +18,7 @@ pub mod mmap_helper; mod parallel_scheduler; mod rc_bytes; mod shared_bytes; -mod sst_filter; +pub mod sst_filter; pub mod static_sorted_file; mod static_sorted_file_builder; mod value_block_count_tracker; @@ -49,6 +49,7 @@ pub enum FamilyKind { /// Configuration for a single family to describe how the data is stored. #[derive(Clone, Copy, Debug)] pub struct FamilyConfig { + pub name: &'static str, pub kind: FamilyKind, } @@ -65,6 +66,7 @@ impl Default for DbConfig { fn default() -> Self { Self { family_configs: [FamilyConfig { + name: "unknown", kind: FamilyKind::SingleValue, }; FAMILIES], } diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index 86ee8fdf5ee6d..8620e6a0a0b78 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -14,10 +14,13 @@ use smallvec::SmallVec; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, big_endian as be}; use crate::{ - QueryKey, + ArcBytes, QueryKey, + db::FoundBitset, lookup_entry::LookupValue, mmap_helper::advise_mmap_for_persistence, - static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData}, + static_sorted_file::{ + BlockCache, LayeredBlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData, + }, }; bitfield! { @@ -459,7 +462,7 @@ impl MetaFile { continue; } - let result = entry.sst(self)?.lookup::( + let result = entry.sst(self)?.lookup::( key_hash, key, key_block_cache, @@ -498,33 +501,60 @@ impl MetaFile { Ok(miss_result) } - pub fn batch_lookup( + /// Looks up multiple keys in batch, calling `callback(key_index, Option<&[u8]>)` for each + /// found key immediately after resolution. This avoids keeping every decompressed value block + /// alive across all SST file iterations. + /// + /// `cells` is `(hash, key_index)` sorted by hash. `found` is a parallel bitset — bit `i` + /// is set once `cells[i]` has been resolved (either by this call or a prior one on the same + /// batch). On return, bits are set for every cell resolved by this call. + /// + /// Returns `true` if all cells are now resolved (caller can stop iterating meta files). + /// `read_blob` is called to obtain the bytes for large blob values. + pub fn batch_lookup_with( &self, key_family: u32, keys: &[K], - cells: &mut [(u64, usize, Option)], - empty_cells: &mut usize, + cells: &[(u64, usize)], + found: &mut FoundBitset, key_block_cache: &BlockCache, value_block_cache: &BlockCache, - ) -> Result { + read_blob: &mut impl FnMut(u32) -> Result, + callback: &mut impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<(bool, MetaBatchLookupResult)> { + debug_assert_eq!( + cells.len(), + found.len(), + "cells and found must have the same length" + ); if key_family != self.family { #[cfg(feature = "stats")] - return Ok(MetaBatchLookupResult { - family_miss: true, - ..Default::default() - }); + return Ok(( + false, + MetaBatchLookupResult { + family_miss: true, + ..Default::default() + }, + )); #[cfg(not(feature = "stats"))] - return Ok(MetaBatchLookupResult {}); + return Ok((false, MetaBatchLookupResult {})); } debug_assert!( - cells.is_sorted_by_key(|(hash, _, _)| *hash), + cells.is_sorted_by_key(|(hash, _)| *hash), "Cells must be sorted by key hash" ); #[allow(unused_mut, reason = "It's used when stats are enabled")] let mut lookup_result = MetaBatchLookupResult::default(); + let mut empty_cells = found.count_unset(); + // Local single-entry caches layered on top of the global block caches. + // Sequential keys in a batch often hit the same key or value block; keeping + // the most recently used block locally avoids re-reading it if the global + // cache evicts it under concurrent pressure. + let mut local_key_cache = LayeredBlockCache::new(key_block_cache); + let mut local_value_cache = LayeredBlockCache::new(value_block_cache); for entry in self.entries.iter().rev() { let start_index = cells - .binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater)) + .binary_search_by(|(hash, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater)) .err() .unwrap(); if start_index >= cells.len() { @@ -535,7 +565,7 @@ impl MetaFile { continue; } let end_index = cells - .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less)) + .binary_search_by(|(hash, _)| hash.cmp(&entry.max_hash).then(Ordering::Less)) .err() .unwrap() .checked_sub(1); @@ -553,12 +583,13 @@ impl MetaFile { } continue; } - for (hash, index, result) in &mut cells[start_index..=end_index] { + for (cell_pos, (hash, index)) in cells[start_index..=end_index].iter().enumerate() { + let cell_pos = start_index + cell_pos; debug_assert!( *hash >= entry.min_hash && *hash <= entry.max_hash, "Key hash out of range" ); - if result.is_some() { + if found.get(cell_pos) { continue; } if !entry.amqf.contains_fingerprint(*hash) { @@ -568,11 +599,11 @@ impl MetaFile { } continue; } - let sst_result = entry.sst(self)?.lookup::<_, false>( + let sst_result = entry.sst(self)?.lookup::<_, _, _, false>( *hash, &keys[*index], - key_block_cache, - value_block_cache, + &mut local_key_cache, + &mut local_value_cache, )?; if let SstLookupResult::Found(mut values) = sst_result { // find_all=false guarantees exactly one result @@ -580,14 +611,39 @@ impl MetaFile { let Some(value) = values.pop() else { unreachable!() }; - *result = Some(value); - *empty_cells -= 1; + // Resolve and callback immediately — the ArcBytes is dropped before the + // next iteration so we never hold more than one decompressed block at a time. + let opt_bytes: Option<&[u8]> = match &value { + LookupValue::Deleted => None, + LookupValue::Slice { value } => Some(value), + LookupValue::Blob { sequence_number } => { + // Blob resolution: read_blob is called inline. The blob ArcBytes + // must outlive the callback call, so we bind it here. + let blob = read_blob(*sequence_number)?; + found.set(cell_pos); + empty_cells -= 1; + #[cfg(feature = "stats")] + { + lookup_result.hits += 1; + } + callback(*index, Some(&*blob))?; + // blob dropped here + if empty_cells == 0 { + return Ok((true, lookup_result)); + } + continue; + } + }; + found.set(cell_pos); + empty_cells -= 1; #[cfg(feature = "stats")] { lookup_result.hits += 1; } - if *empty_cells == 0 { - return Ok(lookup_result); + callback(*index, opt_bytes)?; + // value (ArcBytes) dropped here + if empty_cells == 0 { + return Ok((true, lookup_result)); } } else { #[cfg(feature = "stats")] @@ -597,6 +653,6 @@ impl MetaFile { } } } - Ok(lookup_result) + Ok((false, lookup_result)) } } diff --git a/turbopack/crates/turbo-persistence/src/sst_filter.rs b/turbopack/crates/turbo-persistence/src/sst_filter.rs index ca562880666d4..dbba126af7a59 100644 --- a/turbopack/crates/turbo-persistence/src/sst_filter.rs +++ b/turbopack/crates/turbo-persistence/src/sst_filter.rs @@ -67,3 +67,9 @@ impl SstFilter { !used && !meta.has_active_entries() } } + +impl Default for SstFilter { + fn default() -> Self { + Self::new() + } +} diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs index c14063deea35c..8593b2afd2e1b 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs @@ -96,19 +96,30 @@ pub type BlockCache = /// blocks). Generic over the byte type so it works for both the lookup path /// (`ArcBytes` with `BlockCache`) and the iteration path (`RcBytes` with a /// single-entry `Option` cache). -trait ValueBlockCache { +pub trait ValueBlockCache { fn get_or_read( - self, + &mut self, mmap: &B::MmapHandle, meta: &StaticSortedFileMetaData, block_index: u16, ) -> Result; } +impl> ValueBlockCache for &mut V { + fn get_or_read( + &mut self, + mmap: &B::MmapHandle, + meta: &StaticSortedFileMetaData, + block_index: u16, + ) -> Result { + (**self).get_or_read(mmap, meta, block_index) + } +} + /// Lookup-path: concurrent `BlockCache`. impl ValueBlockCache for &BlockCache { fn get_or_read( - self, + &mut self, mmap: &Arc, meta: &StaticSortedFileMetaData, block_index: u16, @@ -130,7 +141,7 @@ impl ValueBlockCache for &BlockCache { /// Iteration-path: lightweight single-entry cache for sequential reads. impl ValueBlockCache for &mut Option<(u16, RcBytes)> { fn get_or_read( - self, + &mut self, mmap: &Rc, meta: &StaticSortedFileMetaData, block_index: u16, @@ -141,7 +152,52 @@ impl ValueBlockCache for &mut Option<(u16, RcBytes)> { return Ok(block.clone()); } let block: RcBytes = read_block_generic(mmap, meta, block_index)?; - *self = Some((block_index, block.clone())); + **self = Some((block_index, block.clone())); + Ok(block) + } +} + +/// Batch-lookup-path: layered cache that checks a local single-entry cache first, then +/// falls back to the global `BlockCache`. Used in `batch_lookup_with` to prevent +/// concurrent eviction of just-read blocks between consecutive key lookups in a batch. +/// Sequential keys in the same batch often hit the same key or value block, so keeping +/// the most recently read block locally avoids redundant decompression. +pub struct LayeredBlockCache<'a> { + /// `(sequence_number, block_index)` — must include sequence number since the same block + /// index can refer to different data in different SST files. + local: Option<((u32, u16), ArcBytes)>, + global: &'a BlockCache, +} + +impl<'a> LayeredBlockCache<'a> { + pub fn new(global: &'a BlockCache) -> Self { + Self { + local: None, + global, + } + } +} + +impl ValueBlockCache for &mut LayeredBlockCache<'_> { + fn get_or_read( + &mut self, + mmap: &Arc, + meta: &StaticSortedFileMetaData, + block_index: u16, + ) -> Result { + let key = (meta.sequence_number, block_index); + if let Some((cached_key, block)) = self.local.as_ref() + && *cached_key == key + { + return Ok(block.clone()); + } + let block = <&BlockCache as ValueBlockCache>::get_or_read( + &mut self.global, + mmap, + meta, + block_index, + )?; + self.local = Some((key, block.clone())); Ok(block) } } @@ -204,41 +260,46 @@ impl StaticSortedFile { /// If `FIND_ALL` is false, returns after finding the first match. /// If `FIND_ALL` is true, returns all entries with the same key (useful for /// keyspaces where keys are hashes and collisions are possible). - pub fn lookup( + pub fn lookup< + K: QueryKey, + KC: ValueBlockCache, + VC: ValueBlockCache, + const FIND_ALL: bool, + >( &self, key_hash: u64, key: &K, - key_block_cache: &BlockCache, - value_block_cache: &BlockCache, + mut key_block_cache: KC, + mut value_block_cache: VC, ) -> Result { // There is exactly one index block per file (always the last block). // Read it first, then dispatch directly to the key block it points to. let index_block_index = self.meta.block_count - 1; - let index_block = self.get_key_block(index_block_index, key_block_cache)?; + let index_block = self.get_key_block(index_block_index, &mut key_block_cache)?; let key_block_index = self.lookup_index_block(&index_block, key_hash)?; - let key_block_arc = self.get_key_block(key_block_index, key_block_cache)?; + let key_block_arc = self.get_key_block(key_block_index, &mut key_block_cache)?; let block_type = be::read_u8(&key_block_arc); match block_type { BLOCK_TYPE_KEY_WITH_HASH | BLOCK_TYPE_KEY_NO_HASH => { let has_hash = block_type == BLOCK_TYPE_KEY_WITH_HASH; - self.lookup_key_block::( + self.lookup_key_block::( key_block_arc, key_hash, key, has_hash, - value_block_cache, + &mut value_block_cache, ) } BLOCK_TYPE_FIXED_KEY_WITH_HASH | BLOCK_TYPE_FIXED_KEY_NO_HASH => { let has_hash = block_type == BLOCK_TYPE_FIXED_KEY_WITH_HASH; - self.lookup_fixed_key_block::( + self.lookup_fixed_key_block::( key_block_arc, key_hash, key, has_hash, - value_block_cache, + &mut value_block_cache, ) } _ => { @@ -273,13 +334,13 @@ impl StaticSortedFile { /// /// If `FIND_ALL` is false, returns after finding the first match. /// If `FIND_ALL` is true, collects all entries with the same key. - fn lookup_key_block( + fn lookup_key_block, const FIND_ALL: bool>( &self, block: ArcBytes, key_hash: u64, key: &K, has_hash: bool, - value_block_cache: &BlockCache, + value_block_cache: &mut VC, ) -> Result { let hash_len: u8 = if has_hash { 8 } else { 0 }; ensure!(block.len() >= 4, "key block too short"); @@ -292,7 +353,7 @@ impl StaticSortedFile { let offsets = &data[..entry_count * 4]; let entries = &data[entry_count * 4..]; - self.lookup_block_inner::( + self.lookup_block_inner::( &block, entry_count, key_hash, @@ -306,13 +367,13 @@ impl StaticSortedFile { /// /// Fixed-size key blocks store entries at predictable offsets (no offset table), /// enabling direct indexing during binary search. - fn lookup_fixed_key_block( + fn lookup_fixed_key_block, const FIND_ALL: bool>( &self, block: ArcBytes, key_hash: u64, key: &K, has_hash: bool, - value_block_cache: &BlockCache, + value_block_cache: &mut VC, ) -> Result { let hash_len: u8 = if has_hash { 8 } else { 0 }; ensure!(block.len() >= 6, "fixed key block too short"); @@ -327,7 +388,7 @@ impl StaticSortedFile { "fixed key block for {entry_count} entries must is the wrong size" ); - self.lookup_block_inner::( + self.lookup_block_inner::( &block, entry_count, key_hash, @@ -345,13 +406,13 @@ impl StaticSortedFile { /// /// The `get_entry` closure abstracts over the difference between variable-size /// key blocks (offset table lookup) and fixed-size key blocks (stride-based indexing). - fn lookup_block_inner<'a, K: QueryKey, const FIND_ALL: bool>( + fn lookup_block_inner<'a, K: QueryKey, VC: ValueBlockCache, const FIND_ALL: bool>( &self, block: &ArcBytes, entry_count: usize, key_hash: u64, key: &K, - value_block_cache: &BlockCache, + value_block_cache: &mut VC, get_entry: impl Fn(usize) -> Result>, ) -> Result { let mut l = 0; @@ -426,12 +487,12 @@ impl StaticSortedFile { } /// Handles a key match by looking up the value. - fn handle_key_match( + fn handle_key_match>( &self, ty: u8, val: &[u8], key_block_arc: &ArcBytes, - value_block_cache: &BlockCache, + value_block_cache: &mut VC, ) -> Result { handle_key_match_generic( &self.mmap, @@ -444,30 +505,12 @@ impl StaticSortedFile { } /// Gets a key block from the cache or reads it from the file. - fn get_key_block( + fn get_key_block>( &self, block: u16, - key_block_cache: &BlockCache, + key_block_cache: &mut KC, ) -> Result { - Ok( - match key_block_cache.get_value_or_guard(&(self.meta.sequence_number, block), None) { - GuardResult::Value(block) => block, - GuardResult::Guard(guard) => { - let block = self.read_block(block)?; - let _ = guard.insert(block.clone()); - block - } - GuardResult::Timeout => unreachable!(), - }, - ) - } - - /// Reads a block from the file, decompressing if needed, and verifies its checksum. - /// - /// The checksum is verified on the raw on-disk data **before** decompression, so - /// corruption is caught before passing data to LZ4. - fn read_block(&self, block_index: u16) -> Result { - read_block_generic(&self.mmap, &self.meta, block_index) + key_block_cache.get_or_read(&self.mmap, &self.meta, block) } } @@ -605,7 +648,7 @@ fn handle_key_match_generic( ty: u8, val: &[u8], key_block: &B, - reader: impl ValueBlockCache, + mut reader: impl ValueBlockCache, ) -> Result> { Ok(match ty { KEY_BLOCK_ENTRY_TYPE_SMALL => { diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs index b0e8b986303f4..9a743964c3b04 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs @@ -1381,7 +1381,7 @@ mod tests { kc: &TestBlockCache, vc: &TestBlockCache, ) -> Result<()> { - let result = sst.lookup::<_, false>(entry.hash, &entry.key, kc, vc)?; + let result = sst.lookup::<_, _, _, false>(entry.hash, &entry.key, kc, vc)?; match (&entry.value_kind, result) { (_, SstLookupResult::Found(values)) if values.len() == 1 && matches!(values[0], LookupValue::Slice { .. }) => @@ -1690,8 +1690,8 @@ mod tests { let vc = make_cache(); for entry in &entries { - let r1 = sst1.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?; - let r2 = sst2.lookup::<_, false>(entry.hash, &entry.key, &kc, &vc)?; + let r1 = sst1.lookup::<_, _, _, false>(entry.hash, &entry.key, &kc, &vc)?; + let r2 = sst2.lookup::<_, _, _, false>(entry.hash, &entry.key, &kc, &vc)?; match (&r1, &r2) { (SstLookupResult::Found(v1), SstLookupResult::Found(v2)) if v1.len() == 1 && v2.len() == 1 => @@ -1816,7 +1816,7 @@ mod tests { let sst = open_sst(dir, seq, meta).unwrap(); let kc = make_cache(); let vc = make_cache(); - match sst.lookup::<_, false>(entries[0].hash, &entries[0].key, &kc, &vc) { + match sst.lookup::<_, _, _, false>(entries[0].hash, &entries[0].key, &kc, &vc) { Err(err) => { let msg = format!("{err}"); assert!( diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs index 3ed65ec995204..a1f63cafcc3c0 100644 --- a/turbopack/crates/turbo-persistence/src/tests.rs +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -1521,6 +1521,7 @@ fn compaction_multi_value_preserves_different_values() -> Result<()> { fn multi_value_config() -> DbConfig<1> { let mut config = DbConfig::<1>::default(); config.family_configs[0] = FamilyConfig { + name: "test", kind: FamilyKind::MultiValue, }; config @@ -2097,6 +2098,7 @@ fn compaction_deletes_blob_multi_value_tombstone() -> Result<()> { let config = DbConfig { family_configs: [FamilyConfig { + name: "test", kind: FamilyKind::MultiValue, }], }; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 34a8024821f4c..37d2df24a0ead 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -1912,7 +1912,7 @@ impl TurboTasksBackendInner { let once_task = matches!(task_type, TaskType::Transient(ref tt) if matches!(&**tt, TransientTask::Once(_))); if let Some(tasks) = task.prefetch() { drop(task); - ctx.prepare_tasks(tasks); + ctx.prepare_tasks(tasks, "prefetch"); task = ctx.task(task_id, TaskDataCategory::All); } let in_progress = task.take_in_progress()?; @@ -2399,6 +2399,7 @@ impl TurboTasksBackendInner { output_dependent_tasks .iter() .map(|&id| (id, TaskDataCategory::All)), + "invalidate output dependents", ); } @@ -2491,20 +2492,24 @@ impl TurboTasksBackendInner { debug_assert!(!new_children.is_empty()); let mut queue = AggregationUpdateQueue::new(); - ctx.for_each_task_all(new_children.iter().copied(), |child_task, ctx| { - if !child_task.has_output() { - let child_id = child_task.id(); - make_task_dirty_internal( - child_task, - child_id, - false, - #[cfg(feature = "trace_task_dirty")] - TaskDirtyCause::InitialDirty, - &mut queue, - ctx, - ); - } - }); + ctx.for_each_task_all( + new_children.iter().copied(), + "unfinished children dirty", + |child_task, ctx| { + if !child_task.has_output() { + let child_id = child_task.id(); + make_task_dirty_internal( + child_task, + child_id, + false, + #[cfg(feature = "trace_task_dirty")] + TaskDirtyCause::InitialDirty, + &mut queue, + ctx, + ); + } + }, + ); queue.execute(ctx); } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index a545a68ac794e..97ecd1c2d38c6 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -1256,10 +1256,14 @@ impl AggregationUpdateQueue { self.find_and_schedule_dirty(jobs, ctx); false } else if !self.scheduled_tasks.is_empty() { - ctx.for_each_task_all(self.scheduled_tasks.keys().copied(), |task, ctx| { - let parent_priority = self.scheduled_tasks[&task.id()]; - ctx.schedule_task(task, parent_priority); - }); + ctx.for_each_task_all( + self.scheduled_tasks.keys().copied(), + "schedule tasks", + |task, ctx| { + let parent_priority = self.scheduled_tasks[&task.id()]; + ctx.schedule_task(task, parent_priority); + }, + ); self.scheduled_tasks.clear(); false } else { @@ -1444,18 +1448,22 @@ impl AggregationUpdateQueue { .map(|job| (job.task_id, job.span.clone())) .collect(); // For performance reasons this should stay `Meta` and not `All` - ctx.for_each_task_meta(jobs.into_iter().map(|job| job.task_id), |task, ctx| { - let task_id = task.id(); - // Enter the enqueue-time span and create a per-task child span with the - // task description. Both guards must live until the end of the closure. - #[cfg(feature = "trace_find_and_schedule")] - let _trace = ( - spans.remove(&task_id).flatten().map(|s| s.entered()), - trace_span!("find and schedule", %task_id, name = task.get_task_description()) - .entered(), - ); - self.find_and_schedule_dirty_internal(task_id, task, ctx); - }); + ctx.for_each_task_meta( + jobs.into_iter().map(|job| job.task_id), + "find and schedule dirty", + |task, ctx| { + let task_id = task.id(); + // Enter the enqueue-time span and create a per-task child span with the + // task description. Both guards must live until the end of the closure. + #[cfg(feature = "trace_find_and_schedule")] + let _trace = ( + spans.remove(&task_id).flatten().map(|s| s.entered()), + trace_span!("find and schedule", %task_id, name = task.get_task_description()) + .entered(), + ); + self.find_and_schedule_dirty_internal(task_id, task, ctx); + }, + ); } fn find_and_schedule_dirty_internal( @@ -1507,21 +1515,25 @@ impl AggregationUpdateQueue { update: AggregatedDataUpdate, ) { // For performance reasons this should stay `Meta` and not `All` - ctx.for_each_task_meta(upper_ids.iter().copied(), |mut upper, ctx| { - let diff = update.apply(&mut upper, ctx.should_track_activeness(), self); - if !diff.is_empty() { - let upper_ids = get_uppers(&upper); - if !upper_ids.is_empty() { - self.push( - AggregatedDataUpdateJob { - upper_ids, - update: diff, - } - .into(), - ); + ctx.for_each_task_meta( + upper_ids.iter().copied(), + "aggregated data update", + |mut upper, ctx| { + let diff = update.apply(&mut upper, ctx.should_track_activeness(), self); + if !diff.is_empty() { + let upper_ids = get_uppers(&upper); + if !upper_ids.is_empty() { + self.push( + AggregatedDataUpdateJob { + upper_ids, + update: diff, + } + .into(), + ); + } } - } - }); + }, + ); } fn inner_of_uppers_lost_follower( @@ -1574,20 +1586,24 @@ impl AggregationUpdateQueue { if !data.is_empty() { // remove data from upper // For performance reasons this should stay `Meta` and not `All` - ctx.for_each_task_meta(removed_uppers.iter().copied(), |mut upper, ctx| { - // STEP 6 - let diff = data.apply(&mut upper, ctx.should_track_activeness(), self); - if !diff.is_empty() { - let upper_ids = get_uppers(&upper); - self.push( - AggregatedDataUpdateJob { - upper_ids, - update: diff, - } - .into(), - ) - } - }); + ctx.for_each_task_meta( + removed_uppers.iter().copied(), + "remove data from uppers", + |mut upper, ctx| { + // STEP 6 + let diff = data.apply(&mut upper, ctx.should_track_activeness(), self); + if !diff.is_empty() { + let upper_ids = get_uppers(&upper); + self.push( + AggregatedDataUpdateJob { + upper_ids, + update: diff, + } + .into(), + ) + } + }, + ); } // STEP 7 if !followers.is_empty() { @@ -2061,6 +2077,7 @@ impl AggregationUpdateQueue { upper_ids_with_min_aggregation_number .iter() .map(|(entry, _)| entry.task_id()), + "add data to uppers", |mut upper, ctx| { // STEP 6d if has_data { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index e06baba14cb68..331bf773957e2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -13,6 +13,8 @@ use std::{ }; use bincode::{Decode, Encode}; +use tracing::trace_span; +use turbo_bincode::new_turbo_bincode_decoder; use turbo_tasks::{ CellId, FxIndexMap, TaskExecutionReason, TaskId, TaskPriority, TurboTasksBackendApi, TurboTasksCallApi, TypedSharedReference, backend::CachedTaskType, @@ -30,6 +32,11 @@ use crate::{ data::{ActivenessState, CollectibleRef, Dirtyness, InProgressState, TransientTask}, }; +struct TaskRestoreEntry { + task_id: TaskId, + category: TaskDataCategory, +} + pub trait Operation: Encode + Decode<()> + Default + TryFrom { fn execute(self, ctx: &mut impl ExecuteContext<'_>); } @@ -45,29 +52,35 @@ pub trait ExecuteContext<'e>: Sized { fn prepare_tasks( &mut self, task_ids: impl IntoIterator + Clone, + reason: &'static str, ); fn for_each_task( &mut self, task_ids: impl IntoIterator, + reason: &'static str, func: impl FnMut(Self::TaskGuardImpl, &mut Self), ); fn for_each_task_meta( &mut self, task_ids: impl IntoIterator, + reason: &'static str, func: impl FnMut(Self::TaskGuardImpl, &mut Self), ) { self.for_each_task( task_ids.into_iter().map(|id| (id, TaskDataCategory::Meta)), + reason, func, ) } fn for_each_task_all( &mut self, task_ids: impl IntoIterator, + reason: &'static str, func: impl FnMut(Self::TaskGuardImpl, &mut Self), ) { self.for_each_task( task_ids.into_iter().map(|id| (id, TaskDataCategory::All)), + reason, func, ) } @@ -169,58 +182,20 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { self.backend.should_restore() && self.backend.local_is_partial } - fn restore_task_data( + fn lookup_task_data( &self, task_id: TaskId, category: SpecificTaskDataCategory, - ) -> TaskStorage { + ) -> Option { if !self.should_check_backing_storage() { - // If we don't need to restore, we can just return an empty storage - return TaskStorage::default(); - } - let mut storage = TaskStorage::default(); - let result = self - .backend - .backing_storage - .lookup_data(task_id, category, &mut storage); - - match result { - Ok(()) => storage, - Err(e) => { - panic!( - "Failed to restore task data (corrupted database or bug): {:?}", - e.context(format!("{category:?} for {task_id})")) - ) - } - } - } - - fn restore_task_data_batch( - &self, - task_ids: &[TaskId], - category: SpecificTaskDataCategory, - ) -> Option> { - debug_assert!( - task_ids.len() > 1, - "Use restore_task_data_typed for single task" - ); - if !self.should_check_backing_storage() { - // If we don't need to restore, we return None return None; } - let result = self - .backend - .backing_storage - .batch_lookup_data(task_ids, category); - match result { - Ok(result) => Some(result), + match self.backend.backing_storage.lookup_data(task_id, category) { + Ok(bytes) => bytes, Err(e) => { panic!( "Failed to restore task data (corrupted database or bug): {:?}", - e.context(format!( - "{category:?} for batch of {} tasks", - task_ids.len() - )) + e.context(format!("{category:?} for {task_id})")) ) } } @@ -230,6 +205,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { &mut self, task_ids: impl IntoIterator, call_prepared_task_callback_for_transient_tasks: bool, + reason: &'static str, mut prepared_task_callback: impl FnMut( &mut Self, TaskId, @@ -237,10 +213,19 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { StorageWriteGuard<'e>, ), ) { + let span = trace_span!( + "prepare_tasks_with_callback", + reason, + requested_data = tracing::field::Empty, + requested_meta = tracing::field::Empty, + races_data = tracing::field::Empty, + races_meta = tracing::field::Empty, + ); + let _guard = span.enter(); let mut data_count = 0; let mut meta_count = 0; let mut all_count = 0; - let mut tasks = task_ids + let tasks = task_ids .into_iter() .filter(|&(id, category)| { if id.is_transient() { @@ -263,16 +248,22 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { TaskDataCategory::Meta => meta_count += 1, TaskDataCategory::All => all_count += 1, }) - .map(|(id, category)| (id, category, None, None)) + .map(|(id, category)| TaskRestoreEntry { + task_id: id, + category, + }) .collect::>(); data_count += all_count; meta_count += all_count; - - let mut tasks_to_restore_for_data = Vec::with_capacity(data_count); - let mut tasks_to_restore_for_data_indicies = Vec::with_capacity(data_count); - let mut tasks_to_restore_for_meta = Vec::with_capacity(meta_count); - let mut tasks_to_restore_for_meta_indicies = Vec::with_capacity(meta_count); - for (i, &(task_id, category, _, _)) in tasks.iter().enumerate() { + span.record("requested_data", data_count); + span.record("requested_meta", meta_count); + + let mut tasks_to_restore_for_data: Vec = Vec::with_capacity(data_count); + let mut tasks_to_restore_for_data_indices: Vec = Vec::with_capacity(data_count); + let mut tasks_to_restore_for_meta: Vec = Vec::with_capacity(meta_count); + let mut tasks_to_restore_for_meta_indices: Vec = Vec::with_capacity(meta_count); + for (i, entry) in tasks.iter().enumerate() { + let (task_id, category) = (entry.task_id, entry.category); self.task_lock_counter.acquire(); let task = self.backend.storage.access_mut(task_id); @@ -281,14 +272,14 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { && !task.flags.is_restored(TaskDataCategory::Data) { tasks_to_restore_for_data.push(task_id); - tasks_to_restore_for_data_indicies.push(i); + tasks_to_restore_for_data_indices.push(i); ready = false; } if matches!(category, TaskDataCategory::Meta | TaskDataCategory::All) && !task.flags.is_restored(TaskDataCategory::Meta) { tasks_to_restore_for_meta.push(task_id); - tasks_to_restore_for_meta_indicies.push(i); + tasks_to_restore_for_meta_indices.push(i); ready = false; } self.task_lock_counter.release(); @@ -300,85 +291,208 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { return; } - match tasks_to_restore_for_data.len() { - 0 => {} - 1 => { - let task_id = tasks_to_restore_for_data[0]; - let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Data); - let idx = tasks_to_restore_for_data_indicies[0]; - tasks[idx].2 = Some(data); - } - _ => { - if let Some(data) = self.restore_task_data_batch( - &tasks_to_restore_for_data, - SpecificTaskDataCategory::Data, - ) { - data.into_iter() - .zip(tasks_to_restore_for_data_indicies) - .for_each(|(item, idx)| { - tasks[idx].2 = Some(item); - }); - } else { - for idx in tasks_to_restore_for_data_indicies { - tasks[idx].2 = Some(TaskStorage::default()); - } - } - } + // Restore data and meta batches. + // Each decoded TaskStorage is written into backend.storage immediately inside the + // batch_lookup_data callback, and prepared_task_callback is called as soon as + // task.flags.is_restored(entry.category) becomes true (i.e. all required categories + // for that entry are satisfied). The guard is passed directly to the callback while + // still held — no second pass needed. + let mut races_data = 0u32; + let mut races_meta = 0u32; + self.restore_batch( + &tasks, + &tasks_to_restore_for_data, + &tasks_to_restore_for_data_indices, + SpecificTaskDataCategory::Data, + reason, + &mut races_data, + &mut races_meta, + &mut prepared_task_callback, + ); + self.restore_batch( + &tasks, + &tasks_to_restore_for_meta, + &tasks_to_restore_for_meta_indices, + SpecificTaskDataCategory::Meta, + reason, + &mut races_data, + &mut races_meta, + &mut prepared_task_callback, + ); + span.record("races_data", races_data); + span.record("races_meta", races_meta); + } + + /// Decodes a batch of task data from backing storage, writing each `TaskStorage` immediately + /// into `backend.storage` as it arrives so it can be freed before the next one is decoded. + /// + /// Calls `prepared_task_callback` immediately (while the task write guard is still held) + /// as soon as `task.flags.is_restored(entry.category)` becomes true — i.e. all categories + /// required by the entry are satisfied. For `All`-category entries this means the callback + /// fires in the meta batch (after both data and meta have been set). + fn restore_batch( + &mut self, + tasks: &[TaskRestoreEntry], + task_ids: &[TaskId], + task_indices: &[usize], + category: SpecificTaskDataCategory, + reason: &'static str, + races_data: &mut u32, + races_meta: &mut u32, + prepared_task_callback: &mut impl FnMut( + &mut Self, + TaskId, + TaskDataCategory, + StorageWriteGuard<'e>, + ), + ) { + if task_ids.is_empty() { + return; } - match tasks_to_restore_for_meta.len() { - 0 => {} - 1 => { - let task_id = tasks_to_restore_for_meta[0]; - let data = self.restore_task_data(task_id, SpecificTaskDataCategory::Meta); - let idx = tasks_to_restore_for_meta_indicies[0]; - tasks[idx].3 = Some(data); + let _span = trace_span!( + "restore_task_data_batch", + reason, + category = ?category, + keys = task_ids.len(), + ) + .entered(); + + if self.backend.should_restore() && self.backend.local_is_partial { + let result = self.backend.backing_storage.batch_lookup_data( + task_ids, + category, + &mut |index, opt_bytes| { + // opt_bytes is Some(&[u8]) when found, None when not in DB. + // The byte slice (and any underlying decompressed block) is dropped at the + // end of this closure, before the next key is fetched. + let task_index = task_indices[index]; + let entry = &tasks[task_index]; + self.task_lock_counter.acquire(); + let mut task = self.backend.storage.access_mut(entry.task_id); + match category { + SpecificTaskDataCategory::Data => { + if !task.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = opt_bytes { + let mut decoder = new_turbo_bincode_decoder(bytes); + task.decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!( + "Failed to decode Data for {:?}: {e:?}", + entry.task_id + ) + }); + } + task.flags.set_restored(TaskDataCategory::Data); + let task_type = task.get_persistent_task_type().cloned(); + if task.flags.is_restored(entry.category) { + self.task_lock_counter.release(); + prepared_task_callback( + self, + entry.task_id, + entry.category, + task, + ); + } else { + drop(task); + self.task_lock_counter.release(); + } + if let Some(task_type) = task_type { + self.backend + .task_cache + .entry(task_type) + .or_insert(entry.task_id); + } + } else { + *races_data += 1; + drop(task); + self.task_lock_counter.release(); + } + } + SpecificTaskDataCategory::Meta => { + if !task.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = opt_bytes { + let mut decoder = new_turbo_bincode_decoder(bytes); + task.decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!( + "Failed to decode Meta for {:?}: {e:?}", + entry.task_id + ) + }); + } + task.flags.set_restored(TaskDataCategory::Meta); + if task.flags.is_restored(entry.category) { + self.task_lock_counter.release(); + prepared_task_callback( + self, + entry.task_id, + entry.category, + task, + ); + } else { + drop(task); + self.task_lock_counter.release(); + } + } else { + *races_meta += 1; + drop(task); + self.task_lock_counter.release(); + } + } + } + }, + ); + if let Err(e) = result { + panic!( + "Failed to restore task data (corrupted database or bug): {:?}", + e.context(format!( + "{category:?} for batch of {} tasks", + task_ids.len() + )) + ) } - _ => { - if let Some(data) = self.restore_task_data_batch( - &tasks_to_restore_for_meta, - SpecificTaskDataCategory::Meta, - ) { - data.into_iter() - .zip(tasks_to_restore_for_meta_indicies) - .for_each(|(item, idx)| { - tasks[idx].3 = Some(item); - }); - } else { - for idx in tasks_to_restore_for_meta_indicies { - tasks[idx].3 = Some(TaskStorage::new()); + } else { + // No backing storage — mark all tasks as restored with empty data. + for &task_index in task_indices { + let entry = &tasks[task_index]; + self.task_lock_counter.acquire(); + let mut task = self.backend.storage.access_mut(entry.task_id); + match category { + SpecificTaskDataCategory::Data => { + if !task.flags.is_restored(TaskDataCategory::Data) { + task.flags.set_restored(TaskDataCategory::Data); + if task.flags.is_restored(entry.category) { + self.task_lock_counter.release(); + prepared_task_callback(self, entry.task_id, entry.category, task); + } else { + drop(task); + self.task_lock_counter.release(); + } + } else { + *races_data += 1; + drop(task); + self.task_lock_counter.release(); + } + } + SpecificTaskDataCategory::Meta => { + if !task.flags.is_restored(TaskDataCategory::Meta) { + task.flags.set_restored(TaskDataCategory::Meta); + if task.flags.is_restored(entry.category) { + self.task_lock_counter.release(); + prepared_task_callback(self, entry.task_id, entry.category, task); + } else { + drop(task); + self.task_lock_counter.release(); + } + } else { + *races_meta += 1; + drop(task); + self.task_lock_counter.release(); + } } } } } - - for (task_id, category, storage_for_data, storage_for_meta) in tasks { - if storage_for_data.is_none() && storage_for_meta.is_none() { - continue; - } - self.task_lock_counter.acquire(); - - let mut task_type = None; - let mut task = self.backend.storage.access_mut(task_id); - if let Some(storage) = storage_for_data - && !task.flags.is_restored(TaskDataCategory::Data) - { - task.restore_from(storage, TaskDataCategory::Data); - task.flags.set_restored(TaskDataCategory::Data); - task_type = task.get_persistent_task_type().cloned() - } - if let Some(storage) = storage_for_meta - && !task.flags.is_restored(TaskDataCategory::Meta) - { - task.restore_from(storage, TaskDataCategory::Meta); - task.flags.set_restored(TaskDataCategory::Meta); - } - self.task_lock_counter.release(); - prepared_task_callback(self, task_id, category, task); - if let Some(task_type) = task_type { - // Insert into the task cache to avoid future lookups - self.backend.task_cache.entry(task_type).or_insert(task_id); - } - } } } @@ -410,28 +524,39 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { category.includes_meta() && !task.flags.is_restored(TaskDataCategory::Meta); if needs_data || needs_meta { - // Avoid holding the lock too long since this can also affect other tasks - // Drop lock once, do all I/O, then re-acquire once + // Avoid holding the lock too long since this can also affect other tasks. + // Drop lock once, do all I/O (returning raw bytes), then re-acquire once + // and decode directly into the live task — no scratch TaskStorage needed. drop(task); - let storage_data = needs_data - .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Data)); - let storage_meta = needs_meta - .then(|| self.restore_task_data(task_id, SpecificTaskDataCategory::Meta)); + let bytes_data = needs_data + .then(|| self.lookup_task_data(task_id, SpecificTaskDataCategory::Data)); + let bytes_meta = needs_meta + .then(|| self.lookup_task_data(task_id, SpecificTaskDataCategory::Meta)); task = self.backend.storage.access_mut(task_id); - // Handle race conditions and merge - if let Some(storage) = storage_data - && !task.flags.is_restored(TaskDataCategory::Data) - { - task.restore_from(storage, TaskDataCategory::Data); + // Handle race conditions and decode directly into live storage. + // set_restored is called even when no bytes found (task simply has no + // persisted data for this category). + if needs_data && !task.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = bytes_data.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task.decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Data for {task_id:?}: {e:?}") + }); + } task.flags.set_restored(TaskDataCategory::Data); } - if let Some(storage) = storage_meta - && !task.flags.is_restored(TaskDataCategory::Meta) - { - task.restore_from(storage, TaskDataCategory::Meta); + if needs_meta && !task.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = bytes_meta.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task.decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Meta for {task_id:?}: {e:?}") + }); + } task.flags.set_restored(TaskDataCategory::Meta); } } @@ -446,30 +571,41 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { } } - fn prepare_tasks(&mut self, task_ids: impl IntoIterator) { - self.prepare_tasks_with_callback(task_ids, false, |_, _, _, _| {}); + fn prepare_tasks( + &mut self, + task_ids: impl IntoIterator + Clone, + reason: &'static str, + ) { + self.prepare_tasks_with_callback(task_ids, false, reason, |_, _, _, _| {}); } fn for_each_task( &mut self, task_ids: impl IntoIterator, + reason: &'static str, mut func: impl FnMut(Self::TaskGuardImpl, &mut Self), ) { let task_lock_counter = self.task_lock_counter.clone(); - self.prepare_tasks_with_callback(task_ids, true, |this, task_id, _category, task| { - // prepare_tasks_with_callback releases the counter before calling this callback, - // so the counter is 0 here. Acquire for the TaskGuardImpl that will release on Drop. - task_lock_counter.acquire(); - - let guard = TaskGuardImpl { - task, - task_id, - #[cfg(debug_assertions)] - category: _category, - task_lock_counter: task_lock_counter.clone(), - }; - func(guard, this); - }); + self.prepare_tasks_with_callback( + task_ids, + true, + reason, + |this, task_id, _category, task| { + // prepare_tasks_with_callback releases the counter before calling this callback, + // so the counter is 0 here. Acquire for the TaskGuardImpl that will release on + // Drop. + task_lock_counter.acquire(); + + let guard = TaskGuardImpl { + task, + task_id, + #[cfg(debug_assertions)] + category: _category, + task_lock_counter: task_lock_counter.clone(), + }; + func(guard, this); + }, + ); } fn task_pair( @@ -493,47 +629,69 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> { category.includes_meta() && !task2.flags.is_restored(TaskDataCategory::Meta); if needs_data1 || needs_meta1 || needs_data2 || needs_meta2 { - // Avoid holding the lock too long since this can also affect other tasks - // Drop locks once, do all I/O, then re-acquire once + // Avoid holding the lock too long since this can also affect other tasks. + // Drop locks once, do all I/O (returning raw bytes), then re-acquire once + // and decode directly into the live tasks — no scratch TaskStorage needed. drop(task1); drop(task2); - let storage_data1 = needs_data1 - .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Data)); - let storage_meta1 = needs_meta1 - .then(|| self.restore_task_data(task_id1, SpecificTaskDataCategory::Meta)); - let storage_data2 = needs_data2 - .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Data)); - let storage_meta2 = needs_meta2 - .then(|| self.restore_task_data(task_id2, SpecificTaskDataCategory::Meta)); + let bytes_data1 = needs_data1 + .then(|| self.lookup_task_data(task_id1, SpecificTaskDataCategory::Data)); + let bytes_meta1 = needs_meta1 + .then(|| self.lookup_task_data(task_id1, SpecificTaskDataCategory::Meta)); + let bytes_data2 = needs_data2 + .then(|| self.lookup_task_data(task_id2, SpecificTaskDataCategory::Data)); + let bytes_meta2 = needs_meta2 + .then(|| self.lookup_task_data(task_id2, SpecificTaskDataCategory::Meta)); let (t1, t2) = self.backend.storage.access_pair_mut(task_id1, task_id2); task1 = t1; task2 = t2; - // Merge results, handling race conditions - if let Some(storage) = storage_data1 - && !task1.flags.is_restored(TaskDataCategory::Data) - { - task1.restore_from(storage, TaskDataCategory::Data); + // Decode directly into live storage, handling race conditions. + // set_restored is called even when no bytes found. + if needs_data1 && !task1.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = bytes_data1.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task1 + .decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Data for {task_id1:?}: {e:?}") + }); + } task1.flags.set_restored(TaskDataCategory::Data); } - if let Some(storage) = storage_meta1 - && !task1.flags.is_restored(TaskDataCategory::Meta) - { - task1.restore_from(storage, TaskDataCategory::Meta); + if needs_meta1 && !task1.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = bytes_meta1.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task1 + .decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Meta for {task_id1:?}: {e:?}") + }); + } task1.flags.set_restored(TaskDataCategory::Meta); } - if let Some(storage) = storage_data2 - && !task2.flags.is_restored(TaskDataCategory::Data) - { - task2.restore_from(storage, TaskDataCategory::Data); + if needs_data2 && !task2.flags.is_restored(TaskDataCategory::Data) { + if let Some(bytes) = bytes_data2.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task2 + .decode(SpecificTaskDataCategory::Data, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Data for {task_id2:?}: {e:?}") + }); + } task2.flags.set_restored(TaskDataCategory::Data); } - if let Some(storage) = storage_meta2 - && !task2.flags.is_restored(TaskDataCategory::Meta) - { - task2.restore_from(storage, TaskDataCategory::Meta); + if needs_meta2 && !task2.flags.is_restored(TaskDataCategory::Meta) { + if let Some(bytes) = bytes_meta2.flatten() { + let mut decoder = new_turbo_bincode_decoder(&bytes); + task2 + .decode(SpecificTaskDataCategory::Meta, &mut decoder) + .unwrap_or_else(|e| { + panic!("Failed to decode Meta for {task_id2:?}: {e:?}") + }); + } task2.flags.set_restored(TaskDataCategory::Meta); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs index 427820b6e0c7a..0072f843d5033 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs @@ -153,6 +153,7 @@ impl UpdateCellOperation { dependent_tasks .keys() .map(|&id| (id, TaskDataCategory::All)), + "invalidate cell dependents", ); UpdateCellOperation::InvalidateWhenCellDependency { diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index 8fd957c354f40..9e3a3179cc80b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -4,10 +4,11 @@ use anyhow::Result; use either::Either; use smallvec::SmallVec; use turbo_bincode::TurboBincodeBuffer; +use turbo_persistence::ArcBytes; use turbo_tasks::{TaskId, backend::CachedTaskType}; use crate::{ - backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage}, + backend::{AnyOperation, SpecificTaskDataCategory}, utils::chunked_vec::ChunkedVec, }; @@ -69,22 +70,33 @@ pub trait BackingStorageSealed: 'static + Send + Sync { /// The caller must verify each returned TaskId by comparing the stored task type which will /// require a second database read fn lookup_task_candidates(&self, key: &CachedTaskType) -> Result>; - /// Looks up and decodes persisted data for a single task, updating the provided storage with - /// data from the database in the given category. + /// Looks up raw persisted bytes for a single task. + /// + /// Returns `None` if the task has no persisted data for the given category. + /// The caller is responsible for decoding the returned bytes. The returned + /// [`ArcBytes`] is ref-counted and zero-copy when backed by a memory-mapped file. fn lookup_data( &self, task_id: TaskId, category: SpecificTaskDataCategory, - storage: &mut TaskStorage, - ) -> Result<()>; + ) -> Result>; - /// Batch lookup and decode data for multiple tasks directly into TypedStorage instances. - /// Returns a vector of TypedStorage, one for each task_id in the input slice. + /// Batch lookup raw bytes for multiple tasks, calling + /// `callback(index, Option<&[u8]>)` for every task in `task_ids` immediately after its + /// entry is resolved. Index corresponds to the position in `task_ids`. `None` means the key + /// was not found in the database (no allocation is made for missing keys). + /// + /// The caller is responsible for decoding the bytes. The byte slice is only valid for the + /// duration of the callback invocation and must not be stored. + /// + /// This avoids allocating an intermediate `TaskStorage` per entry — callers can decode + /// directly into the live in-memory storage, reducing peak memory usage for large batches. fn batch_lookup_data( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result>; + callback: &mut dyn FnMut(usize, Option<&[u8]>), + ) -> Result<()>; fn compact(&self) -> Result { Ok(false) @@ -142,17 +154,17 @@ where &self, task_id: TaskId, category: SpecificTaskDataCategory, - storage: &mut TaskStorage, - ) -> Result<()> { - either::for_both!(self, this => this.lookup_data(task_id, category, storage)) + ) -> Result> { + either::for_both!(self, this => this.lookup_data(task_id, category)) } fn batch_lookup_data( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result> { - either::for_both!(self, this => this.batch_lookup_data(task_ids, category)) + callback: &mut dyn FnMut(usize, Option<&[u8]>), + ) -> Result<()> { + either::for_both!(self, this => this.batch_lookup_data(task_ids, category, callback)) } fn compact(&self) -> Result { diff --git a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs index 19ab820051bd9..1749271640f56 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs @@ -1,6 +1,6 @@ use anyhow::Result; use smallvec::SmallVec; -use turbo_persistence::{FamilyConfig, FamilyKind}; +use turbo_persistence::{ArcBytes, FamilyConfig, FamilyKind}; use crate::database::write_batch::ConcurrentWriteBatch; @@ -27,13 +27,24 @@ impl KeySpace { } } + const fn name(&self) -> &'static str { + match self { + KeySpace::Infra => "Infra", + KeySpace::TaskMeta => "TaskMeta", + KeySpace::TaskData => "TaskData", + KeySpace::TaskCache => "TaskCache", + } + } + /// Returns the persistence configuration for this keyspace. pub const fn family_config(&self) -> FamilyConfig { match self { KeySpace::Infra | KeySpace::TaskMeta | KeySpace::TaskData => FamilyConfig { + name: self.name(), kind: FamilyKind::SingleValue, }, KeySpace::TaskCache => FamilyConfig { + name: self.name(), // TaskCache uses hash-based lookups with potential collisions. kind: FamilyKind::MultiValue, }, @@ -46,35 +57,34 @@ pub trait KeyValueDatabase { false } - type ValueBuffer<'l>: std::borrow::Borrow<[u8]> - where - Self: 'l; - - fn get(&self, key_space: KeySpace, key: &[u8]) -> Result>>; + fn get(&self, key_space: KeySpace, key: &[u8]) -> Result>; /// Looks up a key and returns all matching values. /// /// Useful for keyspaces where keys are hashes and collisions are possible (e.g., TaskCache). /// The default implementation returns at most one value (from `get`), but implementations /// that support multiple values per key should override this. - fn get_multiple( - &self, - key_space: KeySpace, - key: &[u8], - ) -> Result; 1]>> { + fn get_multiple(&self, key_space: KeySpace, key: &[u8]) -> Result> { Ok(self.get(key_space, key)?.into_iter().collect()) } - fn batch_get( + /// Calls `callback(index, Option<&[u8]>)` for each key immediately after it is resolved, + /// rather than accumulating all results into a `Vec`. This avoids holding every decompressed + /// value simultaneously in memory. + /// + /// The default implementation falls back to sequential `get` calls. Implementations backed by + /// [`TurboPersistence`] override this to use the native streaming path. + fn batch_get_with( &self, key_space: KeySpace, keys: &[&[u8]], - ) -> Result>>> { - let mut results = Vec::with_capacity(keys.len()); - for key in keys { + mut callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()> { + for (index, key) in keys.iter().enumerate() { let value = self.get(key_space, key)?; - results.push(value); + let opt_bytes = value.as_deref(); + callback(index, opt_bytes)?; } - Ok(results) + Ok(()) } type ConcurrentWriteBatch<'l>: ConcurrentWriteBatch<'l> diff --git a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs index 893b29fae0414..955fe874828b0 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use turbo_persistence::ArcBytes; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, @@ -8,12 +9,7 @@ use crate::database::{ pub struct NoopKvDb; impl KeyValueDatabase for NoopKvDb { - type ValueBuffer<'l> - = &'l [u8] - where - Self: 'l; - - fn get(&self, _key_space: KeySpace, _key: &[u8]) -> Result>> { + fn get(&self, _key_space: KeySpace, _key: &[u8]) -> Result> { Ok(None) } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs index a28fb211e05af..8b62799a692ca 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs @@ -87,28 +87,20 @@ impl KeyValueDatabase for TurboKeyValueDatabase { self.db.is_empty() } - type ValueBuffer<'l> - = ArcBytes - where - Self: 'l; - - fn get(&self, key_space: KeySpace, key: &[u8]) -> Result>> { + fn get(&self, key_space: KeySpace, key: &[u8]) -> Result> { self.db.get(key_space as usize, &key) } - fn batch_get( + fn batch_get_with( &self, key_space: KeySpace, keys: &[&[u8]], - ) -> Result>>> { - self.db.batch_get(key_space as usize, keys) + callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>, + ) -> Result<()> { + self.db.batch_get_with(key_space as usize, keys, callback) } - fn get_multiple( - &self, - key_space: KeySpace, - key: &[u8], - ) -> Result; 1]>> { + fn get_multiple(&self, key_space: KeySpace, key: &[u8]) -> Result> { self.db.get_multiple(key_space as usize, &key) } diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 5df10a67875fc..eff88cd005dd1 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -7,7 +7,8 @@ use std::{ use anyhow::{Context, Result}; use smallvec::SmallVec; -use turbo_bincode::{new_turbo_bincode_decoder, turbo_bincode_decode, turbo_bincode_encode}; +use turbo_bincode::{turbo_bincode_decode, turbo_bincode_encode}; +use turbo_persistence::ArcBytes; use turbo_tasks::{ TaskId, backend::CachedTaskType, @@ -18,7 +19,7 @@ use turbo_tasks_hash::Xxh3Hash64Hasher; use crate::{ GitVersionInfo, - backend::{AnyOperation, SpecificTaskDataCategory, storage_schema::TaskStorage}, + backend::{AnyOperation, SpecificTaskDataCategory}, backing_storage::{BackingStorage, BackingStorageSealed, SnapshotItem}, database::{ db_invalidation::{StartupCacheState, check_db_invalidation_and_cleanup, invalidate_db}, @@ -47,11 +48,6 @@ impl AsRef<[u8]> for IntKey { } } -fn as_u32(bytes: impl Borrow<[u8]>) -> Result { - let n = u32::from_le_bytes(bytes.borrow().try_into()?); - Ok(n) -} - // We want to invalidate the cache on panic for most users, but this is a band-aid to underlying // problems in turbo-tasks. // @@ -187,7 +183,7 @@ impl KeyValueDatabaseBackingStorageInner { fn get_infra_u32(&self, key: u32) -> Result> { self.database .get(KeySpace::Infra, IntKey::new(key).as_ref())? - .map(as_u32) + .map(|b| Ok(u32::from_le_bytes((*b).try_into()?))) .transpose() } } @@ -212,16 +208,13 @@ impl BackingStorageSealed } fn uncompleted_operations(&self) -> Result> { - fn get(database: &impl KeyValueDatabase) -> Result> { - let Some(operations) = - database.get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())? - else { - return Ok(Vec::new()); - }; - let operations = turbo_bincode_decode(operations.borrow())?; - Ok(operations) - } - get(&self.inner.database).context("Unable to read uncompleted operations from database") + self.inner + .database + .get(KeySpace::Infra, IntKey::new(META_KEY_OPERATIONS).as_ref())? + .map(|b| turbo_bincode_decode(&b)) + .transpose() + .map(Option::unwrap_or_default) + .context("Unable to read uncompleted operations from database") } fn save_snapshot( @@ -298,14 +291,14 @@ impl BackingStorageSealed } fn lookup_task_candidates(&self, task_type: &CachedTaskType) -> Result> { - let inner = &*self.inner; - if inner.database.is_empty() { + if self.inner.database.is_empty() { // Checking if the database is empty is a performance optimization // to avoid computing the hash. return Ok(SmallVec::new()); } let hash = compute_task_type_hash(task_type); - let buffers = inner + let buffers = self + .inner .database .get_multiple(KeySpace::TaskCache, &hash.to_le_bytes()) .with_context(|| { @@ -314,7 +307,7 @@ impl BackingStorageSealed let mut task_ids = SmallVec::with_capacity(buffers.len()); for bytes in buffers { - let bytes = bytes.borrow().try_into()?; + let bytes = (*bytes).try_into()?; let id = TaskId::try_from(u32::from_le_bytes(bytes)).unwrap(); task_ids.push(id); } @@ -325,54 +318,35 @@ impl BackingStorageSealed &self, task_id: TaskId, category: SpecificTaskDataCategory, - storage: &mut TaskStorage, - ) -> Result<()> { - let inner = &*self.inner; - let Some(bytes) = inner + ) -> Result> { + self.inner .database .get(category.key_space(), IntKey::new(*task_id).as_ref()) - .with_context(|| { - format!("Looking up task storage for {task_id} from database failed") - })? - else { - return Ok(()); - }; - let mut decoder = new_turbo_bincode_decoder(bytes.borrow()); - storage - .decode(category, &mut decoder) - .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}")) + .with_context(|| format!("Looking up task storage for {task_id} from database failed")) } fn batch_lookup_data( &self, task_ids: &[TaskId], category: SpecificTaskDataCategory, - ) -> Result> { - let inner = &*self.inner; + callback: &mut dyn FnMut(usize, Option<&[u8]>), + ) -> Result<()> { let int_keys: Vec<_> = task_ids.iter().map(|&id| IntKey::new(*id)).collect(); let keys = int_keys.iter().map(|k| k.as_ref()).collect::>(); - let bytes = inner + self.inner .database - .batch_get(category.key_space(), &keys) + .batch_get_with(category.key_space(), &keys, |index, opt_bytes| { + callback(index, opt_bytes); + // `opt_bytes` (and any underlying ArcBytes) is dropped here before the next + // iteration, so only one decompressed block is live at a time. + Ok(()) + }) .with_context(|| { format!( "Looking up typed data for {} tasks from database failed", task_ids.len() ) - })?; - bytes - .into_iter() - .map(|opt_bytes| { - let mut storage = TaskStorage::new(); - if let Some(bytes) = opt_bytes { - let mut decoder = new_turbo_bincode_decoder(bytes.borrow()); - storage - .decode(category, &mut decoder) - .map_err(|e| anyhow::anyhow!("Failed to decode {category:?}: {e:?}"))?; - } - Ok(storage) }) - .collect::>>() } fn compact(&self) -> Result {