Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 95 additions & 52 deletions crates/liquidity-sources/src/recent_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
//! - Automatically updating the cache is decoupled from normal on-chain data
//! fetches.
//!
//! A result of this is that it is possible that the same uncached entry is
//! requested multiple times simultaneously and some work is wasted. This is
//! unlikely to happen in practice and the value is going to be cached the next
//! time it is needed.
//! A result of this design is that simultaneous requests for the same
//! uncached entries can still perform some duplicate work before the results
//! are cached. However, cache misses are fetched in batches to avoid excessive
//! per-entry fan-out and RPC overhead.
//!
//! When entries are requested we mark all those entries as recently used which
//! potentially evicts other entries from the lru cache. Cache misses are
Expand All @@ -26,13 +26,12 @@

use {
alloy::eips::BlockId,
anyhow::{Context, Result},
anyhow::Result,
cached::{Cached, SizedCache},
ethrpc::block_stream::CurrentBlockWatcher,
futures::{FutureExt, StreamExt},
futures::StreamExt,
itertools::Itertools,
prometheus::IntCounterVec,
request_sharing::BoxRequestSharing,
std::{
cmp,
collections::{BTreeMap, HashMap, HashSet, hash_map::Entry},
Expand Down Expand Up @@ -109,7 +108,6 @@ where
delay_between_retries: Duration,
metrics: &'static Metrics,
metrics_label: &'static str,
requests: BoxRequestSharing<(K, Block), Option<Vec<V>>>,
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -179,7 +177,6 @@ where
delay_between_retries: config.delay_between_retries,
metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(),
metrics_label,
requests: BoxRequestSharing::labelled("liquidity_fetching".into()),
});

Self::spawn_gc_task(
Expand Down Expand Up @@ -246,41 +243,23 @@ where
}

async fn fetch_inner_many(&self, keys: HashSet<K>, block: Block) -> Result<Vec<V>> {
Comment thread
metalurgical marked this conversation as resolved.
let fetched =
futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block)))
.await;
let fetched: Vec<_> = fetched
.into_iter()
.filter_map(|res| res.ok())
.flatten()
.collect();
Ok(fetched)
}

// Sometimes nodes requests error when we try to get state from what we think is
// the current block when the node has been load balanced out to one that
// hasn't seen the block yet. As a workaround we repeat the request up to N
// times while sleeping in between.
async fn fetch_inner(&self, key: K, block: Block) -> Result<Vec<V>> {
let retries = self.maximum_retries;
let delay = self.delay_between_retries;
let fetcher = self.fetcher.clone();
let shared = self.requests.shared_or_else((key, block), |entry| {
let (key, block) = entry.clone();
async move {
for _ in 0..=retries {
let keys = [key.clone()].into();
match fetcher.fetch_values(keys, block).await {
Ok(values) => return Some(values),
Err(err) => tracing::warn!("retrying fetch because error: {:?}", err),
}
tokio::time::sleep(delay).await;
if keys.is_empty() {
return Ok(Vec::new());
}
let mut last_err = None;
for attempt in 0..=self.maximum_retries {
match self.fetcher.fetch_values(keys.clone(), block).await {
Ok(values) => return Ok(values),
Err(err) => {
tracing::warn!("retrying fetch because error: {:?}", err);
last_err = Some(err);
}
None
}
.boxed()
});
shared.await.context("could not fetch liquidity")
if attempt < self.maximum_retries {
tokio::time::sleep(self.delay_between_retries).await;
}
}
Err(last_err.unwrap().context("could not fetch liquidity"))
}

async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
Expand Down Expand Up @@ -386,7 +365,7 @@ where
}

fn get(&mut self, key: K, block: Option<u64>) -> Option<&[V]> {
let allow_background_udpates = block.is_some();
let allow_background_updates = block.is_some();
let block = block.or_else(|| {
self.cached_most_recently_at_block
.get(&key)
Expand All @@ -396,7 +375,7 @@ where
})
})?;
let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice);
if allow_background_udpates && result.is_some_and(|values| !values.is_empty()) {
if allow_background_updates && result.is_some_and(|values| !values.is_empty()) {
self.recently_used.cache_set(key, ());
}
result
Expand Down Expand Up @@ -433,21 +412,24 @@ where
fn remove_cached_blocks_older_than(&mut self, oldest_to_keep: u64) {
tracing::debug!("dropping blocks older than {} from cache", oldest_to_keep);
self.entries = self.entries.split_off(&(oldest_to_keep, K::first_ord()));

// Iterate from newest block to oldest block and only keep the most recent
// liquidity around to reduce memory consumption.
// Iterate from the newest block to the oldest block and only keep the most
// recent liquidity around to reduce memory consumption. Empty entries
// are valid negative cache entries and must be kept for the most recent
// block.
let mut cached_keys = HashSet::new();
let mut entries_to_remove = Vec::new();
let mut items = 0;
for ((_block, key), values) in self.entries.iter_mut().rev() {
if !cached_keys.insert(key) {
*values = vec![];
for ((block, key), values) in self.entries.iter().rev() {
if !cached_keys.insert((*key).clone()) {
entries_to_remove.push((*block, (*key).clone()));
} else {
items += values.len();
}
}
// Afterwards drop all entries that are now empty.
self.entries.retain(|_, values| !values.is_empty());

for entry in entries_to_remove {
self.entries.remove(&entry);
}
self.cached_most_recently_at_block
.retain(|_, block| *block >= oldest_to_keep);
tracing::debug!(
Expand Down Expand Up @@ -870,4 +852,65 @@ mod tests {
assert!(cache.mutexed.lock().unwrap().get(key, Some(8)).is_some());
assert!(cache.mutexed.lock().unwrap().get(key, None).is_some());
}

#[tokio::test]
async fn negative_cache_entries_survive_gc() {
// Key 0 has on-chain data; key 1 has none (negative cache entry).
let fetcher = FakeCacheFetcher::new(vec![TestValue::new(0, "a")]);
let block = |number| BlockInfo {
number,
..Default::default()
};
let (block_sender, block_stream) = tokio::sync::watch::channel(block(10));
let cache = RecentBlockCache::new(
CacheConfig {
number_of_blocks_to_cache: NonZeroU64::new(2).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(),
..Default::default()
},
fetcher,
block_stream,
"",
)
.unwrap()
.inner;

// Populate the cache: key 0 gets a value, key 1 gets a negative entry.
cache
.fetch(test_keys(0..2), Block::Number(10))
.await
.unwrap();
assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 2);

// Key 1 has no on-chain data so get() never adds it to recently_used
// (the guard requires a non-empty result). Add it manually so the
// background updater re-fetches it each cycle and re-inserts the
// negative entry, giving GC the opportunity to destroy it.
cache
.mutexed
.lock()
.unwrap()
.recently_used
.cache_set(TestKey(1), ());

// Advance two blocks, triggering GC each time. The updater re-fetches
// both keys, still finds no data for key 1, and re-inserts its negative
// entry.
block_sender.send(block(11)).unwrap();
cache.update_cache_at_block(11).await.unwrap();
block_sender.send(block(12)).unwrap();
cache.update_cache_at_block(12).await.unwrap();

// Negative entry for key 1 must still be in the cache — get() should
// return Some(&[]) rather than None.
let mut mutexed = cache.mutexed.lock().unwrap();
assert!(
mutexed.get(TestKey(0), None).is_some(),
"key 0 should still be cached"
);
assert!(
matches!(mutexed.get(TestKey(1), None), Some(&[])),
"key 1 negative entry must survive GC; got None (treated as cache miss)"
);
}
}
Loading