From 281d4e59f32d6112056254ae81ecbc5564b06f4e Mon Sep 17 00:00:00 2001 From: Ernesto Cambuston Date: Sat, 16 May 2026 19:00:14 -0700 Subject: [PATCH 1/5] fast_slow_store: make has_with_results aware of fast store and in-flight slow writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `FastSlowStore::has_with_results` previously only consulted the slow store, which caused two known bugs: 1. Fast-only writes (and the brief window between fast-store insert and slow-store write start) were invisible — callers got NotFound for blobs that were locally present, triggering redundant fetches. 2. Concurrent writers racing on the same digest could not see each other's in-flight slow-store writes, so the second writer re-uploaded what the first had nearly finished pushing. The fix layers three consultations in order: slow store first (authoritative for downstream consumers), then a `in_flight_slow_writes: Mutex` tracking active slow writes, then fast store as a final fallback for fast-only / pre-slow-write hits. In-flight tracking uses a cancel-safe RAII guard (`InFlightSlowWriteGuard`) registered at the start of each `update`/`update_with_whole_file` path that writes to the slow store, so a cancelled write future correctly removes itself from the map on Drop. Tests cover all four reachable cases: slow-hit short-circuit, fast-only hit, in-flight slow write visibility, and noop-slow-store fall-through. Provenance: equivalent to upstream commits f69aaf8e8 and 2d770d913e from TraceMachina/nativelink PR #2243, ported atomically to current main. --- nativelink-store/src/fast_slow_store.rs | 123 +++++++- .../tests/fast_slow_store_test.rs | 268 +++++++++++++++++- 2 files changed, 382 insertions(+), 9 deletions(-) diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index ab912f6b9..0bd382ff5 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -65,6 +65,12 @@ pub struct FastSlowStore { // actually it's faster because we're not downloading the file multiple // times are doing loads of duplicate IO. populating_digests: Mutex, Loader>>, + // Tracks keys whose slow-store write is currently in flight, along with + // the best-known size. Consulted by `has_with_results` so that a + // concurrent writer that has not yet finished pushing to the slow store + // is still visible to a concurrent existence check, preventing redundant + // duplicate uploads of the same blob. + in_flight_slow_writes: Mutex, u64>>, } // This guard ensures that the populating_digests is cleared even if the future @@ -91,6 +97,27 @@ impl LoaderGuard<'_> { } } +/// Cancel-safe RAII guard that removes an entry from +/// `FastSlowStore::in_flight_slow_writes` when dropped. This ensures the +/// map does not leak entries if the surrounding `update()` future is +/// cancelled before the slow-store write completes. +struct InFlightSlowWriteGuard { + weak_store: Weak, + key: Option>, +} + +impl Drop for InFlightSlowWriteGuard { + fn drop(&mut self) { + let Some(store) = self.weak_store.upgrade() else { + return; + }; + let Some(key) = self.key.take() else { + return; + }; + store.in_flight_slow_writes.lock().remove(&key); + } +} + impl Drop for LoaderGuard<'_> { fn drop(&mut self) { let Some(store) = self.weak_store.upgrade() else { @@ -126,9 +153,40 @@ impl FastSlowStore { weak_self: weak_self.clone(), metrics: FastSlowStoreMetrics::default(), populating_digests: Mutex::new(HashMap::new()), + in_flight_slow_writes: Mutex::new(HashMap::new()), }) } + /// Best-effort size for tracking in-flight slow writes. Falls back to 0 + /// when neither the upload size nor the key carries an exact size + /// (e.g. `MaxSize` for a string key). Callers of `has_with_results` only + /// rely on `Some(_)` vs `None`, so a size of 0 still correctly signals + /// "this blob exists". + const fn track_size(key: &StoreKey<'_>, size_info: UploadSizeInfo) -> u64 { + match size_info { + UploadSizeInfo::ExactSize(s) => s, + UploadSizeInfo::MaxSize(_) => match key { + StoreKey::Digest(d) => d.size_bytes(), + StoreKey::Str(_) => 0, + }, + } + } + + fn register_in_flight_slow_write( + &self, + key: StoreKey<'_>, + size: u64, + ) -> InFlightSlowWriteGuard { + let owned = key.into_owned(); + self.in_flight_slow_writes + .lock() + .insert(owned.borrow().into_owned(), size); + InFlightSlowWriteGuard { + weak_store: self.weak_self.clone(), + key: Some(owned), + } + } + pub const fn fast_store(&self) -> &Store { &self.fast_store } @@ -365,11 +423,55 @@ impl StoreDriver for FastSlowStore { if slow_store.optimized_for(StoreOptimizations::NoopDownloads) { return self.fast_store.has_with_results(key, results).await; } - // Only check the slow store because if it's not there, then something - // down stream might be unable to get it. This should not affect - // workers as they only use get() and a CAS can use an - // ExistenceCacheStore to avoid the bottleneck. - self.slow_store.has_with_results(key, results).await + // Primary lookup is the slow store because that's authoritative for + // downstream consumers that fetch from there. But the slow store + // alone can miss two important cases: + // 1. A concurrent writer's slow-store write is still in flight. + // 2. The blob is present in the fast (local) store — either + // fast-only by configuration, or because the slow write has + // not yet started/completed. + // Reporting NotFound in those cases causes redundant duplicate + // uploads or unnecessary slow-store fetches. + self.slow_store.has_with_results(key, results).await?; + + // Fill in any blobs whose slow-store write is currently in flight. + // Cheap when the map is empty (the common case). + { + let in_flight = self.in_flight_slow_writes.lock(); + if !in_flight.is_empty() { + for (k, result) in key.iter().zip(results.iter_mut()) { + if result.is_none() { + let owned = k.borrow().into_owned(); + if let Some(size) = in_flight.get(&owned) { + *result = Some(*size); + } + } + } + } + } + + // Fall back to the fast store for anything still missing. This + // covers fast-only writes and the brief window between fast-store + // insertion and slow-store write start. + let missing_indices: Vec = results + .iter() + .enumerate() + .filter_map(|(i, r)| if r.is_none() { Some(i) } else { None }) + .collect(); + if !missing_indices.is_empty() { + let missing_keys: Vec> = + missing_indices.iter().map(|&i| key[i].borrow()).collect(); + let mut fast_results = vec![None; missing_keys.len()]; + self.fast_store + .has_with_results(&missing_keys, &mut fast_results) + .await?; + for (j, &orig_idx) in missing_indices.iter().enumerate() { + if fast_results[j].is_some() { + results[orig_idx] = fast_results[j]; + } + } + } + Ok(()) } async fn update( @@ -405,9 +507,14 @@ impl StoreDriver for FastSlowStore { return self.fast_store.update(key, reader, size_info).await; } if ignore_fast { + let _guard = + self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info)); return self.slow_store.update(key, reader, size_info).await; } + let _slow_in_flight_guard = + self.register_in_flight_slow_write(key.borrow(), Self::track_size(&key, size_info)); + let (mut fast_tx, fast_rx) = make_buf_channel_pair(); let (mut slow_tx, slow_rx) = make_buf_channel_pair(); @@ -549,6 +656,10 @@ impl StoreDriver for FastSlowStore { { trace!("FastSlowStore::update_with_whole_file: uploading to slow_store"); let slow_start = std::time::Instant::now(); + let _guard = self.register_in_flight_slow_write( + key.borrow(), + Self::track_size(&key, upload_size), + ); slow_update_store_with_file( self.slow_store.as_store_driver_pin(), key.borrow(), @@ -598,6 +709,8 @@ impl StoreDriver for FastSlowStore { if ignore_slow { return Ok(Some(file)); } + let _guard = self + .register_in_flight_slow_write(key.borrow(), Self::track_size(&key, upload_size)); return self .slow_store .update_with_whole_file(key, path, file, upload_size) diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 7ab8b8d7d..5ffa9225e 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -389,8 +389,14 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { get_res.merge(read_res) } +// Previously `has()` returned `None` for a blob that was present only in the +// fast store. That behavior caused redundant work: a worker that had already +// cached a blob locally would still get NotFound from `has()` and re-fetch +// (or re-upload) the same data. `has_with_results` now falls back to the +// fast store when the slow store reports nothing, so a fast-only hit +// correctly returns the blob's size. #[nativelink_test] -async fn ignore_value_in_fast_store() -> Result<(), Error> { +async fn fast_store_only_value_is_reported_by_has() -> Result<(), Error> { let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); let slow_store = Store::new(MemoryStore::new(&MemorySpec::default())); let fast_slow_store = Arc::new(FastSlowStore::new( @@ -407,9 +413,10 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> { fast_store .update_oneshot(digest, make_random_data(100).into()) .await?; - assert!( - fast_slow_store.has(digest).await?.is_none(), - "Expected data to not exist in store" + assert_eq!( + fast_slow_store.has(digest).await?, + Some(100), + "Expected fast-store-only blob to be reported as present", ); Ok(()) } @@ -915,3 +922,256 @@ async fn dropping_a_follower_does_not_cancel_the_leader() -> Result<(), Error> { Ok(()) } + +/// While one writer's slow-store write is in flight, a concurrent `has()` +/// must report the blob as present so the second writer does not race and +/// re-upload the same data. +#[nativelink_test] +async fn has_sees_in_flight_slow_writes() -> Result<(), Error> { + #[derive(MetricsComponent)] + struct GatedSlowStore { + /// Released by the test to let the in-flight slow write complete. + gate: Mutex>>, + /// Signalled once the slow-store `update` has begun draining. + started_tx: Mutex>>, + } + + #[async_trait] + impl StoreDriver for GatedSlowStore { + async fn has_with_results( + self: Pin<&Self>, + _keys: &[StoreKey<'_>], + _results: &mut [Option], + ) -> Result<(), Error> { + // Slow store reports nothing — the in-flight tracking is what + // should fill the result in. + Ok(()) + } + + async fn update( + self: Pin<&Self>, + _key: StoreKey<'_>, + mut reader: DropCloserReadHalf, + _size_info: UploadSizeInfo, + ) -> Result<(), Error> { + let started_tx = self.started_tx.lock().unwrap().take(); + if let Some(tx) = started_tx { + let _ = tx.send(()); + } + let gate = self.gate.lock().unwrap().take(); + if let Some(rx) = gate { + let _ = rx.await; + } + reader.drain().await + } + + async fn get_part( + self: Pin<&Self>, + _key: StoreKey<'_>, + writer: &mut DropCloserWriteHalf, + _offset: u64, + _length: Option, + ) -> Result<(), Error> { + writer.send_eof() + } + + fn inner_store(&self, _key: Option) -> &'_ dyn StoreDriver { + self + } + + fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { + self + } + + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn register_remove_callback( + self: Arc, + _callback: Arc, + ) -> Result<(), Error> { + Ok(()) + } + } + + default_health_status_indicator!(GatedSlowStore); + + let (gate_tx, gate_rx) = tokio::sync::oneshot::channel::<()>(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel::<()>(); + let slow = Arc::new(GatedSlowStore { + gate: Mutex::new(Some(gate_rx)), + started_tx: Mutex::new(Some(started_tx)), + }); + let fast = Store::new(MemoryStore::new(&MemorySpec::default())); + let fast_slow = Arc::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Memory(MemorySpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), + slow_direction: StoreDirection::default(), + }, + fast, + Store::new(slow.clone()), + )); + + let data = make_random_data(256); + let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap(); + + // Sanity: nothing in flight, slow store has nothing, fast store has + // nothing -> NotFound. + assert_eq!( + fast_slow.has(digest).await?, + None, + "Pre-condition: blob should be absent before any writer starts", + ); + + let writer_store = fast_slow.clone(); + let writer_data = data.clone(); + let writer = tokio::spawn(async move { + writer_store + .update_oneshot(digest, writer_data.into()) + .await + }); + + // Wait until the slow store's update is actually being driven, which + // proves the in-flight registration is live. + started_rx + .await + .map_err(|e| make_err!(Code::Internal, "started signal lost: {e:?}"))?; + + // Concurrent observer: the slow store will return None (its + // has_with_results above), so the only way this can be Some is via + // the in-flight map. + assert_eq!( + fast_slow.has(digest).await?, + Some(data.len() as u64), + "Concurrent has() must see in-flight slow write", + ); + + // Release the writer and confirm the in-flight tracker is cleaned up. + gate_tx + .send(()) + .map_err(|()| make_err!(Code::Internal, "Failed to release slow-store gate"))?; + writer + .await + .map_err(|e| make_err!(Code::Internal, "writer join error: {e:?}"))??; + + // After completion the fast store still has the blob, so has() should + // remain Some via the fast-store fallback (the GatedSlowStore still + // reports None). + assert_eq!( + fast_slow.has(digest).await?, + Some(data.len() as u64), + "Post-write has() should see the blob via fast-store fallback", + ); + + Ok(()) +} + +/// `has()` consults the fast store after the slow store reports `NotFound`. +/// This is asserted indirectly above by `fast_store_only_value_is_reported_by_has`; +/// here we additionally assert that when the slow store DOES have the blob, +/// the fast store is NOT consulted (avoiding the extra round trip). +#[nativelink_test] +async fn has_does_not_consult_fast_store_when_slow_store_hits() -> Result<(), Error> { + #[derive(MetricsComponent)] + struct CountingFastStore { + inner: Arc, + has_calls: Arc, + } + + #[async_trait] + impl StoreDriver for CountingFastStore { + async fn has_with_results( + self: Pin<&Self>, + keys: &[StoreKey<'_>], + results: &mut [Option], + ) -> Result<(), Error> { + self.has_calls.fetch_add(1, Ordering::Acquire); + Pin::new(self.inner.as_ref()) + .has_with_results(keys, results) + .await + } + + async fn update( + self: Pin<&Self>, + key: StoreKey<'_>, + reader: DropCloserReadHalf, + size_info: UploadSizeInfo, + ) -> Result<(), Error> { + Pin::new(self.inner.as_ref()) + .update(key, reader, size_info) + .await + } + + async fn get_part( + self: Pin<&Self>, + key: StoreKey<'_>, + writer: &mut DropCloserWriteHalf, + offset: u64, + length: Option, + ) -> Result<(), Error> { + Pin::new(self.inner.as_ref()) + .get_part(key, writer, offset, length) + .await + } + + fn inner_store(&self, _key: Option) -> &'_ dyn StoreDriver { + self + } + + fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { + self + } + + fn as_any_arc(self: Arc) -> Arc { + self + } + + fn register_remove_callback( + self: Arc, + _callback: Arc, + ) -> Result<(), Error> { + Ok(()) + } + } + + default_health_status_indicator!(CountingFastStore); + + let has_calls = Arc::new(AtomicU64::new(0)); + let fast_inner = MemoryStore::new(&MemorySpec::default()); + let fast = Store::new(Arc::new(CountingFastStore { + inner: fast_inner, + has_calls: has_calls.clone(), + })); + let slow = Store::new(MemoryStore::new(&MemorySpec::default())); + let fast_slow = Arc::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Memory(MemorySpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), + slow_direction: StoreDirection::default(), + }, + fast, + slow.clone(), + )); + + let data = make_random_data(128); + let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap(); + slow.update_oneshot(digest, data.clone().into()).await?; + + let before = has_calls.load(Ordering::Acquire); + assert_eq!( + fast_slow.has(digest).await?, + Some(data.len() as u64), + "Slow-store-only blob must be reported via slow lookup", + ); + let after = has_calls.load(Ordering::Acquire); + assert_eq!( + after, before, + "Fast store has() must not be consulted when the slow store already reports the blob", + ); + + Ok(()) +} From 4a5c21afae888fb89b1ff78ec798e1565db9099e Mon Sep 17 00:00:00 2001 From: Ernesto Cambuston Date: Sat, 16 May 2026 19:06:50 -0700 Subject: [PATCH 2/5] fast_slow_store_test: add cancel-safety and mixed-key has_with_results tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit added tests for the four reachable cases of the fast/slow/in-flight has() lookup, but two correctness properties were asserted only indirectly: 1. Cancel safety of `InFlightSlowWriteGuard`. The fix's central claim is that aborting an in-progress update() removes the key from the in_flight_slow_writes map via Drop. The previous in-flight test only verified the happy-path (writer completes normally) and the fast-store fallback masked any leak. 2. Per-key independence in batched has_with_results. With multiple keys spanning different storage tiers, an off-by-one in the missing_indices fallback or an over-broad in-flight match would silently corrupt results without any existing test noticing. New tests: * dropping_update_future_cleans_up_in_flight_entry: uses a gated-slow-store with NoopStore as fast (so the fast-store fallback cannot mask a leak), spawns a writer, waits on a oneshot started signal, aborts the writer mid-update, then asserts has() returns None — proving the guard's Drop ran. * has_with_results_handles_mixed_key_sources: builds a request with four keys — slow-only, in-flight, fast-only, and missing — and asserts each result independently matches its source. Catches index-mapping regressions in the batched fallback path. Both tests are deterministic (oneshot channels, no sleeps). Verified that reverting the fix in fast_slow_store.rs causes both new tests plus the existing has_sees_in_flight_slow_writes and fast_store_only_value_is_reported_by_has to fail. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/fast_slow_store_test.rs | 292 ++++++++++++++++++ 1 file changed, 292 insertions(+) diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 5ffa9225e..0ac311980 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -1175,3 +1175,295 @@ async fn has_does_not_consult_fast_store_when_slow_store_hits() -> Result<(), Er Ok(()) } + +// Helpers for the gated-slow-store tests below. A `GatedSlowStore2` that +// blocks `update()` on a oneshot gate and signals when it starts, with +// `has_with_results` always returning all-None so the in-flight map is the +// only thing that can satisfy a concurrent `has()`. +#[derive(MetricsComponent)] +struct GatedSlowStore2 { + gate: Mutex>>, + started_tx: Mutex>>, +} + +#[async_trait] +impl StoreDriver for GatedSlowStore2 { + async fn has_with_results( + self: Pin<&Self>, + _keys: &[StoreKey<'_>], + _results: &mut [Option], + ) -> Result<(), Error> { + Ok(()) + } + + async fn update( + self: Pin<&Self>, + _key: StoreKey<'_>, + mut reader: DropCloserReadHalf, + _size_info: UploadSizeInfo, + ) -> Result<(), Error> { + let started_tx = self.started_tx.lock().unwrap().take(); + if let Some(tx) = started_tx { + let _ = tx.send(()); + } + let gate = self.gate.lock().unwrap().take(); + if let Some(rx) = gate { + let _ = rx.await; + } + reader.drain().await + } + + async fn get_part( + self: Pin<&Self>, + _key: StoreKey<'_>, + writer: &mut DropCloserWriteHalf, + _offset: u64, + _length: Option, + ) -> Result<(), Error> { + writer.send_eof() + } + + fn inner_store(&self, _key: Option) -> &'_ dyn StoreDriver { + self + } + fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + fn register_remove_callback( + self: Arc, + _callback: Arc, + ) -> Result<(), Error> { + Ok(()) + } +} + +default_health_status_indicator!(GatedSlowStore2); + +/// Cancel-safety: if the surrounding `update()` future is dropped before the +/// slow-store write completes, the `InFlightSlowWriteGuard` must remove the +/// key from `in_flight_slow_writes`. Otherwise the map would leak entries on +/// every cancelled upload and `has()` would falsely report cancelled blobs as +/// present forever. +/// +/// This test fails against the pre-fix code (which had no guard at all) and +/// would also fail if the guard's `Drop` impl were removed/broken. +#[nativelink_test] +async fn dropping_update_future_cleans_up_in_flight_entry() -> Result<(), Error> { + let (gate_tx, gate_rx) = tokio::sync::oneshot::channel::<()>(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel::<()>(); + let slow = Arc::new(GatedSlowStore2 { + gate: Mutex::new(Some(gate_rx)), + started_tx: Mutex::new(Some(started_tx)), + }); + // Use a fast store that holds nothing (NoopStore) AND set fast_direction + // to ReadOnly so the update path is `ignore_fast` -> single slow call, + // and so the fast-store fallback in `has` returns None. This isolates + // the in-flight map as the only signal of presence during the upload. + let fast = Store::new(NoopStore::new()); + let fast_slow = Arc::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Noop(NoopSpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::ReadOnly, + slow_direction: StoreDirection::default(), + }, + fast, + Store::new(slow.clone()), + )); + + let data = make_random_data(64); + let digest = DigestInfo::try_new(VALID_HASH, data.len()).unwrap(); + + let writer_store = fast_slow.clone(); + let writer_data = data.clone(); + let writer = tokio::spawn(async move { + writer_store + .update_oneshot(digest, writer_data.into()) + .await + }); + + // Confirm the writer is parked inside slow_store.update() — at this + // point the in-flight guard is alive. + started_rx + .await + .map_err(|e| make_err!(Code::Internal, "started signal lost: {e:?}"))?; + assert_eq!( + fast_slow.has(digest).await?, + Some(data.len() as u64), + "Pre-cancel: in-flight slow write must be visible via has()", + ); + + // Cancel the writer. The guard's Drop should remove the entry. + writer.abort(); + // Awaiting a cancelled JoinHandle resolves; ignore the JoinError. + drop(writer.await); + + // The gate is now stale (writer is gone) — drop the receiver explicitly + // by releasing the sender to avoid any hang in unrelated code paths. + drop(gate_tx); + + assert_eq!( + fast_slow.has(digest).await?, + None, + "Post-cancel: in-flight entry must be removed; \ + has() must NOT see the cancelled upload", + ); + + Ok(()) +} + +// Two more valid 64-hex-char digests for the multi-key mixed test below. +const VALID_HASH_B: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef"; +const VALID_HASH_C: &str = "0123456789abcdef000000000000000000030000000000000123456789abcdef"; +const VALID_HASH_D: &str = "0123456789abcdef000000000000000000040000000000000123456789abcdef"; + +/// `has_with_results` must independently classify each requested key. With +/// four keys — one only in the slow store, one with an in-flight slow write, +/// one only in the fast store, and one absent everywhere — the returned +/// slice must reflect each key's true state and not e.g. report the +/// in-flight size for unrelated keys. +#[nativelink_test] +async fn has_with_results_handles_mixed_key_sources() -> Result<(), Error> { + let (gate_tx, gate_rx) = tokio::sync::oneshot::channel::<()>(); + let (started_tx, started_rx) = tokio::sync::oneshot::channel::<()>(); + let slow_inner = Arc::new(GatedSlowStore2 { + gate: Mutex::new(Some(gate_rx)), + started_tx: Mutex::new(Some(started_tx)), + }); + + // We wrap the gated slow store so `has_with_results` returns Some for + // the "slow-only" key (size 11) and None for the others. The simplest + // way: a thin wrapper that pre-populates a map of known sizes. + #[derive(MetricsComponent)] + struct MapBackedSlow { + inner: Arc, + known: std::collections::HashMap, + } + #[async_trait] + impl StoreDriver for MapBackedSlow { + async fn has_with_results( + self: Pin<&Self>, + keys: &[StoreKey<'_>], + results: &mut [Option], + ) -> Result<(), Error> { + for (k, r) in keys.iter().zip(results.iter_mut()) { + if let StoreKey::Digest(d) = k + && let Some(sz) = self.known.get(d) + { + *r = Some(*sz); + } + } + Ok(()) + } + async fn update( + self: Pin<&Self>, + key: StoreKey<'_>, + reader: DropCloserReadHalf, + size_info: UploadSizeInfo, + ) -> Result<(), Error> { + // Delegate to the gated inner so timing is controllable. + Pin::new(self.inner.as_ref()) + .update(key, reader, size_info) + .await + } + async fn get_part( + self: Pin<&Self>, + _k: StoreKey<'_>, + w: &mut DropCloserWriteHalf, + _o: u64, + _l: Option, + ) -> Result<(), Error> { + w.send_eof() + } + fn inner_store(&self, _k: Option) -> &'_ dyn StoreDriver { + self + } + fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + fn register_remove_callback( + self: Arc, + _cb: Arc, + ) -> Result<(), Error> { + Ok(()) + } + } + default_health_status_indicator!(MapBackedSlow); + + let slow_only_size: u64 = 11; + let in_flight_size: u64 = 22; + let fast_only_size: u64 = 33; + + let slow_only_digest = DigestInfo::try_new(VALID_HASH, slow_only_size).unwrap(); + let in_flight_digest = DigestInfo::try_new(VALID_HASH_B, in_flight_size).unwrap(); + let fast_only_digest = DigestInfo::try_new(VALID_HASH_C, fast_only_size).unwrap(); + let missing_digest = DigestInfo::try_new(VALID_HASH_D, 44).unwrap(); + + let mut known = std::collections::HashMap::new(); + known.insert(slow_only_digest, slow_only_size); + let slow = Arc::new(MapBackedSlow { + inner: slow_inner.clone(), + known, + }); + + let fast = Store::new(MemoryStore::new(&MemorySpec::default())); + let fast_slow = Arc::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Memory(MemorySpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::ReadOnly, + slow_direction: StoreDirection::default(), + }, + fast.clone(), + Store::new(slow.clone()), + )); + + // Seed the fast store with the fast-only blob directly. + fast.update_oneshot(fast_only_digest, make_random_data(fast_only_size as usize).into()) + .await?; + + // Kick off the in-flight slow write and wait until it's parked. + let writer_store = fast_slow.clone(); + let writer = tokio::spawn(async move { + writer_store + .update_oneshot(in_flight_digest, make_random_data(in_flight_size as usize).into()) + .await + }); + started_rx + .await + .map_err(|e| make_err!(Code::Internal, "started signal lost: {e:?}"))?; + + // Now query all four keys in one call. + let keys: [StoreKey<'static>; 4] = [ + StoreKey::Digest(slow_only_digest), + StoreKey::Digest(in_flight_digest), + StoreKey::Digest(fast_only_digest), + StoreKey::Digest(missing_digest), + ]; + let mut results: [Option; 4] = [None; 4]; + fast_slow + .as_store_driver_pin() + .has_with_results(&keys, &mut results) + .await?; + + assert_eq!(results[0], Some(slow_only_size), "slow-only key"); + assert_eq!(results[1], Some(in_flight_size), "in-flight key"); + assert_eq!(results[2], Some(fast_only_size), "fast-only key"); + assert_eq!(results[3], None, "missing key must stay None"); + + // Cleanup: release the gated writer. + gate_tx + .send(()) + .map_err(|()| make_err!(Code::Internal, "Failed to release slow-store gate"))?; + writer + .await + .map_err(|e| make_err!(Code::Internal, "writer join error: {e:?}"))??; + + Ok(()) +} From d333a751cc2c401cdcc9cb4afbca2ecc514f4654 Mon Sep 17 00:00:00 2001 From: Ernesto Cambuston Date: Sat, 16 May 2026 20:01:04 -0700 Subject: [PATCH 3/5] fmt: apply rustfmt to nativelink-store Co-Authored-By: Claude Opus 4.7 (1M context) --- nativelink-store/tests/fast_slow_store_test.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 0ac311980..c26f7ed9e 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -1425,14 +1425,20 @@ async fn has_with_results_handles_mixed_key_sources() -> Result<(), Error> { )); // Seed the fast store with the fast-only blob directly. - fast.update_oneshot(fast_only_digest, make_random_data(fast_only_size as usize).into()) - .await?; + fast.update_oneshot( + fast_only_digest, + make_random_data(fast_only_size as usize).into(), + ) + .await?; // Kick off the in-flight slow write and wait until it's parked. let writer_store = fast_slow.clone(); let writer = tokio::spawn(async move { writer_store - .update_oneshot(in_flight_digest, make_random_data(in_flight_size as usize).into()) + .update_oneshot( + in_flight_digest, + make_random_data(in_flight_size as usize).into(), + ) .await }); started_rx From 5d3abbde0152b433069df2ce9d27b84afeef72ad Mon Sep 17 00:00:00 2001 From: Ernesto Cambuston Date: Sat, 16 May 2026 20:01:16 -0700 Subject: [PATCH 4/5] fast_slow_store_test: address clippy items_after_statements and cast_possible_truncation Hoist the `MapBackedSlow` test helper (struct, StoreDriver impl, and `default_health_status_indicator!` macro invocation) out of `has_with_results_handles_mixed_key_sources` to module scope so that items are declared before statements, satisfying `clippy::items_after_statements`. Replace `as usize` casts of the per-key `u64` sizes with `usize::try_from(...).unwrap()` to satisfy `clippy::cast_possible_truncation`, matching the existing conversion pattern elsewhere in this test file. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tests/fast_slow_store_test.rs | 129 +++++++++--------- 1 file changed, 65 insertions(+), 64 deletions(-) diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index c26f7ed9e..25e6ee4b3 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -1320,6 +1320,68 @@ const VALID_HASH_B: &str = "0123456789abcdef000000000000000000020000000000000123 const VALID_HASH_C: &str = "0123456789abcdef000000000000000000030000000000000123456789abcdef"; const VALID_HASH_D: &str = "0123456789abcdef000000000000000000040000000000000123456789abcdef"; +/// Wraps a `GatedSlowStore2` so `has_with_results` returns `Some` for any +/// digest pre-populated in `known` and `None` otherwise, while still +/// delegating writes through the gated inner so timing is controllable. +#[derive(MetricsComponent)] +struct MapBackedSlow { + inner: Arc, + known: std::collections::HashMap, +} +#[async_trait] +impl StoreDriver for MapBackedSlow { + async fn has_with_results( + self: Pin<&Self>, + keys: &[StoreKey<'_>], + results: &mut [Option], + ) -> Result<(), Error> { + for (k, r) in keys.iter().zip(results.iter_mut()) { + if let StoreKey::Digest(d) = k + && let Some(sz) = self.known.get(d) + { + *r = Some(*sz); + } + } + Ok(()) + } + async fn update( + self: Pin<&Self>, + key: StoreKey<'_>, + reader: DropCloserReadHalf, + size_info: UploadSizeInfo, + ) -> Result<(), Error> { + // Delegate to the gated inner so timing is controllable. + Pin::new(self.inner.as_ref()) + .update(key, reader, size_info) + .await + } + async fn get_part( + self: Pin<&Self>, + _k: StoreKey<'_>, + w: &mut DropCloserWriteHalf, + _o: u64, + _l: Option, + ) -> Result<(), Error> { + w.send_eof() + } + fn inner_store(&self, _k: Option) -> &'_ dyn StoreDriver { + self + } + fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { + self + } + fn as_any_arc(self: Arc) -> Arc { + self + } + fn register_remove_callback( + self: Arc, + _cb: Arc, + ) -> Result<(), Error> { + Ok(()) + } +} +default_health_status_indicator!(MapBackedSlow); + /// `has_with_results` must independently classify each requested key. With /// four keys — one only in the slow store, one with an in-flight slow write, /// one only in the fast store, and one absent everywhere — the returned @@ -1334,68 +1396,7 @@ async fn has_with_results_handles_mixed_key_sources() -> Result<(), Error> { started_tx: Mutex::new(Some(started_tx)), }); - // We wrap the gated slow store so `has_with_results` returns Some for - // the "slow-only" key (size 11) and None for the others. The simplest - // way: a thin wrapper that pre-populates a map of known sizes. - #[derive(MetricsComponent)] - struct MapBackedSlow { - inner: Arc, - known: std::collections::HashMap, - } - #[async_trait] - impl StoreDriver for MapBackedSlow { - async fn has_with_results( - self: Pin<&Self>, - keys: &[StoreKey<'_>], - results: &mut [Option], - ) -> Result<(), Error> { - for (k, r) in keys.iter().zip(results.iter_mut()) { - if let StoreKey::Digest(d) = k - && let Some(sz) = self.known.get(d) - { - *r = Some(*sz); - } - } - Ok(()) - } - async fn update( - self: Pin<&Self>, - key: StoreKey<'_>, - reader: DropCloserReadHalf, - size_info: UploadSizeInfo, - ) -> Result<(), Error> { - // Delegate to the gated inner so timing is controllable. - Pin::new(self.inner.as_ref()) - .update(key, reader, size_info) - .await - } - async fn get_part( - self: Pin<&Self>, - _k: StoreKey<'_>, - w: &mut DropCloserWriteHalf, - _o: u64, - _l: Option, - ) -> Result<(), Error> { - w.send_eof() - } - fn inner_store(&self, _k: Option) -> &'_ dyn StoreDriver { - self - } - fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) { - self - } - fn as_any_arc(self: Arc) -> Arc { - self - } - fn register_remove_callback( - self: Arc, - _cb: Arc, - ) -> Result<(), Error> { - Ok(()) - } - } - default_health_status_indicator!(MapBackedSlow); - + // Populate the wrapper so `has_with_results` reports the slow-only key. let slow_only_size: u64 = 11; let in_flight_size: u64 = 22; let fast_only_size: u64 = 33; @@ -1427,7 +1428,7 @@ async fn has_with_results_handles_mixed_key_sources() -> Result<(), Error> { // Seed the fast store with the fast-only blob directly. fast.update_oneshot( fast_only_digest, - make_random_data(fast_only_size as usize).into(), + make_random_data(usize::try_from(fast_only_size).unwrap()).into(), ) .await?; @@ -1437,7 +1438,7 @@ async fn has_with_results_handles_mixed_key_sources() -> Result<(), Error> { writer_store .update_oneshot( in_flight_digest, - make_random_data(in_flight_size as usize).into(), + make_random_data(usize::try_from(in_flight_size).unwrap()).into(), ) .await }); From ed1bdcc85a68bc5fd6a609e7a6da40ba87756092 Mon Sep 17 00:00:00 2001 From: Ernesto Cambuston Date: Sat, 16 May 2026 21:21:37 -0700 Subject: [PATCH 5/5] ci: retrigger after GitHub 502 fetching hermetic_cc_toolchain tarball