From b0936b40d49460724d23b3d306a4bd60db8aa18b Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 17 Sep 2025 11:34:45 +0200 Subject: [PATCH 1/4] feat: filter out lower-height peers from PeerResponse messages Signed-off-by: ljedrz --- node/bft/src/gateway.rs | 6 +- node/router/messages/src/peer_response.rs | 72 ++++++++++++++++++++--- node/router/src/heartbeat.rs | 18 +++++- node/router/src/helpers/peer.rs | 10 +++- node/router/src/inbound.rs | 10 ++-- node/router/src/lib.rs | 35 +++++++---- node/tests/peering.rs | 10 +++- 7 files changed, 126 insertions(+), 35 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 6c72524b59..a40c58352f 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -963,10 +963,10 @@ impl Gateway { // The trusted ones are already handled by `handle_trusted_validators`. let trusted_validators = self.trusted_peers(); if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize { - for candidate_addr in self.candidate_peers() { - if !trusted_validators.contains(&candidate_addr) { + for peer in self.get_candidate_peers() { + if !trusted_validators.contains(&peer.listener_addr) { // Attempt to connect to unconnected validators. - self.connect(candidate_addr); + self.connect(peer.listener_addr); } } diff --git a/node/router/messages/src/peer_response.rs b/node/router/messages/src/peer_response.rs index 6c64cc0515..55b06f4dca 100644 --- a/node/router/messages/src/peer_response.rs +++ b/node/router/messages/src/peer_response.rs @@ -21,7 +21,7 @@ use std::borrow::Cow; #[derive(Clone, Debug, PartialEq, Eq)] pub struct PeerResponse { - pub peers: Vec, + pub peers: Vec<(SocketAddr, Option)>, } impl MessageTrait for PeerResponse { @@ -39,9 +39,20 @@ impl ToBytes for PeerResponse { return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Too many peers: {}", self.peers.len()))); } + // A version indicator; we don't expect empty peer responses, so a zero value can serve + // as an indicator that this message is to be processed differently. The version value + // can be changed to a 2 in the future, once everyone expects it there. + 0u8.write_le(&mut writer)?; + (self.peers.len() as u8).write_le(&mut writer)?; - for peer in self.peers.iter() { - peer.write_le(&mut writer)?; + for (addr, height) in self.peers.iter() { + addr.write_le(&mut writer)?; + if let Some(h) = height { + 1u8.write_le(&mut writer)?; + h.write_le(&mut writer)?; + } else { + 0u8.write_le(&mut writer)?; + } } Ok(()) } @@ -49,10 +60,38 @@ impl ToBytes for PeerResponse { impl FromBytes for PeerResponse { fn read_le(mut reader: R) -> io::Result { - let count = u8::read_le(&mut reader)?; + // Read the peer count if their heights aren't present; otherwise, interpret this value + // as the message version. It is a workaround for a currently missing version value. + // The worst-case scenario is if a node hasn't updated, and it gets a `PeerRequest` from + // its only peer who has; this would cause it to return a message that appears as if it + // contains heights (due to a leading `0`), but it would end up failing to deserialize. + // TODO: after a release or two, we should always be expecting the version to be present, + // simplifying the deserialization; also, remove the `empty_old_peerlist_handling` test. + let mut contains_heights = false; + let count_or_version = u8::read_le(&mut reader)?; + let count = if count_or_version == 0 { + // Version indicator found; this message will contain optional heights. + contains_heights = true; + // If the first value is a zero, the next u8 is the peer count. + u8::read_le(&mut reader)? + } else { + // A non-zero value indicates that this is the "old" PeerResponse without heights. + count_or_version + }; + let mut peers = Vec::with_capacity(count as usize); for _ in 0..count { - peers.push(SocketAddr::read_le(&mut reader)?); + let addr = SocketAddr::read_le(&mut reader)?; + let height = if contains_heights { + match u8::read_le(&mut reader)? { + 1 => Some(u32::read_le(&mut reader)?), + 0 => None, + _ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid peer height".to_string())), + } + } else { + None + }; + peers.push((addr, height)); } Ok(Self { peers }) @@ -69,14 +108,19 @@ pub mod prop_tests { collection::vec, prelude::{BoxedStrategy, Strategy, any}, }; - use std::net::{IpAddr, SocketAddr}; + use std::{ + io, + net::{IpAddr, SocketAddr}, + }; use test_strategy::proptest; - pub fn any_valid_socket_addr() -> BoxedStrategy { - any::<(IpAddr, u16)>().prop_map(|(ip_addr, port)| SocketAddr::new(ip_addr, port)).boxed() + pub fn any_valid_socket_addr() -> BoxedStrategy<(SocketAddr, Option)> { + any::<(IpAddr, u16, Option)>() + .prop_map(|(ip_addr, port, height)| (SocketAddr::new(ip_addr, port), height)) + .boxed() } - pub fn any_vec() -> BoxedStrategy> { + pub fn any_vec() -> BoxedStrategy)>> { vec(any_valid_socket_addr(), 0..50).prop_map(|v| v).boxed() } @@ -91,4 +135,14 @@ pub mod prop_tests { let decoded = PeerResponse::read_le(&mut bytes.into_inner().reader()).unwrap(); assert_eq!(decoded, peer_response); } + + // The following test will be obsolete once all the nodes handle heights in the `PeerResponse`. + #[test] + fn empty_old_peerlist_handling() { + // An empty `PeerResponse` without heights contains a single 0u8. + let serialized = &[0u8]; + let deserialized = PeerResponse::read_le(&serialized[..]).unwrap_err(); + // Check for the expected error. + assert_eq!(deserialized.kind(), io::ErrorKind::UnexpectedEof); + } } diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index 81ae779c54..3a52b17554 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -239,9 +239,21 @@ pub trait Heartbeat: Outbound { // Initialize an RNG. let rng = &mut OsRng; - // Attempt to connect to more peers. - for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) { - self.router().connect(peer_ip); + // Attempt to connect to more peers, separately choosing from those at a greater block + // height, and those whose height is lower or unknown to us. + let own_height = self.router().ledger.latest_block_height(); + let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self + .router() + .get_candidate_peers() + .into_iter() + .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false)); + // We may not know of half of `num_deficient` candidates; account for it using `min`. + let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len()); + for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) { + self.router().connect(peer.listener_addr); + } + for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) { + self.router().connect(peer.listener_addr); } if self.router().allow_external_peers() { diff --git a/node/router/src/helpers/peer.rs b/node/router/src/helpers/peer.rs index 087fee3436..51762891c2 100644 --- a/node/router/src/helpers/peer.rs +++ b/node/router/src/helpers/peer.rs @@ -45,6 +45,8 @@ pub struct CandidatePeer { pub listener_addr: SocketAddr, /// Indicates whether the peer is considered trusted. pub trusted: bool, + /// The latest block height known to be associated with the peer. + pub last_height_seen: Option, } /// A fully connected peer. @@ -73,7 +75,7 @@ pub struct ConnectedPeer { impl Peer { /// Create a candidate peer. pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self { - Self::Candidate(CandidatePeer { listener_addr, trusted }) + Self::Candidate(CandidatePeer { listener_addr, trusted, last_height_seen: None }) } /// Create a connecting peer. @@ -114,7 +116,11 @@ impl Peer { /// Demote a peer to candidate status, marking it as disconnected. pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) { - *self = Self::new_candidate(listener_addr, self.is_trusted()); + *self = Self::Candidate(CandidatePeer { + listener_addr, + trusted: self.is_trusted(), + last_height_seen: self.last_height_seen(), + }); } /// Returns the type of the node (only applicable to connected peers). diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 4722d0ee54..551ca152a8 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -341,7 +341,7 @@ pub trait Inbound: Reading + Outbound { // Truncate and convert to socket addrs. peers.truncate(MAX_PEERS_TO_SEND); - let peers = peers.into_iter().map(|peer| peer.listener_addr).collect(); + let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect(); // Send a `PeerResponse` message to the peer. self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers })); @@ -349,17 +349,17 @@ pub trait Inbound: Reading + Outbound { } /// Handles a `PeerResponse` message. - fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool { + fn peer_response(&self, _peer_ip: SocketAddr, peers: &[(SocketAddr, Option)]) -> bool { // Check if the number of peers received is less than MAX_PEERS_TO_SEND. if peers.len() > MAX_PEERS_TO_SEND { return false; } - // Filter out invalid addresses. + // Filter out invalid addresses and peers with a lower known block height. let peers = match self.router().is_dev() { // In development mode, relax the validity requirements to make operating devnets more flexible. - true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::>(), + true => peers.iter().copied().filter(|(ip, _)| !is_bogon_ip(ip.ip())).collect::>(), // In production mode, ensure the peer IPs are valid. - false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(*ip)).collect(), + false => peers.iter().copied().filter(|(ip, _)| self.router().is_valid_peer_ip(*ip)).collect(), }; // Adds the given peer IPs to the list of candidate peers. self.router().insert_candidate_peers(&peers); diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 33a9fcab2d..3a14590811 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -294,13 +294,19 @@ pub trait PeerPoolHandling: P2P { } /// Returns the list of candidate peers. - fn candidate_peers(&self) -> HashSet { + fn get_candidate_peers(&self) -> Vec { let banned_ips = self.tcp().banned_peers().get_banned_ips(); self.peer_pool() .read() .iter() .filter_map(|(addr, peer)| { - (matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr) + if let Peer::Candidate(peer) = peer + && !banned_ips.contains(&addr.ip()) + { + Some(peer.clone()) + } else { + None + } }) .collect() } @@ -607,7 +613,7 @@ impl Router { /// /// This method skips adding any given peers if the combined size exceeds the threshold, /// as the peer providing this list could be subverting the protocol. - pub fn insert_candidate_peers(&self, peers: &[SocketAddr]) { + pub fn insert_candidate_peers(&self, peers: &[(SocketAddr, Option)]) { // Compute the maximum number of candidate peers. let max_candidate_peers = Self::MAXIMUM_CANDIDATE_PEERS.saturating_sub(self.number_of_candidate_peers()); { @@ -615,16 +621,25 @@ impl Router { // Ensure the combined number of peers does not surpass the threshold. let eligible_peers = peers .iter() - .filter(|&peer_ip| { - // Ensure the peer is not itself, and is not already known. - !self.is_local_ip(*peer_ip) && !peer_pool.contains_key(peer_ip) + .filter(|(peer_ip, _)| { + // Ensure the peer is not itself or connected. + !self.is_local_ip(*peer_ip) && !self.is_connected(*peer_ip) }) - .take(max_candidate_peers) - .map(|addr| (*addr, Peer::new_candidate(*addr, false))) - .collect::>(); + .take(max_candidate_peers); // Proceed to insert the eligible candidate peer IPs. - peer_pool.extend(eligible_peers); + for (addr, height) in eligible_peers { + match peer_pool.entry(*addr) { + Entry::Vacant(entry) => { + entry.insert(Peer::new_candidate(*addr, false)); + } + Entry::Occupied(mut entry) => { + if let Peer::Candidate(peer) = entry.get_mut() { + peer.last_height_seen = *height; + } + } + } + } } #[cfg(feature = "metrics")] self.update_metrics(); diff --git a/node/tests/peering.rs b/node/tests/peering.rs index 3048540a77..85b38e933a 100644 --- a/node/tests/peering.rs +++ b/node/tests/peering.rs @@ -55,7 +55,10 @@ macro_rules! test_reject_unsolicited_peer_response { // Check the candidate peers. assert_eq!(node.router().number_of_candidate_peers(), 0); - let peers = vec!["1.1.1.1:1111".parse().unwrap(), "2.2.2.2:2222".parse().unwrap()]; + let peers = vec![ + ("1.1.1.1:1111".parse().unwrap(), None), + ("2.2.2.2:2222".parse().unwrap(), None), + ]; // Send a `PeerResponse` to the node. assert!( @@ -71,8 +74,9 @@ macro_rules! test_reject_unsolicited_peer_response { deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 0); // Make sure the sent addresses weren't inserted in the candidate peers. - for peer in peers { - assert!(!node.router().candidate_peers().contains(&peer)); + let candidate_peer_addrs = node.router().get_candidate_peers().into_iter().map(|peer| peer.listener_addr).collect::>(); + for (peer, _) in peers { + assert!(!candidate_peer_addrs.contains(&peer)); } } } From e15eb1aec1764dfa468187aff6f31a84e130af4c Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 6 Oct 2025 09:56:15 +0200 Subject: [PATCH 2/4] tweak: apply review comments Signed-off-by: ljedrz --- node/router/src/inbound.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 551ca152a8..f9cc816580 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -354,7 +354,7 @@ pub trait Inbound: Reading + Outbound { if peers.len() > MAX_PEERS_TO_SEND { return false; } - // Filter out invalid addresses and peers with a lower known block height. + // Filter out invalid addresses. let peers = match self.router().is_dev() { // In development mode, relax the validity requirements to make operating devnets more flexible. true => peers.iter().copied().filter(|(ip, _)| !is_bogon_ip(ip.ip())).collect::>(), From 0bd1c81c51f8a3b02bc0b2b86a08a5ed0e1600b1 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Tue, 7 Oct 2025 10:15:03 +0200 Subject: [PATCH 3/4] fix: post-merge cleanups Signed-off-by: ljedrz --- node/bft/src/gateway.rs | 2 +- node/router/src/inbound.rs | 2 +- node/router/src/lib.rs | 10 ++++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 0fdd49230e..a75617ad33 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -748,7 +748,7 @@ impl Gateway { (self.account.address() != aleo_addr && !self.is_connected_address(aleo_addr) && self.is_authorized_validator_address(aleo_addr)) - .then_some(listener_addr) + .then_some((listener_addr, None)) }) .collect::>(); if !valid_addrs.is_empty() { diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 1951b55b04..5dee57e69e 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -349,7 +349,7 @@ pub trait Inbound: Reading + Outbound { } /// Handles a `PeerResponse` message. - fn peer_response(&self, _peer_ip: SocketAddr, peers: &[(SocketAddr, Option)]) -> bool { + fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option)>) -> bool { // Check if the number of peers received is less than MAX_PEERS_TO_SEND. if peers.len() > MAX_PEERS_TO_SEND { return false; diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index da41a194be..c3869770fb 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -269,13 +269,15 @@ pub trait PeerPoolHandling: P2P { // Insert or update the applicable candidate peers. for (addr, height) in listener_addrs { - match peer_pool.entry(*addr) { + match peer_pool.entry(addr) { Entry::Vacant(entry) => { - entry.insert(Peer::new_candidate(*addr, false)); + entry.insert(Peer::new_candidate(addr, false)); } Entry::Occupied(mut entry) => { - if let Peer::Candidate(peer) = entry.get_mut() { - peer.last_height_seen = *height; + if height.is_some() { + if let Peer::Candidate(peer) = entry.get_mut() { + peer.last_height_seen = height; + } } } } From 0ecfd99f2cc29c75cd6c99deee5f69c1f28e9041 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Tue, 7 Oct 2025 10:41:43 +0200 Subject: [PATCH 4/4] tweak: post-merge logic updates Signed-off-by: ljedrz --- node/router/src/lib.rs | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index c3869770fb..8a505805e4 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -219,11 +219,20 @@ pub trait PeerPoolHandling: P2P { // based on multiple batches of candidate peers, and to not overwrite any entries. let mut peer_pool = self.peer_pool().write(); - // Perform filtering to ensure candidate validity. - listener_addrs.retain(|&(addr, _)| { - peer_pool.get(&addr).map(|peer| peer.is_candidate()).unwrap_or(true) - && !self.is_ip_banned(addr.ip()) + // Perform filtering to ensure candidate validity. Also count how many entries are updates. + let mut num_updates: usize = 0; + listener_addrs.retain(|&(addr, height)| { + !self.is_ip_banned(addr.ip()) && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) } + && peer_pool + .get(&addr) + .map(|peer| peer.is_candidate() && height.is_some()) + .inspect(|is_valid_update| { + if *is_valid_update { + num_updates += 1 + } + }) + .unwrap_or(true) }); // If we've managed to filter out every entry, there's nothing to do. @@ -232,7 +241,9 @@ pub trait PeerPoolHandling: P2P { } // If we're about to exceed the peer pool size limit, apply candidate slashing. - if self.number_of_peers() + listener_addrs.len() >= Self::MAXIMUM_POOL_SIZE && Self::PEER_SLASHING_COUNT != 0 { + if self.number_of_peers() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE + && Self::PEER_SLASHING_COUNT != 0 + { // Collect the addresses of prospect peers. let mut peers_to_slash = peer_pool .iter() @@ -274,10 +285,8 @@ pub trait PeerPoolHandling: P2P { entry.insert(Peer::new_candidate(addr, false)); } Entry::Occupied(mut entry) => { - if height.is_some() { - if let Peer::Candidate(peer) = entry.get_mut() { - peer.last_height_seen = height; - } + if let Peer::Candidate(peer) = entry.get_mut() { + peer.last_height_seen = height; } } } @@ -406,19 +415,10 @@ pub trait PeerPoolHandling: P2P { /// Returns the list of candidate peers. fn get_candidate_peers(&self) -> Vec { - let banned_ips = self.tcp().banned_peers().get_banned_ips(); self.peer_pool() .read() - .iter() - .filter_map(|(addr, peer)| { - if let Peer::Candidate(peer) = peer - && !banned_ips.contains(&addr.ip()) - { - Some(peer.clone()) - } else { - None - } - }) + .values() + .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None }) .collect() }