diff --git a/crates/core/src/ring/interest.rs b/crates/core/src/ring/interest.rs index 83422c793c..b051e4446e 100644 --- a/crates/core/src/ring/interest.rs +++ b/crates/core/src/ring/interest.rs @@ -66,6 +66,11 @@ pub const INTEREST_TTL: Duration = Duration::from_secs(INTEREST_HEARTBEAT_INTERV /// Interval for background sweep to clean up expired interests. pub const INTEREST_SWEEP_INTERVAL: Duration = Duration::from_secs(60); // 1 minute +/// Max distinct peers tracked as interested in a single contract. +/// Matches MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT (hosting.rs) so the two +/// broadcast-target sources are symmetrically bounded (#3798 Gap 2). +pub(crate) const MAX_INTERESTED_PEERS_PER_CONTRACT: usize = 512; + /// Grace period before removing a disconnected peer's interests. /// /// When a peer disconnects, we defer interest removal for this duration instead of @@ -394,6 +399,25 @@ impl InterestManager { // change for these four sites — see PR notes. let mut entry = self.interested_peers.entry(*contract).or_default(); let is_new = !entry.contains_key(&peer); + + // Cap distinct interested peers per contract to bound an adversarial + // broadcast-amplification vector (#3798 Gap 2). Reject BEFORE the + // reverse-index/hash writes below so a rejected peer leaves no zombie + // `peer_contracts` / `contract_hash_index` entry. Only a NEW peer at + // capacity is rejected — renewals of an already-tracked peer always + // proceed so a legit at-capacity contract keeps serving its peers. + // Returns `is_new = false` so a rejected adversary is not treated as a + // new viable target and cannot trigger the #4359 pending-broadcast flush. + if is_new && entry.len() >= MAX_INTERESTED_PEERS_PER_CONTRACT { + drop(entry); + tracing::warn!( + contract = %contract, + limit = MAX_INTERESTED_PEERS_PER_CONTRACT, + "Interested-peer limit reached, rejecting peer" + ); + return false; + } + entry.insert(peer.clone(), PeerInterest::new(summary, is_upstream, now)); // Maintain reverse index for O(1) peer disconnect cleanup @@ -1196,10 +1220,21 @@ mod tests { ContractKey::from_id_and_code(ContractInstanceId::new(id), CodeHash::new(code)) } - fn make_peer_key(_seed: u8) -> PeerKey { - use crate::transport::TransportKeypair; - let keypair = TransportKeypair::new(); - PeerKey(keypair.public().clone()) + /// Build a deterministic peer key from a seed. + /// + /// Deterministic-and-distinct so tests never rely on RNG distinctness: + /// distinct seeds always yield distinct keys, and the same seed always + /// yields the same key (mirrors the sibling `hosting.rs` test helper). + fn make_peer_key(seed: u8) -> PeerKey { + make_unique_peer_key(seed as u32) + } + + /// Like `make_peer_key` but with a `u32` seed for tests that need more + /// than 256 pairwise-distinct peers (mirrors `make_unique_contract_key`). + fn make_unique_peer_key(seed: u32) -> PeerKey { + let mut bytes = [0u8; 32]; + bytes[0..4].copy_from_slice(&seed.to_le_bytes()); + PeerKey(crate::transport::TransportPublicKey::from_bytes(bytes)) } fn make_manager() -> (TestInterestManager, SharedMockTimeSource) { @@ -1233,6 +1268,83 @@ mod tests { assert!(!manager.remove_peer_interest(&contract, &peer)); } + #[test] + fn test_register_peer_interest_caps_at_max() { + // #3798 Gap 2: a single contract's interested_peers map must be bounded + // so a peer flooding distinct identities cannot amplify every broadcast. + let (manager, _time) = make_manager(); + let contract = make_contract_key(1); + + // Fill to exactly MAX distinct peers — each is new and accepted. + // Keys are deterministic AND pairwise-distinct (derived from a u32 + // counter), so the test never relies on RNG distinctness: a leaked + // thread-local GlobalRng seed or a one-in-a-billion keypair collision + // can no longer make the 513th registration spuriously non-new and + // skip the cap branch (the cold-build flake this hardening fixes). + let mut peers = Vec::with_capacity(MAX_INTERESTED_PEERS_PER_CONTRACT); + for i in 0..MAX_INTERESTED_PEERS_PER_CONTRACT { + let peer = make_unique_peer_key(i as u32); + assert!( + manager.register_peer_interest(&contract, peer.clone(), None, false), + "registering a fresh peer below capacity must return is_new = true" + ); + peers.push(peer); + } + assert_eq!( + manager.get_interested_peers(&contract).len(), + MAX_INTERESTED_PEERS_PER_CONTRACT + ); + + // One MORE distinct peer is rejected: returns is_new = false (so it does + // NOT trigger the #4359 first-viable-target broadcast flush) and the map + // length is unchanged. Its seed is past the fill range, so it is + // guaranteed not already tracked. + let overflow_peer = make_unique_peer_key(MAX_INTERESTED_PEERS_PER_CONTRACT as u32); + assert!( + !manager.register_peer_interest(&contract, overflow_peer.clone(), None, false), + "a new peer at capacity must be rejected (is_new = false)" + ); + assert_eq!( + manager.get_interested_peers(&contract).len(), + MAX_INTERESTED_PEERS_PER_CONTRACT, + "capacity must not be exceeded" + ); + + // Invariant: the rejected peer left NO zombie reverse-index entry. + assert!( + manager.get_contracts_for_peer(&overflow_peer).is_empty(), + "rejected peer must not appear in the peer_contracts reverse index" + ); + + // Renewals of an ALREADY-tracked peer are never rejected by capacity: + // re-registering an existing peer with an updated summary returns false + // (not new) but still refreshes the entry. + let existing = peers[0].clone(); + let summary = StateSummary::from(vec![9, 9, 9]); + assert!( + !manager.register_peer_interest( + &contract, + existing.clone(), + Some(summary.clone()), + false + ), + "renewal of an existing peer must return is_new = false" + ); + assert_eq!( + manager.get_interested_peers(&contract).len(), + MAX_INTERESTED_PEERS_PER_CONTRACT, + "renewal must not change capacity" + ); + let refreshed = manager + .get_peer_summary(&contract, &existing) + .expect("existing peer must still be present after renewal"); + assert_eq!( + refreshed.as_ref(), + summary.as_ref(), + "renewal must update the existing peer's summary" + ); + } + #[test] fn test_update_peer_summary() { let (manager, _time) = make_manager();