Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 116 additions & 4 deletions crates/core/src/ring/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ pub const INTEREST_TTL: Duration = Duration::from_secs(INTEREST_HEARTBEAT_INTERV
/// Interval for background sweep to clean up expired interests.
pub const INTEREST_SWEEP_INTERVAL: Duration = Duration::from_secs(60); // 1 minute

/// Max distinct peers tracked as interested in a single contract.
/// Matches MAX_DOWNSTREAM_SUBSCRIBERS_PER_CONTRACT (hosting.rs) so the two
/// broadcast-target sources are symmetrically bounded (#3798 Gap 2).
pub(crate) const MAX_INTERESTED_PEERS_PER_CONTRACT: usize = 512;

/// Grace period before removing a disconnected peer's interests.
///
/// When a peer disconnects, we defer interest removal for this duration instead of
Expand Down Expand Up @@ -394,6 +399,25 @@ impl<T: TimeSource + Sync> InterestManager<T> {
// change for these four sites — see PR notes.
let mut entry = self.interested_peers.entry(*contract).or_default();
let is_new = !entry.contains_key(&peer);

// Cap distinct interested peers per contract to bound an adversarial
// broadcast-amplification vector (#3798 Gap 2). Reject BEFORE the
// reverse-index/hash writes below so a rejected peer leaves no zombie
// `peer_contracts` / `contract_hash_index` entry. Only a NEW peer at
// capacity is rejected — renewals of an already-tracked peer always
// proceed so a legit at-capacity contract keeps serving its peers.
// Returns `is_new = false` so a rejected adversary is not treated as a
// new viable target and cannot trigger the #4359 pending-broadcast flush.
if is_new && entry.len() >= MAX_INTERESTED_PEERS_PER_CONTRACT {
drop(entry);
tracing::warn!(
contract = %contract,
limit = MAX_INTERESTED_PEERS_PER_CONTRACT,
"Interested-peer limit reached, rejecting peer"
);
return false;
}

entry.insert(peer.clone(), PeerInterest::new(summary, is_upstream, now));

// Maintain reverse index for O(1) peer disconnect cleanup
Expand Down Expand Up @@ -1196,10 +1220,21 @@ mod tests {
ContractKey::from_id_and_code(ContractInstanceId::new(id), CodeHash::new(code))
}

fn make_peer_key(_seed: u8) -> PeerKey {
use crate::transport::TransportKeypair;
let keypair = TransportKeypair::new();
PeerKey(keypair.public().clone())
/// Build a deterministic peer key from a seed.
///
/// Deterministic-and-distinct so tests never rely on RNG distinctness:
/// distinct seeds always yield distinct keys, and the same seed always
/// yields the same key (mirrors the sibling `hosting.rs` test helper).
fn make_peer_key(seed: u8) -> PeerKey {
make_unique_peer_key(seed as u32)
}

/// Like `make_peer_key` but with a `u32` seed for tests that need more
/// than 256 pairwise-distinct peers (mirrors `make_unique_contract_key`).
fn make_unique_peer_key(seed: u32) -> PeerKey {
let mut bytes = [0u8; 32];
bytes[0..4].copy_from_slice(&seed.to_le_bytes());
PeerKey(crate::transport::TransportPublicKey::from_bytes(bytes))
}

fn make_manager() -> (TestInterestManager, SharedMockTimeSource) {
Expand Down Expand Up @@ -1233,6 +1268,83 @@ mod tests {
assert!(!manager.remove_peer_interest(&contract, &peer));
}

#[test]
fn test_register_peer_interest_caps_at_max() {
// #3798 Gap 2: a single contract's interested_peers map must be bounded
// so a peer flooding distinct identities cannot amplify every broadcast.
let (manager, _time) = make_manager();
let contract = make_contract_key(1);

// Fill to exactly MAX distinct peers — each is new and accepted.
// Keys are deterministic AND pairwise-distinct (derived from a u32
// counter), so the test never relies on RNG distinctness: a leaked
// thread-local GlobalRng seed or a one-in-a-billion keypair collision
// can no longer make the 513th registration spuriously non-new and
// skip the cap branch (the cold-build flake this hardening fixes).
let mut peers = Vec::with_capacity(MAX_INTERESTED_PEERS_PER_CONTRACT);
for i in 0..MAX_INTERESTED_PEERS_PER_CONTRACT {
let peer = make_unique_peer_key(i as u32);
assert!(
manager.register_peer_interest(&contract, peer.clone(), None, false),
"registering a fresh peer below capacity must return is_new = true"
);
peers.push(peer);
}
assert_eq!(
manager.get_interested_peers(&contract).len(),
MAX_INTERESTED_PEERS_PER_CONTRACT
);

// One MORE distinct peer is rejected: returns is_new = false (so it does
// NOT trigger the #4359 first-viable-target broadcast flush) and the map
// length is unchanged. Its seed is past the fill range, so it is
// guaranteed not already tracked.
let overflow_peer = make_unique_peer_key(MAX_INTERESTED_PEERS_PER_CONTRACT as u32);
assert!(
!manager.register_peer_interest(&contract, overflow_peer.clone(), None, false),
"a new peer at capacity must be rejected (is_new = false)"
);
assert_eq!(
manager.get_interested_peers(&contract).len(),
MAX_INTERESTED_PEERS_PER_CONTRACT,
"capacity must not be exceeded"
);

// Invariant: the rejected peer left NO zombie reverse-index entry.
assert!(
manager.get_contracts_for_peer(&overflow_peer).is_empty(),
"rejected peer must not appear in the peer_contracts reverse index"
);

// Renewals of an ALREADY-tracked peer are never rejected by capacity:
// re-registering an existing peer with an updated summary returns false
// (not new) but still refreshes the entry.
let existing = peers[0].clone();
let summary = StateSummary::from(vec![9, 9, 9]);
assert!(
!manager.register_peer_interest(
&contract,
existing.clone(),
Some(summary.clone()),
false
),
"renewal of an existing peer must return is_new = false"
);
assert_eq!(
manager.get_interested_peers(&contract).len(),
MAX_INTERESTED_PEERS_PER_CONTRACT,
"renewal must not change capacity"
);
let refreshed = manager
.get_peer_summary(&contract, &existing)
.expect("existing peer must still be present after renewal");
assert_eq!(
refreshed.as_ref(),
summary.as_ref(),
"renewal must update the existing peer's summary"
);
}

#[test]
fn test_update_peer_summary() {
let (manager, _time) = make_manager();
Expand Down
Loading