From a35f2b78b37cd19af3ccbea055280de443b74c70 Mon Sep 17 00:00:00 2001 From: Teagan Haddawy Date: Wed, 27 May 2026 12:47:50 -0700 Subject: [PATCH 1/4] test: add integration test for collapse from later vary edge case --- test-fixtures/src/bin/cache.rs | 73 ++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/test-fixtures/src/bin/cache.rs b/test-fixtures/src/bin/cache.rs index 6a1846b6..98422218 100644 --- a/test-fixtures/src/bin/cache.rs +++ b/test-fixtures/src/bin/cache.rs @@ -64,6 +64,7 @@ fn main() { run_test!(test_implicit_cancel_of_pending); run_test!(test_explicit_cancel); run_test!(test_collapse_across_vary); + run_test!(test_collapse_from_later_vary); run_test!(test_stream_back); run_test!(test_stream_back_fixed); @@ -1202,6 +1203,78 @@ fn test_collapse_across_vary() { assert!(!txn1.pending().unwrap()); } +fn test_collapse_from_later_vary() { + let key = new_key(); + let header_a = HeaderName::from_static("header-a"); + // Prefill two stale variants, both under Vary: header-a. + let b = insert(key.clone(), Duration::ZERO) + .header(&header_a, "foo") + .vary_by([&header_a]) + .execute() + .unwrap(); + b.finish().unwrap(); + + let b = insert(key.clone(), Duration::ZERO) + .header(&header_a, "bar") + .vary_by([&header_a]) + .execute() + .unwrap(); + b.finish().unwrap(); + + // Transaction 1 (foo) and Transaction 2 (bar) dispatch concurrently. + // Both see their respective expired variants and recieve GoGet. + let pending_txn1 = Transaction::lookup(key.clone()) + .header(&header_a, "foo") + .execute_async() + .unwrap(); + let pending_txn2 = Transaction::lookup(key.clone()) + .header(&header_a, "bar") + .execute_async() + .unwrap(); + assert!(!pending_txn1.pending().unwrap(), "txn1 should have a GoGet"); + assert!(!pending_txn2.pending().unwrap(), "txn2 should have a GoGet"); + + // Transaction 3 (bar) dispatches and collapses behind Transaction 2. + // Sleep briefly so the background task reaches sub.changed() before we proceed. + let pending_txn3 = Transaction::lookup(key.clone()) + .header(&header_a, "bar") + .execute_async() + .unwrap(); + std::thread::sleep(Duration::from_millis(50)); + assert!(pending_txn3.pending().unwrap(), "txn3 should be waiting on txn2"); + + let txn1 = pending_txn1.wait().unwrap(); + let txn2 = pending_txn2.wait().unwrap(); + assert!(txn1.must_insert_or_update()); + assert!(txn2.must_insert_or_update()); + + // Transaction 2 is abandoned before Transaction 1 completes. + // Must use cancel_insert_or_update() — drop() only releases the Rust wrapper; + // it does not close the host-side handle or release the Obligation. + txn2.cancel_insert_or_update().unwrap(); + // Sleep to let txn3's background task wake up from sub.changed() and re-run + // the transaction_get loop — where the bug causes it to issue a new GoGet + // instead of waiting for txn1. + std::thread::sleep(Duration::from_millis(50)); + + // Transaction 1 completes with an *empty* vary rule (No Vary header in response) + let mut writer = txn1.insert(Duration::from_secs(120)).execute().unwrap(); + writer.write_all(b"the-response").unwrap(); + writer.finish().unwrap(); + + // Transaction 3 must resolve as Found from Transaction 1 empty-vary insert, + // not as a GoGet requiring a new origin fetch + let txn3 = pending_txn3.wait().unwrap(); + assert!( + txn3.found().is_some(), + "txn3 should receive the cached entry from txn1's empty-vary insert" + ); + assert!( + !txn3.must_insert_or_update(), + "txn3 must not be issued a GoGet" + ); +} + fn test_stream_back() { let key = new_key(); From 9ecf2723d190209de1edce1a6e6eb8d28fb389e1 Mon Sep 17 00:00:00 2001 From: Teagan Haddawy Date: Wed, 27 May 2026 12:48:02 -0700 Subject: [PATCH 2/4] fix: extend transaction_get lookup to observe all pending obligations --- src/cache/store.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/cache/store.rs b/src/cache/store.rs index 73ddd42a..3de3006b 100644 --- a/src/cache/store.rs +++ b/src/cache/store.rs @@ -193,6 +193,14 @@ impl CacheKeyObjects { // obligation. awaitable = awaitable || cache_value.obligated; } + + // Also wait on any obligation under a *different* vary rule. + // An in-flight fetch could respond with a simpler vary rule (e.g. empty) + // that would satisfy this request, so we must not race to create a new GoGet. + // The worst case is a spurious wakeup; correctness is preserved either way. + if !awaitable { + awaitable = key_objects.objects.values().any(|v| v.obligated); + } } // Done computing awaitable, make it read-only: let awaitable = awaitable; From 5559cb323dfd80a14bfc73fa9b995953e5cd8cc4 Mon Sep 17 00:00:00 2001 From: Teagan Haddawy Date: Thu, 28 May 2026 13:03:10 -0700 Subject: [PATCH 3/4] test: add test for empty-vary cache collapsing --- src/cache/store.rs | 8 -------- test-fixtures/src/bin/cache.rs | 30 ++++++++++++------------------ 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/src/cache/store.rs b/src/cache/store.rs index 3de3006b..73ddd42a 100644 --- a/src/cache/store.rs +++ b/src/cache/store.rs @@ -193,14 +193,6 @@ impl CacheKeyObjects { // obligation. awaitable = awaitable || cache_value.obligated; } - - // Also wait on any obligation under a *different* vary rule. - // An in-flight fetch could respond with a simpler vary rule (e.g. empty) - // that would satisfy this request, so we must not race to create a new GoGet. - // The worst case is a spurious wakeup; correctness is preserved either way. - if !awaitable { - awaitable = key_objects.objects.values().any(|v| v.obligated); - } } // Done computing awaitable, make it read-only: let awaitable = awaitable; diff --git a/test-fixtures/src/bin/cache.rs b/test-fixtures/src/bin/cache.rs index 98422218..420b30e5 100644 --- a/test-fixtures/src/bin/cache.rs +++ b/test-fixtures/src/bin/cache.rs @@ -1204,9 +1204,13 @@ fn test_collapse_across_vary() { } fn test_collapse_from_later_vary() { + // Per issue #626: cache contains two stale variants under Vary: Header-A. + // Txn1 (foo), Txn2 (bar), Txn3 (bar) dispatch; Txn1 and Txn2 receive GoGet, + // Txn3 waits on Txn2. Txn1 completes with an *empty* vary rule, which can in + // principle fulfill Txn3. Txn2 is then abandoned. Txn3 should get Found. let key = new_key(); let header_a = HeaderName::from_static("header-a"); - // Prefill two stale variants, both under Vary: header-a. + let b = insert(key.clone(), Duration::ZERO) .header(&header_a, "foo") .vary_by([&header_a]) @@ -1221,8 +1225,6 @@ fn test_collapse_from_later_vary() { .unwrap(); b.finish().unwrap(); - // Transaction 1 (foo) and Transaction 2 (bar) dispatch concurrently. - // Both see their respective expired variants and recieve GoGet. let pending_txn1 = Transaction::lookup(key.clone()) .header(&header_a, "foo") .execute_async() @@ -1234,13 +1236,10 @@ fn test_collapse_from_later_vary() { assert!(!pending_txn1.pending().unwrap(), "txn1 should have a GoGet"); assert!(!pending_txn2.pending().unwrap(), "txn2 should have a GoGet"); - // Transaction 3 (bar) dispatches and collapses behind Transaction 2. - // Sleep briefly so the background task reaches sub.changed() before we proceed. let pending_txn3 = Transaction::lookup(key.clone()) .header(&header_a, "bar") .execute_async() .unwrap(); - std::thread::sleep(Duration::from_millis(50)); assert!(pending_txn3.pending().unwrap(), "txn3 should be waiting on txn2"); let txn1 = pending_txn1.wait().unwrap(); @@ -1248,22 +1247,17 @@ fn test_collapse_from_later_vary() { assert!(txn1.must_insert_or_update()); assert!(txn2.must_insert_or_update()); - // Transaction 2 is abandoned before Transaction 1 completes. - // Must use cancel_insert_or_update() — drop() only releases the Rust wrapper; - // it does not close the host-side handle or release the Obligation. - txn2.cancel_insert_or_update().unwrap(); - // Sleep to let txn3's background task wake up from sub.changed() and re-run - // the transaction_get loop — where the bug causes it to issue a new GoGet - // instead of waiting for txn1. - std::thread::sleep(Duration::from_millis(50)); - - // Transaction 1 completes with an *empty* vary rule (No Vary header in response) + // Txn1 completes with an empty vary rule (no .vary_by() on the insert). + // This pushes VaryRule::default() to the front of the key's vary_rules and + // inserts under the universal variant {}, which matches any request. let mut writer = txn1.insert(Duration::from_secs(120)).execute().unwrap(); writer.write_all(b"the-response").unwrap(); writer.finish().unwrap(); - // Transaction 3 must resolve as Found from Transaction 1 empty-vary insert, - // not as a GoGet requiring a new origin fetch + // Txn2 is abandoned after Txn1 completes (matches the issue's ordering). + txn2.cancel_insert_or_update().unwrap(); + + // Txn3 must resolve as Found from Txn1's empty-vary insert. let txn3 = pending_txn3.wait().unwrap(); assert!( txn3.found().is_some(), From b4641a769043ed5a4d059223b6cf3b59397d35c6 Mon Sep 17 00:00:00 2001 From: Teagan Haddawy Date: Thu, 28 May 2026 17:00:52 -0700 Subject: [PATCH 4/4] refactor: address quality of life improvements and removal of execute_async() --- test-fixtures/src/bin/cache.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/test-fixtures/src/bin/cache.rs b/test-fixtures/src/bin/cache.rs index 420b30e5..0b8dabf6 100644 --- a/test-fixtures/src/bin/cache.rs +++ b/test-fixtures/src/bin/cache.rs @@ -1225,36 +1225,30 @@ fn test_collapse_from_later_vary() { .unwrap(); b.finish().unwrap(); - let pending_txn1 = Transaction::lookup(key.clone()) + let txn1 = Transaction::lookup(key.clone()) .header(&header_a, "foo") - .execute_async() + .execute() .unwrap(); - let pending_txn2 = Transaction::lookup(key.clone()) + + let txn2 = Transaction::lookup(key.clone()) .header(&header_a, "bar") - .execute_async() + .execute() .unwrap(); - assert!(!pending_txn1.pending().unwrap(), "txn1 should have a GoGet"); - assert!(!pending_txn2.pending().unwrap(), "txn2 should have a GoGet"); let pending_txn3 = Transaction::lookup(key.clone()) .header(&header_a, "bar") .execute_async() .unwrap(); assert!(pending_txn3.pending().unwrap(), "txn3 should be waiting on txn2"); - - let txn1 = pending_txn1.wait().unwrap(); - let txn2 = pending_txn2.wait().unwrap(); assert!(txn1.must_insert_or_update()); assert!(txn2.must_insert_or_update()); - // Txn1 completes with an empty vary rule (no .vary_by() on the insert). - // This pushes VaryRule::default() to the front of the key's vary_rules and - // inserts under the universal variant {}, which matches any request. let mut writer = txn1.insert(Duration::from_secs(120)).execute().unwrap(); writer.write_all(b"the-response").unwrap(); writer.finish().unwrap(); - // Txn2 is abandoned after Txn1 completes (matches the issue's ordering). + // Abandon Txn2 to verify that Txn3 falls back to + // Txn1's wildcard entry instead of spawning a duplicate origin fetch. txn2.cancel_insert_or_update().unwrap(); // Txn3 must resolve as Found from Txn1's empty-vary insert.