Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 118 additions & 5 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ pub struct FastSlowStore {
// actually it's faster because we're not downloading the file multiple
// times or doing loads of duplicate IO.
populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>,
// Tracks keys whose slow-store write is currently in flight, along with
// the best-known size. Consulted by `has_with_results` so that a
// concurrent writer that has not yet finished pushing to the slow store
// is still visible to a concurrent existence check, preventing redundant
// duplicate uploads of the same blob.
in_flight_slow_writes: Mutex<HashMap<StoreKey<'static>, u64>>,
}

// This guard ensures that the populating_digests is cleared even if the future
Expand All @@ -91,6 +97,27 @@ impl LoaderGuard<'_> {
}
}

/// Guard that unregisters a key from `FastSlowStore::in_flight_slow_writes`
/// when it is dropped.
///
/// Because the cleanup lives in `Drop`, the entry is removed on every exit
/// path of the scope that holds the guard, including when the surrounding
/// `update()` future is cancelled before the slow-store write completes, so
/// the map does not leak entries.
///
/// The guard must be bound to a named local (e.g. `let _guard = ...;`) for as
/// long as the write is in flight; discarding it unregisters the key
/// immediately, which is why the type is marked `#[must_use]`.
#[must_use = "dropping the guard immediately unregisters the in-flight slow write"]
struct InFlightSlowWriteGuard {
    /// Weak handle back to the owning store. `Drop` upgrades it and does
    /// nothing if the store has already been destroyed.
    weak_store: Weak<FastSlowStore>,
    /// Key to remove from the in-flight map. Stored as an `Option` so that
    /// `Drop` can move the key out with `take()`.
    key: Option<StoreKey<'static>>,
}

impl Drop for InFlightSlowWriteGuard {
    /// Removes this guard's key from the owning store's
    /// `in_flight_slow_writes` map.
    ///
    /// Nothing happens when the store has already been dropped (the weak
    /// handle no longer upgrades) or when the key has already been taken.
    fn drop(&mut self) {
        if let Some(store) = self.weak_store.upgrade() {
            if let Some(key) = self.key.take() {
                // NOTE(review): the removal is unconditional and registration
                // overwrites any existing entry, so two overlapping writers of
                // the same key share one map entry; the first guard to drop
                // clears it for both. Confirm whether that is acceptable.
                store.in_flight_slow_writes.lock().remove(&key);
            }
        }
    }
}

impl Drop for LoaderGuard<'_> {
fn drop(&mut self) {
let Some(store) = self.weak_store.upgrade() else {
Expand Down Expand Up @@ -126,9 +153,40 @@ impl FastSlowStore {
weak_self: weak_self.clone(),
metrics: FastSlowStoreMetrics::default(),
populating_digests: Mutex::new(HashMap::new()),
in_flight_slow_writes: Mutex::new(HashMap::new()),
})
}

/// Picks the size to record for a key whose slow-store write is in flight.
///
/// Resolution order:
/// * an `ExactSize` upload hint is used as-is;
/// * otherwise (`MaxSize`) a digest key contributes its own `size_bytes()`;
/// * otherwise (`MaxSize` with a string key) no exact size is known and the
///   result is 0.
///
/// NOTE(review): the recorded value is what `has_with_results` reports for an
/// in-flight key, so a 0 surfaces there as `Some(0)`. This assumes consumers
/// only distinguish `Some(_)` from `None` — confirm against callers.
const fn track_size(key: &StoreKey<'_>, size_info: UploadSizeInfo) -> u64 {
    match (size_info, key) {
        (UploadSizeInfo::ExactSize(exact), _) => exact,
        (UploadSizeInfo::MaxSize(_), StoreKey::Digest(digest)) => digest.size_bytes(),
        (UploadSizeInfo::MaxSize(_), StoreKey::Str(_)) => 0,
    }
}

/// Records `key` (with its best-known `size`) in `in_flight_slow_writes` and
/// returns the guard that removes the entry again when dropped.
///
/// The key is converted to an owned form twice: one copy is stored in the
/// map and the other travels with the guard so it can be removed later.
/// Inserting replaces any entry already present for the same key.
fn register_in_flight_slow_write(
    &self,
    key: StoreKey<'_>,
    size: u64,
) -> InFlightSlowWriteGuard {
    let guard_key = key.into_owned();
    let map_key = guard_key.borrow().into_owned();
    {
        // Keep the lock scope limited to the insertion itself.
        let mut in_flight = self.in_flight_slow_writes.lock();
        in_flight.insert(map_key, size);
    }
    InFlightSlowWriteGuard {
        weak_store: self.weak_self.clone(),
        key: Some(guard_key),
    }
}

/// Returns a shared reference to the `fast_store` field of this
/// `FastSlowStore`.
pub const fn fast_store(&self) -> &Store {
&self.fast_store
}
Expand Down Expand Up @@ -365,11 +423,55 @@ impl StoreDriver for FastSlowStore {
if slow_store.optimized_for(StoreOptimizations::NoopDownloads) {
return self.fast_store.has_with_results(key, results).await;
}
// Only check the slow store because if it's not there, then something
// down stream might be unable to get it. This should not affect
// workers as they only use get() and a CAS can use an
// ExistenceCacheStore to avoid the bottleneck.
self.slow_store.has_with_results(key, results).await
// Primary lookup is the slow store because that's authoritative for
// downstream consumers that fetch from there. But the slow store
// alone can miss two important cases:
// 1. A concurrent writer's slow-store write is still in flight.
// 2. The blob is present in the fast (local) store — either
// fast-only by configuration, or because the slow write has
// not yet started/completed.
// Reporting NotFound in those cases causes redundant duplicate
// uploads or unnecessary slow-store fetches.
self.slow_store.has_with_results(key, results).await?;

// Fill in any blobs whose slow-store write is currently in flight.
// Cheap when the map is empty (the common case).
{
let in_flight = self.in_flight_slow_writes.lock();
if !in_flight.is_empty() {
for (k, result) in key.iter().zip(results.iter_mut()) {
if result.is_none() {
let owned = k.borrow().into_owned();
if let Some(size) = in_flight.get(&owned) {
*result = Some(*size);
}
}
}
}
}

// Fall back to the fast store for anything still missing. This
// covers fast-only writes and the brief window between fast-store
// insertion and slow-store write start.
let missing_indices: Vec<usize> = results
.iter()
.enumerate()
.filter_map(|(i, r)| if r.is_none() { Some(i) } else { None })
.collect();
if !missing_indices.is_empty() {
let missing_keys: Vec<StoreKey<'_>> =
missing_indices.iter().map(|&i| key[i].borrow()).collect();
let mut fast_results = vec![None; missing_keys.len()];
self.fast_store
.has_with_results(&missing_keys, &mut fast_results)
.await?;
for (j, &orig_idx) in missing_indices.iter().enumerate() {
if fast_results[j].is_some() {
results[orig_idx] = fast_results[j];
}
}
}
Ok(())
}

async fn update(
Expand Down Expand Up @@ -405,9 +507,14 @@ impl StoreDriver for FastSlowStore {
return self.fast_store.update(key, reader, size_info).await;
}
if ignore_fast {
let _guard =
self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info));
return self.slow_store.update(key, reader, size_info).await;
}

let _slow_in_flight_guard =
self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info));

let (mut fast_tx, fast_rx) = make_buf_channel_pair();
let (mut slow_tx, slow_rx) = make_buf_channel_pair();

Expand Down Expand Up @@ -549,6 +656,10 @@ impl StoreDriver for FastSlowStore {
{
trace!("FastSlowStore::update_with_whole_file: uploading to slow_store");
let slow_start = std::time::Instant::now();
let _guard = self.register_in_flight_slow_write(
key.borrow(),
Self::track_size(&key, upload_size),
);
slow_update_store_with_file(
self.slow_store.as_store_driver_pin(),
key.borrow(),
Expand Down Expand Up @@ -598,6 +709,8 @@ impl StoreDriver for FastSlowStore {
if ignore_slow {
return Ok(Some(file));
}
let _guard = self
.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, upload_size));
return self
.slow_store
.update_with_whole_file(key, path, file, upload_size)
Expand Down
Loading
Loading