Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 87 additions & 53 deletions crates/liquidity-sources/src/recent_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
//! - Automatically updating the cache is decoupled from normal on-chain data
//! fetches.
//!
//! A result of this is that it is possible that the same uncached entry is
//! requested multiple times simultaneously and some work is wasted. This is
//! unlikely to happen in practice and the value is going to be cached the next
//! time it is needed.
//! A result of this design is that simultaneous requests for the same
//! uncached entries can still perform some duplicate work before the results
//! are cached. However, cache misses are fetched in batches to avoid excessive
//! per-entry fan-out and RPC overhead.
//!
//! When entries are requested we mark all those entries as recently used which
//! potentially evicts other entries from the lru cache. Cache misses are
Expand All @@ -26,13 +26,12 @@

use {
alloy::eips::BlockId,
anyhow::{Context, Result},
anyhow::Result,
cached::{Cached, SizedCache},
ethrpc::block_stream::CurrentBlockWatcher,
futures::{FutureExt, StreamExt},
futures::StreamExt,
itertools::Itertools,
prometheus::IntCounterVec,
request_sharing::BoxRequestSharing,
std::{
cmp,
collections::{BTreeMap, HashMap, HashSet, hash_map::Entry},
Expand Down Expand Up @@ -109,7 +108,6 @@ where
delay_between_retries: Duration,
metrics: &'static Metrics,
metrics_label: &'static str,
requests: BoxRequestSharing<(K, Block), Option<Vec<V>>>,
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -179,7 +177,6 @@ where
delay_between_retries: config.delay_between_retries,
metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(),
metrics_label,
requests: BoxRequestSharing::labelled("liquidity_fetching".into()),
});

Self::spawn_gc_task(
Expand Down Expand Up @@ -246,41 +243,14 @@ where
}

async fn fetch_inner_many(&self, keys: HashSet<K>, block: Block) -> Result<Vec<V>> {
Comment thread
metalurgical marked this conversation as resolved.
let fetched =
futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block)))
.await;
let fetched: Vec<_> = fetched
.into_iter()
.filter_map(|res| res.ok())
.flatten()
.collect();
Ok(fetched)
}

// Sometimes node requests fail when we try to get state from what we think is
// the current block, because the load balancer routed us to a node that hasn't
// seen the block yet. As a workaround we repeat the request up to N times while
// sleeping in between.
async fn fetch_inner(&self, key: K, block: Block) -> Result<Vec<V>> {
let retries = self.maximum_retries;
let delay = self.delay_between_retries;
let fetcher = self.fetcher.clone();
let shared = self.requests.shared_or_else((key, block), |entry| {
let (key, block) = entry.clone();
async move {
for _ in 0..=retries {
let keys = [key.clone()].into();
match fetcher.fetch_values(keys, block).await {
Ok(values) => return Some(values),
Err(err) => tracing::warn!("retrying fetch because error: {:?}", err),
}
tokio::time::sleep(delay).await;
}
None
for _ in 0..=self.maximum_retries {
match self.fetcher.fetch_values(keys.clone(), block).await {
Ok(values) => return Ok(values),
Err(err) => tracing::warn!("retrying fetch because error: {:?}", err),
}
.boxed()
});
shared.await.context("could not fetch liquidity")
tokio::time::sleep(self.delay_between_retries).await;
}
anyhow::bail!("could not fetch liquidity");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current retry logic in fetch_inner_many has several issues:

  1. Unnecessary Latency: It performs a tokio::time::sleep even after the final retry attempt fails, adding redundant delay to the overall request time.
  2. Error Context Loss: By using anyhow::bail!, the original RPC error (e.g., timeout, node error) is discarded. Using .context() preserves the root cause for better troubleshooting.
  3. Batch Error Detail: According to repository guidelines for batch fetches, the implementation should explicitly indicate which items failed and provide error details for each, rather than returning a single error for the entire batch. Please ensure the response structure reflects individual item statuses.
        // Make one initial attempt plus up to `maximum_retries` retries.
        //
        // `keys` is cloned for every attempt: moving it out of the loop on the
        // "last" iteration is rejected by the borrow checker (E0382, value moved
        // in a previous iteration of the loop), and one extra clone of the key
        // set is negligible next to the fetch itself.
        let mut attempt = 0;
        loop {
            match self.fetcher.fetch_values(keys.clone(), block).await {
                Ok(values) => return Ok(values),
                // Out of retries: surface the underlying error with added context
                // instead of discarding it, and do not sleep before returning.
                Err(err) if attempt >= self.maximum_retries => {
                    return Err(anyhow::Error::from(err).context("could not fetch liquidity"));
                }
                Err(err) => {
                    tracing::warn!(?err, "retrying fetch");
                    // Another attempt follows, so back off before retrying.
                    tokio::time::sleep(self.delay_between_retries).await;
                }
            }
            attempt += 1;
        }
References
  1. When fetching a batch of items where individual fetches can fail, do not silently ignore errors. The API response should explicitly indicate which items failed and provide error details for each failure.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Points 1 and 2 have been corrected.

Point 3 is a trait redesign: Result<Vec<V>> will need to become HashMap<K, Result<Vec<V>>> in multiple places for this.

}

async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
Expand Down Expand Up @@ -386,7 +356,7 @@ where
}

fn get(&mut self, key: K, block: Option<u64>) -> Option<&[V]> {
let allow_background_udpates = block.is_some();
let allow_background_updates = block.is_some();
let block = block.or_else(|| {
self.cached_most_recently_at_block
.get(&key)
Expand All @@ -396,7 +366,7 @@ where
})
})?;
let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice);
if allow_background_udpates && result.is_some_and(|values| !values.is_empty()) {
if allow_background_updates && result.is_some_and(|values| !values.is_empty()) {
self.recently_used.cache_set(key, ());
}
result
Expand Down Expand Up @@ -433,21 +403,24 @@ where
fn remove_cached_blocks_older_than(&mut self, oldest_to_keep: u64) {
tracing::debug!("dropping blocks older than {} from cache", oldest_to_keep);
self.entries = self.entries.split_off(&(oldest_to_keep, K::first_ord()));

// Iterate from newest block to oldest block and only keep the most recent
// liquidity around to reduce memory consumption.
// Iterate from the newest block to the oldest block and only keep the most
// recent liquidity around to reduce memory consumption. Empty entries
// are valid negative cache entries and must be kept for the most recent
// block.
let mut cached_keys = HashSet::new();
let mut entries_to_remove = Vec::new();
let mut items = 0;
for ((_block, key), values) in self.entries.iter_mut().rev() {
if !cached_keys.insert(key) {
*values = vec![];
for ((block, key), values) in self.entries.iter().rev() {
if !cached_keys.insert((*key).clone()) {
entries_to_remove.push((*block, (*key).clone()));
} else {
items += values.len();
}
}
// Afterwards drop all entries that are now empty.
self.entries.retain(|_, values| !values.is_empty());

for entry in entries_to_remove {
self.entries.remove(&entry);
}
self.cached_most_recently_at_block
.retain(|_, block| *block >= oldest_to_keep);
tracing::debug!(
Expand Down Expand Up @@ -870,4 +843,65 @@ mod tests {
assert!(cache.mutexed.lock().unwrap().get(key, Some(8)).is_some());
assert!(cache.mutexed.lock().unwrap().get(key, None).is_some());
}

#[tokio::test]
async fn negative_cache_entries_survive_gc() {
    // Builds a block header that differs from the default only in its number.
    let block_at = |number| BlockInfo {
        number,
        ..Default::default()
    };

    // The fake fetcher only knows a value for key 0, so fetching key 1 yields
    // nothing and is stored as an empty (negative) cache entry.
    let values_on_chain = vec![TestValue::new(0, "a")];
    let (sender, receiver) = tokio::sync::watch::channel(block_at(10));
    let config = CacheConfig {
        number_of_blocks_to_cache: NonZeroU64::new(2).unwrap(),
        number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(),
        ..Default::default()
    };
    let cache = RecentBlockCache::new(config, FakeCacheFetcher::new(values_on_chain), receiver, "")
        .unwrap()
        .inner;

    // Warm the cache at block 10: one entry with data, one negative entry.
    cache
        .fetch(test_keys(0..2), Block::Number(10))
        .await
        .unwrap();
    let cached_entries = cache.mutexed.lock().unwrap().entries.len();
    assert_eq!(cached_entries, 2);

    // `get()` only marks keys with a non-empty result as recently used, so
    // key 1 would never be picked up by the background updater on its own.
    // Register it by hand so every update cycle re-fetches it and re-inserts
    // the negative entry, which gives GC a chance to drop it.
    cache
        .mutexed
        .lock()
        .unwrap()
        .recently_used
        .cache_set(TestKey(1), ());

    // Move the chain forward by two blocks; each update also runs GC. Key 1
    // still has no data, so its negative entry is written again every time.
    for number in 11..=12 {
        sender.send(block_at(number)).unwrap();
        cache.update_cache_at_block(number).await.unwrap();
    }

    // Both keys must still be served from the cache. For key 1 that means
    // `Some(&[])` (a cached "nothing there"), not `None` (a cache miss).
    let mut guard = cache.mutexed.lock().unwrap();
    let key_0_cached = guard.get(TestKey(0), None).is_some();
    assert!(key_0_cached, "key 0 should still be cached");
    let key_1_entry = guard.get(TestKey(1), None);
    assert!(
        matches!(key_1_entry, Some(&[])),
        "key 1 negative entry must survive GC; got None (treated as cache miss)"
    );
}
}
Loading