Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ impl CacheKeyObjects {
// obligation.
awaitable = awaitable || cache_value.obligated;
}

// Also wait on any obligation under a *different* vary rule.
// An in-flight fetch could respond with a simpler vary rule (e.g. empty)
// that would satisfy this request, so we must not race to create a new GoGet.
// The worst case is a spurious wakeup; correctness is preserved either way.
if !awaitable {
awaitable = key_objects.objects.values().any(|v| v.obligated);
}
}
// Done computing awaitable, make it read-only:
let awaitable = awaitable;
Expand Down
73 changes: 73 additions & 0 deletions test-fixtures/src/bin/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ fn main() {
run_test!(test_implicit_cancel_of_pending);
run_test!(test_explicit_cancel);
run_test!(test_collapse_across_vary);
run_test!(test_collapse_from_later_vary);

run_test!(test_stream_back);
run_test!(test_stream_back_fixed);
Expand Down Expand Up @@ -1202,6 +1203,78 @@ fn test_collapse_across_vary() {
assert!(!txn1.pending().unwrap());
}

fn test_collapse_from_later_vary() {
let key = new_key();
let header_a = HeaderName::from_static("header-a");
// Prefill two stale variants, both under Vary: header-a.
let b = insert(key.clone(), Duration::ZERO)
.header(&header_a, "foo")
.vary_by([&header_a])
.execute()
.unwrap();
b.finish().unwrap();

let b = insert(key.clone(), Duration::ZERO)
.header(&header_a, "bar")
.vary_by([&header_a])
.execute()
.unwrap();
b.finish().unwrap();

// Transaction 1 (foo) and Transaction 2 (bar) dispatch concurrently.
// Both see their respective expired variants and recieve GoGet.
let pending_txn1 = Transaction::lookup(key.clone())
Comment thread
cceckman-at-fastly marked this conversation as resolved.
Outdated
.header(&header_a, "foo")
.execute_async()
.unwrap();
let pending_txn2 = Transaction::lookup(key.clone())
.header(&header_a, "bar")
.execute_async()
.unwrap();
assert!(!pending_txn1.pending().unwrap(), "txn1 should have a GoGet");
assert!(!pending_txn2.pending().unwrap(), "txn2 should have a GoGet");

// Transaction 3 (bar) dispatches and collapses behind Transaction 2.
// Sleep briefly so the background task reaches sub.changed() before we proceed.
Comment thread
cceckman-at-fastly marked this conversation as resolved.
Outdated
let pending_txn3 = Transaction::lookup(key.clone())
.header(&header_a, "bar")
.execute_async()
.unwrap();
std::thread::sleep(Duration::from_millis(50));
assert!(pending_txn3.pending().unwrap(), "txn3 should be waiting on txn2");

let txn1 = pending_txn1.wait().unwrap();
let txn2 = pending_txn2.wait().unwrap();
assert!(txn1.must_insert_or_update());
assert!(txn2.must_insert_or_update());

// Transaction 2 is abandoned before Transaction 1 completes.
// Must use cancel_insert_or_update() — drop() only releases the Rust wrapper;
// it does not close the host-side handle or release the Obligation.
Comment thread
cceckman-at-fastly marked this conversation as resolved.
Outdated
txn2.cancel_insert_or_update().unwrap();
// Sleep to let txn3's background task wake up from sub.changed() and re-run
// the transaction_get loop — where the bug causes it to issue a new GoGet
// instead of waiting for txn1.
std::thread::sleep(Duration::from_millis(50));

// Transaction 1 completes with an *empty* vary rule (No Vary header in response)
let mut writer = txn1.insert(Duration::from_secs(120)).execute().unwrap();
writer.write_all(b"the-response").unwrap();
writer.finish().unwrap();

// Transaction 3 must resolve as Found from Transaction 1 empty-vary insert,
// not as a GoGet requiring a new origin fetch
let txn3 = pending_txn3.wait().unwrap();
assert!(
txn3.found().is_some(),
"txn3 should receive the cached entry from txn1's empty-vary insert"
);
assert!(
!txn3.must_insert_or_update(),
"txn3 must not be issued a GoGet"
);
}

fn test_stream_back() {
let key = new_key();

Expand Down
Loading