Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 93 additions & 37 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,45 @@ struct TrackedStats {
miss_global: std::sync::atomic::AtomicU64,
}

/// A compact bitset for tracking which batch-lookup cells have been resolved.
///
/// Uses `u64` words (1 bit per cell) rather than `Vec<bool>` (1 byte per cell) for an 8x
/// reduction in memory and better cache behaviour on large batches.
pub struct FoundBitset {
    // Bit storage: bit `i` lives in `words[i / 64]` at position `i % 64`.
    words: Box<[u64]>,
    // Logical number of bits tracked; trailing bits of the last word are always zero.
    len: usize,
}

impl FoundBitset {
    /// Creates a bitset with `len` bits, all initially unset.
    pub(crate) fn new(len: usize) -> Self {
        let words = vec![0u64; len.div_ceil(64)].into_boxed_slice();
        Self { words, len }
    }

    /// Returns `true` if the bit at `index` is set.
    #[inline]
    pub(crate) fn get(&self, index: usize) -> bool {
        debug_assert!(index < self.len);
        (self.words[index / 64] >> (index % 64)) & 1 == 1
    }

    /// Sets the bit at `index`. Idempotent: setting an already-set bit is a no-op.
    #[inline]
    pub(crate) fn set(&mut self, index: usize) {
        debug_assert!(index < self.len);
        let wi = index / 64;
        let bi = index % 64;
        self.words[wi] |= 1 << bi;
    }

    /// Returns the number of bits that are set.
    ///
    /// Accumulates in `usize` rather than `u32`: summing per-word `count_ones()` into a
    /// `u32` would panic in debug builds (and wrap in release) once more than `u32::MAX`
    /// bits are set across the words.
    pub(crate) fn count_ones(&self) -> usize {
        self.words
            .iter()
            .map(|w| w.count_ones() as usize)
            .sum::<usize>()
    }

    /// Returns the total number of bits (set or unset) this bitset tracks.
    pub(crate) fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the bitset tracks zero bits (pairs with `len` per clippy's
    /// `len_without_is_empty` lint).
    #[allow(dead_code)]
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }
}

/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
/// using a single write batch. It allows for concurrent reads.
pub struct TurboPersistence<S: ParallelScheduler, const FAMILIES: usize> {
Expand Down Expand Up @@ -1603,11 +1642,36 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
Ok(output)
}

/// Batch key lookup that materializes every result into a `Vec<Option<ArcBytes>>`.
///
/// Convenience wrapper over [`batch_get_with`][Self::batch_get_with]; prefer the
/// callback-based variant for very large batches, since it resolves entries one at a
/// time instead of keeping every decoded value alive simultaneously.
pub fn batch_get<K: QueryKey>(
    &self,
    family: usize,
    keys: &[K],
) -> Result<Vec<Option<ArcBytes>>> {
    // Pre-size the output; entries stay `None` for keys that are not found (or deleted).
    let mut out: Vec<Option<ArcBytes>> = Vec::with_capacity(keys.len());
    out.resize_with(keys.len(), || None);
    self.batch_get_with(family, keys, |slot, bytes| {
        if let Some(bytes) = bytes {
            // Copy the borrowed value slice into an owned, refcounted buffer.
            out[slot] = Some(ArcBytes::from(bytes.to_vec().into_boxed_slice()));
        }
        Ok(())
    })?;
    Ok(out)
}

/// Looks up multiple keys in batch, calling `callback(index, Option<&[u8]>)` for each entry
/// immediately after it is resolved rather than accumulating all results into a `Vec`.
///
/// This keeps at most one decompressed value block live at a time, significantly reducing
/// peak memory when the batch is large. The callback receives the 0-based key index and the
/// value bytes (`None` for not-found or deleted). Callback errors propagate immediately.
pub fn batch_get_with<K: QueryKey>(
&self,
family: usize,
keys: &[K],
mut callback: impl FnMut(usize, Option<&[u8]>) -> Result<()>,
) -> Result<()> {
debug_assert!(family < FAMILIES, "Family index out of bounds");
if self.config.family_configs[family].kind != FamilyKind::SingleValue {
// This is an error in our caller so just panic
Expand All @@ -1622,22 +1686,38 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
result_size = tracing::field::Empty
)
.entered();
let mut cells: Vec<(u64, usize, Option<LookupValue>)> = Vec::with_capacity(keys.len());
let mut empty_cells = keys.len();
let mut cells: Vec<(u64, usize)> = Vec::with_capacity(keys.len());
for (index, key) in keys.iter().enumerate() {
let hash = hash_key(key);
cells.push((hash, index, None));
cells.push((hash, index));
}
cells.sort_by_key(|(hash, _, _)| *hash);
cells.sort_unstable_by_key(|(hash, _)| *hash);
let mut found = FoundBitset::new(cells.len());
let inner = self.inner.read();
let mut not_found = 0;
let mut deleted = 0;
let mut result_size = 0;
let mut read_blob = |seq| self.read_blob(seq);
// Wrap the callback to track stats (deleted/result_size) for found keys.
// not_found is tracked separately in the post-loop over cells.
let mut stats_callback = |index: usize, opt_bytes: Option<&[u8]>| -> Result<()> {
if let Some(bytes) = opt_bytes {
result_size += bytes.len();
} else {
deleted += 1;
}
callback(index, opt_bytes)
};
for meta in inner.meta_files.iter().rev() {
let _result = meta.batch_lookup(
let (all_found, _result) = meta.batch_lookup_with(
family as u32,
keys,
&mut cells,
&mut empty_cells,
&cells,
&mut found,
&self.key_block_cache,
&self.value_block_cache,
&mut read_blob,
&mut stats_callback,
)?;

#[cfg(feature = "stats")]
Expand Down Expand Up @@ -1669,49 +1749,25 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
}
}

if empty_cells == 0 {
if all_found {
break;
}
}
let mut deleted = 0;
let mut not_found = 0;
let mut result_size = 0;
let mut results = vec![None; keys.len()];
for (hash, index, result) in cells {
if let Some(result) = result {
// Record accessed hashes for found keys, and fire callback for not-found keys.
for (hash, index) in cells.into_iter() {
if found.get(index) {
inner.accessed_key_hashes[family].insert(hash);
let result = match result {
LookupValue::Deleted => {
#[cfg(feature = "stats")]
self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
deleted += 1;
None
}
LookupValue::Slice { value } => {
#[cfg(feature = "stats")]
self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
result_size += value.len();
Some(value)
}
LookupValue::Blob { sequence_number } => {
#[cfg(feature = "stats")]
self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
let blob = self.read_blob(sequence_number)?;
result_size += blob.len();
Some(blob)
}
};
results[index] = result;
} else {
#[cfg(feature = "stats")]
self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
not_found += 1;
callback(index, None)?;
}
}
span.record("not_found", not_found);
span.record("deleted", deleted);
span.record("result_size", result_size);
Ok(results)
Ok(())
}

/// Returns database statistics.
Expand Down
84 changes: 61 additions & 23 deletions turbopack/crates/turbo-persistence/src/meta_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use smallvec::SmallVec;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Ref, big_endian as be};

use crate::{
QueryKey,
ArcBytes, QueryKey,
db::FoundBitset,
lookup_entry::LookupValue,
mmap_helper::advise_mmap_for_persistence,
static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData},
Expand Down Expand Up @@ -498,33 +499,44 @@ impl MetaFile {
Ok(miss_result)
}

pub fn batch_lookup<K: QueryKey>(
pub fn batch_lookup_with<K: QueryKey>(
&self,
key_family: u32,
keys: &[K],
cells: &mut [(u64, usize, Option<LookupValue>)],
empty_cells: &mut usize,
cells: &[(u64, usize)],
found: &mut FoundBitset,
key_block_cache: &BlockCache,
value_block_cache: &BlockCache,
) -> Result<MetaBatchLookupResult> {
read_blob: &mut impl FnMut(u32) -> Result<ArcBytes>,
callback: &mut impl FnMut(usize, Option<&[u8]>) -> Result<()>,
) -> Result<(bool, MetaBatchLookupResult)> {
debug_assert_eq!(
cells.len(),
found.len(),
"cells and found must have the same length"
);
if key_family != self.family {
#[cfg(feature = "stats")]
return Ok(MetaBatchLookupResult {
family_miss: true,
..Default::default()
});
return Ok((
false,
MetaBatchLookupResult {
family_miss: true,
..Default::default()
},
));
#[cfg(not(feature = "stats"))]
return Ok(MetaBatchLookupResult {});
return Ok((false, MetaBatchLookupResult {}));
}
debug_assert!(
cells.is_sorted_by_key(|(hash, _, _)| *hash),
cells.is_sorted_by_key(|(hash, _)| *hash),
"Cells must be sorted by key hash"
);
#[allow(unused_mut, reason = "It's used when stats are enabled")]
let mut lookup_result = MetaBatchLookupResult::default();
let mut empty_cells = found.len() - found.count_ones();
for entry in self.entries.iter().rev() {
let start_index = cells
.binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
let mut start_index = cells
.binary_search_by(|(hash, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
.err()
.unwrap();
if start_index >= cells.len() {
Expand All @@ -534,11 +546,23 @@ impl MetaFile {
}
continue;
}
let end_index = cells
.binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
// Advance to the first not found cell.
while start_index < cells.len() && found.get(cells[start_index].1) {
start_index += 1;
}
if start_index >= cells.len() {
#[cfg(feature = "stats")]
{
lookup_result.range_misses += 1;
}
continue;
}
let end_index = cells[start_index..]
.binary_search_by(|(hash, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
.err()
.unwrap()
.checked_sub(1);
.checked_sub(1)
.map(|i| i + start_index);
let Some(end_index) = end_index else {
#[cfg(feature = "stats")]
{
Expand All @@ -553,12 +577,13 @@ impl MetaFile {
}
continue;
}
for (hash, index, result) in &mut cells[start_index..=end_index] {
for (hash, index) in cells[start_index..=end_index].iter() {
debug_assert!(
*hash >= entry.min_hash && *hash <= entry.max_hash,
"Key hash out of range"
);
if result.is_some() {
if found.get(*index) {
// we already found this key in a different meta-file
continue;
}
if !entry.amqf.contains_fingerprint(*hash) {
Expand All @@ -580,14 +605,27 @@ impl MetaFile {
let Some(value) = values.pop() else {
unreachable!()
};
*result = Some(value);
*empty_cells -= 1;
// Resolve and callback immediately — the ArcBytes/blob is dropped before
// the next iteration so we never hold more than one decompressed block at a
// time.
let blob;
let opt_bytes: Option<&[u8]> = match &value {
LookupValue::Deleted => None,
LookupValue::Slice { value } => Some(value),
LookupValue::Blob { sequence_number } => {
blob = read_blob(*sequence_number)?;
Some(&*blob)
}
};
found.set(*index);
empty_cells -= 1;
#[cfg(feature = "stats")]
{
lookup_result.hits += 1;
}
if *empty_cells == 0 {
return Ok(lookup_result);
callback(*index, opt_bytes)?;
if empty_cells == 0 {
return Ok((true, lookup_result));
}
} else {
#[cfg(feature = "stats")]
Expand All @@ -597,6 +635,6 @@ impl MetaFile {
}
}
}
Ok(lookup_result)
Ok((false, lookup_result))
}
}
Loading
Loading