diff --git a/crates/liquidity-sources/src/recent_block_cache.rs b/crates/liquidity-sources/src/recent_block_cache.rs index ba3dee67c7..75b4e4b8e0 100644 --- a/crates/liquidity-sources/src/recent_block_cache.rs +++ b/crates/liquidity-sources/src/recent_block_cache.rs @@ -8,10 +8,10 @@ //! - Automatically updating the cache is decoupled from normal on-chain data //! fetches. //! -//! A result of this is that it is possible that the same uncached entry is -//! requested multiple times simultaneously and some work is wasted. This is -//! unlikely to happen in practice and the value is going to be cached the next -//! time it is needed. +//! A result of this design is that simultaneous requests for the same +//! uncached entries can still perform some duplicate work before the results +//! are cached. However, cache misses are fetched in batches to avoid excessive +//! per-entry fan-out and RPC overhead. //! //! When entries are requested we mark all those entries as recently used which //! potentially evicts other entries from the lru cache. Cache misses are @@ -26,13 +26,12 @@ use { alloy::eips::BlockId, - anyhow::{Context, Result}, + anyhow::Result, cached::{Cached, SizedCache}, ethrpc::block_stream::CurrentBlockWatcher, - futures::{FutureExt, StreamExt}, + futures::StreamExt, itertools::Itertools, prometheus::IntCounterVec, - request_sharing::BoxRequestSharing, std::{ cmp, collections::{BTreeMap, HashMap, HashSet, hash_map::Entry}, @@ -109,7 +108,6 @@ where delay_between_retries: Duration, metrics: &'static Metrics, metrics_label: &'static str, - requests: BoxRequestSharing<(K, Block), Option>>, } #[derive(Clone, Copy, Debug)] @@ -179,7 +177,6 @@ where delay_between_retries: config.delay_between_retries, metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(), metrics_label, - requests: BoxRequestSharing::labelled("liquidity_fetching".into()), }); Self::spawn_gc_task( @@ -246,41 +243,23 @@ where } async fn fetch_inner_many(&self, keys: HashSet, block: Block) -> Result> { - let fetched = - futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block))) - .await; - let fetched: Vec<_> = fetched - .into_iter() - .filter_map(|res| res.ok()) - .flatten() - .collect(); - Ok(fetched) - } - - // Sometimes nodes requests error when we try to get state from what we think is - // the current block when the node has been load balanced out to one that - // hasn't seen the block yet. As a workaround we repeat the request up to N - // times while sleeping in between. - async fn fetch_inner(&self, key: K, block: Block) -> Result> { - let retries = self.maximum_retries; - let delay = self.delay_between_retries; - let fetcher = self.fetcher.clone(); - let shared = self.requests.shared_or_else((key, block), |entry| { - let (key, block) = entry.clone(); - async move { - for _ in 0..=retries { - let keys = [key.clone()].into(); - match fetcher.fetch_values(keys, block).await { - Ok(values) => return Some(values), - Err(err) => tracing::warn!("retrying fetch because error: {:?}", err), - } - tokio::time::sleep(delay).await; + if keys.is_empty() { + return Ok(Vec::new()); + } + let mut last_err = None; + for attempt in 0..=self.maximum_retries { + match self.fetcher.fetch_values(keys.clone(), block).await { + Ok(values) => return Ok(values), + Err(err) => { + tracing::warn!("retrying fetch because error: {:?}", err); + last_err = Some(err); } - None } - .boxed() - }); - shared.await.context("could not fetch liquidity") + if attempt < self.maximum_retries { + tokio::time::sleep(self.delay_between_retries).await; + } + } + Err(last_err.unwrap().context("could not fetch liquidity")) } async fn fetch(&self, keys: impl IntoIterator, block: Block) -> Result> { @@ -386,7 +365,7 @@ where } fn get(&mut self, key: K, block: Option) -> Option<&[V]> { - let allow_background_udpates = block.is_some(); + let allow_background_updates = block.is_some(); let block = block.or_else(|| { self.cached_most_recently_at_block .get(&key) @@ -396,7 +375,7 @@ where }) })?; let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice); - if allow_background_udpates && result.is_some_and(|values| !values.is_empty()) { + if allow_background_updates && result.is_some_and(|values| !values.is_empty()) { self.recently_used.cache_set(key, ()); } result @@ -433,21 +412,24 @@ where fn remove_cached_blocks_older_than(&mut self, oldest_to_keep: u64) { tracing::debug!("dropping blocks older than {} from cache", oldest_to_keep); self.entries = self.entries.split_off(&(oldest_to_keep, K::first_ord())); - - // Iterate from newest block to oldest block and only keep the most recent - // liquidity around to reduce memory consumption. + // Iterate from the newest block to the oldest block and only keep the most + // recent liquidity around to reduce memory consumption. Empty entries + // are valid negative cache entries and must be kept for the most recent + // block. let mut cached_keys = HashSet::new(); + let mut entries_to_remove = Vec::new(); let mut items = 0; - for ((_block, key), values) in self.entries.iter_mut().rev() { - if !cached_keys.insert(key) { - *values = vec![]; + for ((block, key), values) in self.entries.iter().rev() { + if !cached_keys.insert((*key).clone()) { + entries_to_remove.push((*block, (*key).clone())); } else { items += values.len(); } } // Afterwards drop all entries that are now empty. - self.entries.retain(|_, values| !values.is_empty()); - + for entry in entries_to_remove { + self.entries.remove(&entry); + } self.cached_most_recently_at_block .retain(|_, block| *block >= oldest_to_keep); tracing::debug!( @@ -870,4 +852,65 @@ mod tests { assert!(cache.mutexed.lock().unwrap().get(key, Some(8)).is_some()); assert!(cache.mutexed.lock().unwrap().get(key, None).is_some()); } + + #[tokio::test] + async fn negative_cache_entries_survive_gc() { + // Key 0 has on-chain data; key 1 has none (negative cache entry). + let fetcher = FakeCacheFetcher::new(vec![TestValue::new(0, "a")]); + let block = |number| BlockInfo { + number, + ..Default::default() + }; + let (block_sender, block_stream) = tokio::sync::watch::channel(block(10)); + let cache = RecentBlockCache::new( + CacheConfig { + number_of_blocks_to_cache: NonZeroU64::new(2).unwrap(), + number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(), + ..Default::default() + }, + fetcher, + block_stream, + "", + ) + .unwrap() + .inner; + + // Populate the cache: key 0 gets a value, key 1 gets a negative entry. + cache + .fetch(test_keys(0..2), Block::Number(10)) + .await + .unwrap(); + assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 2); + + // Key 1 has no on-chain data so get() never adds it to recently_used + // (the guard requires a non-empty result). Add it manually so the + // background updater re-fetches it each cycle and re-inserts the + // negative entry, giving GC the opportunity to destroy it. + cache + .mutexed + .lock() + .unwrap() + .recently_used + .cache_set(TestKey(1), ()); + + // Advance two blocks, triggering GC each time. The updater re-fetches + // both keys, still finds no data for key 1, and re-inserts its negative + // entry. + block_sender.send(block(11)).unwrap(); + cache.update_cache_at_block(11).await.unwrap(); + block_sender.send(block(12)).unwrap(); + cache.update_cache_at_block(12).await.unwrap(); + + // Negative entry for key 1 must still be in the cache — get() should + // return Some(&[]) rather than None. + let mut mutexed = cache.mutexed.lock().unwrap(); + assert!( + mutexed.get(TestKey(0), None).is_some(), + "key 0 should still be cached" + ); + assert!( + matches!(mutexed.get(TestKey(1), None), Some(&[])), + "key 1 negative entry must survive GC; got None (treated as cache miss)" + ); + } }