@@ -106,6 +106,45 @@ struct TrackedStats {
106106 miss_global : std:: sync:: atomic:: AtomicU64 ,
107107}
108108
/// A compact bitset for tracking which batch-lookup cells have been resolved.
///
/// Uses `u64` words (1 bit per cell) rather than `Vec<bool>` (1 byte per cell) for an 8x
/// reduction in memory and better cache behaviour on large batches.
pub struct FoundBitset {
    // Bit storage; bit `i` lives in `words[i / 64]` at bit position `i % 64`.
    words: Box<[u64]>,
    // Number of addressable bits; may be less than `words.len() * 64`.
    len: usize,
}

impl FoundBitset {
    /// Creates a bitset with `len` bits, all initially unset (no cell resolved).
    pub(crate) fn new(len: usize) -> Self {
        let words = vec![0u64; len.div_ceil(64)].into_boxed_slice();
        Self { words, len }
    }

    /// Returns `true` if the bit at `index` is set, i.e. the cell has been resolved.
    ///
    /// Debug-asserts that `index < self.len()`; out-of-range indices may panic on the
    /// word access in release builds.
    #[inline]
    pub(crate) fn get(&self, index: usize) -> bool {
        debug_assert!(index < self.len);
        (self.words[index / 64] >> (index % 64)) & 1 == 1
    }

    /// Sets the bit at `index`, marking the cell as resolved.
    ///
    /// Debug-asserts that `index < self.len()`.
    #[inline]
    pub(crate) fn set(&mut self, index: usize) {
        debug_assert!(index < self.len);
        let wi = index / 64;
        let bi = index % 64;
        self.words[wi] |= 1 << bi;
    }

    /// Returns the number of bits that are set (i.e. the number of resolved cells).
    // NOTE: a previous doc comment said "not yet found", which is the inverse of what
    // this computes — set bits mark cells that HAVE been found.
    pub(crate) fn count_ones(&self) -> usize {
        self.words.iter().map(|w| w.count_ones()).sum::<u32>() as usize
    }

    /// Returns the total number of bits (set or unset) the bitset was created with.
    pub(crate) fn len(&self) -> usize {
        self.len
    }
}
147+
109148/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
110149/// using a single write batch. It allows for concurrent reads.
111150pub struct TurboPersistence < S : ParallelScheduler , const FAMILIES : usize > {
@@ -1564,11 +1603,36 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
15641603 Ok ( output)
15651604 }
15661605
1606+ /// Looks up multiple keys in batch and collects results into a `Vec`.
1607+ ///
1608+ /// For large batches where memory pressure matters, prefer
1609+ /// [`batch_get_with`][Self::batch_get_with] which calls a callback per entry without
1610+ /// accumulating all decoded bytes simultaneously.
15671611 pub fn batch_get < K : QueryKey > (
15681612 & self ,
15691613 family : usize ,
15701614 keys : & [ K ] ,
15711615 ) -> Result < Vec < Option < ArcBytes > > > {
1616+ let mut results = vec ! [ None ; keys. len( ) ] ;
1617+ self . batch_get_with ( family, keys, |index, opt_bytes| {
1618+ results[ index] = opt_bytes. map ( |b| ArcBytes :: from ( b. to_vec ( ) . into_boxed_slice ( ) ) ) ;
1619+ Ok ( ( ) )
1620+ } ) ?;
1621+ Ok ( results)
1622+ }
1623+
1624+ /// Looks up multiple keys in batch, calling `callback(index, Option<&[u8]>)` for each entry
1625+ /// immediately after it is resolved rather than accumulating all results into a `Vec`.
1626+ ///
1627+ /// This keeps at most one decompressed value block live at a time, significantly reducing
1628+ /// peak memory when the batch is large. The callback receives the 0-based key index and the
1629+ /// value bytes (`None` for not-found or deleted). Callback errors propagate immediately.
1630+ pub fn batch_get_with < K : QueryKey > (
1631+ & self ,
1632+ family : usize ,
1633+ keys : & [ K ] ,
1634+ mut callback : impl FnMut ( usize , Option < & [ u8 ] > ) -> Result < ( ) > ,
1635+ ) -> Result < ( ) > {
15721636 debug_assert ! ( family < FAMILIES , "Family index out of bounds" ) ;
15731637 if self . config . family_configs [ family] . kind != FamilyKind :: SingleValue {
15741638 // This is an error in our caller so just panic
@@ -1583,22 +1647,38 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
15831647 result_size = tracing:: field:: Empty
15841648 )
15851649 . entered ( ) ;
1586- let mut cells: Vec < ( u64 , usize , Option < LookupValue > ) > = Vec :: with_capacity ( keys. len ( ) ) ;
1587- let mut empty_cells = keys. len ( ) ;
1650+ let mut cells: Vec < ( u64 , usize ) > = Vec :: with_capacity ( keys. len ( ) ) ;
15881651 for ( index, key) in keys. iter ( ) . enumerate ( ) {
15891652 let hash = hash_key ( key) ;
1590- cells. push ( ( hash, index, None ) ) ;
1653+ cells. push ( ( hash, index) ) ;
15911654 }
1592- cells. sort_by_key ( |( hash, _, _) | * hash) ;
1655+ cells. sort_unstable_by_key ( |( hash, _) | * hash) ;
1656+ let mut found = FoundBitset :: new ( cells. len ( ) ) ;
15931657 let inner = self . inner . read ( ) ;
1658+ let mut not_found = 0 ;
1659+ let mut deleted = 0 ;
1660+ let mut result_size = 0 ;
1661+ let mut read_blob = |seq| self . read_blob ( seq) ;
1662+ // Wrap the callback to track stats (deleted/result_size) for found keys.
1663+ // not_found is tracked separately in the post-loop over cells.
1664+ let mut stats_callback = |index : usize , opt_bytes : Option < & [ u8 ] > | -> Result < ( ) > {
1665+ if let Some ( bytes) = opt_bytes {
1666+ result_size += bytes. len ( ) ;
1667+ } else {
1668+ deleted += 1 ;
1669+ }
1670+ callback ( index, opt_bytes)
1671+ } ;
15941672 for meta in inner. meta_files . iter ( ) . rev ( ) {
1595- let _result = meta. batch_lookup (
1673+ let ( all_found , _result) = meta. batch_lookup_with (
15961674 family as u32 ,
15971675 keys,
1598- & mut cells,
1599- & mut empty_cells ,
1676+ & cells,
1677+ & mut found ,
16001678 & self . key_block_cache ,
16011679 & self . value_block_cache ,
1680+ & mut read_blob,
1681+ & mut stats_callback,
16021682 ) ?;
16031683
16041684 #[ cfg( feature = "stats" ) ]
@@ -1630,49 +1710,25 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
16301710 }
16311711 }
16321712
1633- if empty_cells == 0 {
1713+ if all_found {
16341714 break ;
16351715 }
16361716 }
1637- let mut deleted = 0 ;
1638- let mut not_found = 0 ;
1639- let mut result_size = 0 ;
1640- let mut results = vec ! [ None ; keys. len( ) ] ;
1641- for ( hash, index, result) in cells {
1642- if let Some ( result) = result {
1717+ // Record accessed hashes for found keys, and fire callback for not-found keys.
1718+ for ( hash, index) in cells. into_iter ( ) {
1719+ if found. get ( index) {
16431720 inner. accessed_key_hashes [ family] . insert ( hash) ;
1644- let result = match result {
1645- LookupValue :: Deleted => {
1646- #[ cfg( feature = "stats" ) ]
1647- self . stats . hits_deleted . fetch_add ( 1 , Ordering :: Relaxed ) ;
1648- deleted += 1 ;
1649- None
1650- }
1651- LookupValue :: Slice { value } => {
1652- #[ cfg( feature = "stats" ) ]
1653- self . stats . hits_small . fetch_add ( 1 , Ordering :: Relaxed ) ;
1654- result_size += value. len ( ) ;
1655- Some ( value)
1656- }
1657- LookupValue :: Blob { sequence_number } => {
1658- #[ cfg( feature = "stats" ) ]
1659- self . stats . hits_blob . fetch_add ( 1 , Ordering :: Relaxed ) ;
1660- let blob = self . read_blob ( sequence_number) ?;
1661- result_size += blob. len ( ) ;
1662- Some ( blob)
1663- }
1664- } ;
1665- results[ index] = result;
16661721 } else {
16671722 #[ cfg( feature = "stats" ) ]
16681723 self . stats . miss_global . fetch_add ( 1 , Ordering :: Relaxed ) ;
16691724 not_found += 1 ;
1725+ callback ( index, None ) ?;
16701726 }
16711727 }
16721728 span. record ( "not_found" , not_found) ;
16731729 span. record ( "deleted" , deleted) ;
16741730 span. record ( "result_size" , result_size) ;
1675- Ok ( results )
1731+ Ok ( ( ) )
16761732 }
16771733
16781734 /// Returns database statistics.
0 commit comments