@@ -106,6 +106,45 @@ struct TrackedStats {
106106 miss_global : std:: sync:: atomic:: AtomicU64 ,
107107}
108108
/// A compact bitset for tracking which batch-lookup cells have been resolved.
///
/// Uses `u64` words (1 bit per cell) rather than `Vec<bool>` (1 byte per cell) for an 8x
/// reduction in memory and better cache behaviour on large batches.
pub struct FoundBitset {
    // Backing storage: one bit per cell, least-significant bit first within each word.
    words: Box<[u64]>,
    // Number of addressable cells; may be less than `words.len() * 64`.
    len: usize,
}

impl FoundBitset {
    /// Creates a bitset with `len` bits, all initially unset (no cell resolved yet).
    pub(crate) fn new(len: usize) -> Self {
        let words = vec![0u64; len.div_ceil(64)].into_boxed_slice();
        Self { words, len }
    }

    /// Returns whether the bit at `index` is set, i.e. whether that cell has been resolved.
    #[inline]
    pub(crate) fn get(&self, index: usize) -> bool {
        debug_assert!(index < self.len);
        (self.words[index / 64] >> (index % 64)) & 1 == 1
    }

    /// Marks the cell at `index` as resolved by setting its bit.
    #[inline]
    pub(crate) fn set(&mut self, index: usize) {
        debug_assert!(index < self.len);
        let wi = index / 64;
        let bi = index % 64;
        self.words[wi] |= 1 << bi;
    }

    /// Returns the number of bits that are set, i.e. how many cells have been resolved.
    ///
    /// (The bits are set by [`set`][Self::set] when a cell is found, so this counts
    /// *found* cells, not pending ones.)
    pub(crate) fn count_ones(&self) -> usize {
        self.words.iter().map(|w| w.count_ones()).sum::<u32>() as usize
    }

    /// Total number of cells tracked by this bitset, set or not.
    pub(crate) fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when the bitset tracks zero cells.
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }
}
147+
109148/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time
110149/// using a single write batch. It allows for concurrent reads.
111150pub struct TurboPersistence < S : ParallelScheduler , const FAMILIES : usize > {
@@ -1603,11 +1642,36 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
16031642 Ok ( output)
16041643 }
16051644
1645+ /// Looks up multiple keys in batch and collects results into a `Vec`.
1646+ ///
1647+ /// For large batches where memory pressure matters, prefer
1648+ /// [`batch_get_with`][Self::batch_get_with] which calls a callback per entry without
1649+ /// accumulating all decoded bytes simultaneously.
16061650 pub fn batch_get < K : QueryKey > (
16071651 & self ,
16081652 family : usize ,
16091653 keys : & [ K ] ,
16101654 ) -> Result < Vec < Option < ArcBytes > > > {
1655+ let mut results = vec ! [ None ; keys. len( ) ] ;
1656+ self . batch_get_with ( family, keys, |index, opt_bytes| {
1657+ results[ index] = opt_bytes. map ( |b| ArcBytes :: from ( b. to_vec ( ) . into_boxed_slice ( ) ) ) ;
1658+ Ok ( ( ) )
1659+ } ) ?;
1660+ Ok ( results)
1661+ }
1662+
1663+ /// Looks up multiple keys in batch, calling `callback(index, Option<&[u8]>)` for each entry
1664+ /// immediately after it is resolved rather than accumulating all results into a `Vec`.
1665+ ///
1666+ /// This keeps at most one decompressed value block live at a time, significantly reducing
1667+ /// peak memory when the batch is large. The callback receives the 0-based key index and the
1668+ /// value bytes (`None` for not-found or deleted). Callback errors propagate immediately.
1669+ pub fn batch_get_with < K : QueryKey > (
1670+ & self ,
1671+ family : usize ,
1672+ keys : & [ K ] ,
1673+ mut callback : impl FnMut ( usize , Option < & [ u8 ] > ) -> Result < ( ) > ,
1674+ ) -> Result < ( ) > {
16111675 debug_assert ! ( family < FAMILIES , "Family index out of bounds" ) ;
16121676 if self . config . family_configs [ family] . kind != FamilyKind :: SingleValue {
16131677 // This is an error in our caller so just panic
@@ -1622,22 +1686,38 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
16221686 result_size = tracing:: field:: Empty
16231687 )
16241688 . entered ( ) ;
1625- let mut cells: Vec < ( u64 , usize , Option < LookupValue > ) > = Vec :: with_capacity ( keys. len ( ) ) ;
1626- let mut empty_cells = keys. len ( ) ;
1689+ let mut cells: Vec < ( u64 , usize ) > = Vec :: with_capacity ( keys. len ( ) ) ;
16271690 for ( index, key) in keys. iter ( ) . enumerate ( ) {
16281691 let hash = hash_key ( key) ;
1629- cells. push ( ( hash, index, None ) ) ;
1692+ cells. push ( ( hash, index) ) ;
16301693 }
1631- cells. sort_by_key ( |( hash, _, _) | * hash) ;
1694+ cells. sort_unstable_by_key ( |( hash, _) | * hash) ;
1695+ let mut found = FoundBitset :: new ( cells. len ( ) ) ;
16321696 let inner = self . inner . read ( ) ;
1697+ let mut not_found = 0 ;
1698+ let mut deleted = 0 ;
1699+ let mut result_size = 0 ;
1700+ let mut read_blob = |seq| self . read_blob ( seq) ;
1701+ // Wrap the callback to track stats (deleted/result_size) for found keys.
1702+ // not_found is tracked separately in the post-loop over cells.
1703+ let mut stats_callback = |index : usize , opt_bytes : Option < & [ u8 ] > | -> Result < ( ) > {
1704+ if let Some ( bytes) = opt_bytes {
1705+ result_size += bytes. len ( ) ;
1706+ } else {
1707+ deleted += 1 ;
1708+ }
1709+ callback ( index, opt_bytes)
1710+ } ;
16331711 for meta in inner. meta_files . iter ( ) . rev ( ) {
1634- let _result = meta. batch_lookup (
1712+ let ( all_found , _result) = meta. batch_lookup_with (
16351713 family as u32 ,
16361714 keys,
1637- & mut cells,
1638- & mut empty_cells ,
1715+ & cells,
1716+ & mut found ,
16391717 & self . key_block_cache ,
16401718 & self . value_block_cache ,
1719+ & mut read_blob,
1720+ & mut stats_callback,
16411721 ) ?;
16421722
16431723 #[ cfg( feature = "stats" ) ]
@@ -1669,49 +1749,25 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
16691749 }
16701750 }
16711751
1672- if empty_cells == 0 {
1752+ if all_found {
16731753 break ;
16741754 }
16751755 }
1676- let mut deleted = 0 ;
1677- let mut not_found = 0 ;
1678- let mut result_size = 0 ;
1679- let mut results = vec ! [ None ; keys. len( ) ] ;
1680- for ( hash, index, result) in cells {
1681- if let Some ( result) = result {
1756+ // Record accessed hashes for found keys, and fire callback for not-found keys.
1757+ for ( hash, index) in cells. into_iter ( ) {
1758+ if found. get ( index) {
16821759 inner. accessed_key_hashes [ family] . insert ( hash) ;
1683- let result = match result {
1684- LookupValue :: Deleted => {
1685- #[ cfg( feature = "stats" ) ]
1686- self . stats . hits_deleted . fetch_add ( 1 , Ordering :: Relaxed ) ;
1687- deleted += 1 ;
1688- None
1689- }
1690- LookupValue :: Slice { value } => {
1691- #[ cfg( feature = "stats" ) ]
1692- self . stats . hits_small . fetch_add ( 1 , Ordering :: Relaxed ) ;
1693- result_size += value. len ( ) ;
1694- Some ( value)
1695- }
1696- LookupValue :: Blob { sequence_number } => {
1697- #[ cfg( feature = "stats" ) ]
1698- self . stats . hits_blob . fetch_add ( 1 , Ordering :: Relaxed ) ;
1699- let blob = self . read_blob ( sequence_number) ?;
1700- result_size += blob. len ( ) ;
1701- Some ( blob)
1702- }
1703- } ;
1704- results[ index] = result;
17051760 } else {
17061761 #[ cfg( feature = "stats" ) ]
17071762 self . stats . miss_global . fetch_add ( 1 , Ordering :: Relaxed ) ;
17081763 not_found += 1 ;
1764+ callback ( index, None ) ?;
17091765 }
17101766 }
17111767 span. record ( "not_found" , not_found) ;
17121768 span. record ( "deleted" , deleted) ;
17131769 span. record ( "result_size" , result_size) ;
1714- Ok ( results )
1770+ Ok ( ( ) )
17151771 }
17161772
17171773 /// Returns database statistics.
0 commit comments