diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index a8e65f86bc..a75617ad33 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -748,7 +748,7 @@ impl Gateway { (self.account.address() != aleo_addr && !self.is_connected_address(aleo_addr) && self.is_authorized_validator_address(aleo_addr)) - .then_some(listener_addr) + .then_some((listener_addr, None)) }) .collect::>(); if !valid_addrs.is_empty() { @@ -931,10 +931,10 @@ impl Gateway { // The trusted ones are already handled by `handle_trusted_validators`. let trusted_validators = self.trusted_peers(); if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize { - for candidate_addr in self.candidate_peers() { - if !trusted_validators.contains(&candidate_addr) { + for peer in self.get_candidate_peers() { + if !trusted_validators.contains(&peer.listener_addr) { // Attempt to connect to unconnected validators. - self.connect(candidate_addr); + self.connect(peer.listener_addr); } } diff --git a/node/router/messages/src/peer_response.rs b/node/router/messages/src/peer_response.rs index 6c64cc0515..55b06f4dca 100644 --- a/node/router/messages/src/peer_response.rs +++ b/node/router/messages/src/peer_response.rs @@ -21,7 +21,7 @@ use std::borrow::Cow; #[derive(Clone, Debug, PartialEq, Eq)] pub struct PeerResponse { - pub peers: Vec, + pub peers: Vec<(SocketAddr, Option)>, } impl MessageTrait for PeerResponse { @@ -39,9 +39,20 @@ impl ToBytes for PeerResponse { return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Too many peers: {}", self.peers.len()))); } + // A version indicator; we don't expect empty peer responses, so a zero value can serve + // as an indicator that this message is to be processed differently. The version value + // can be changed to a 2 in the future, once everyone expects it there. + 0u8.write_le(&mut writer)?; + (self.peers.len() as u8).write_le(&mut writer)?; - for peer in self.peers.iter() { - peer.write_le(&mut writer)?; + for (addr, height) in self.peers.iter() { + addr.write_le(&mut writer)?; + if let Some(h) = height { + 1u8.write_le(&mut writer)?; + h.write_le(&mut writer)?; + } else { + 0u8.write_le(&mut writer)?; + } } Ok(()) } @@ -49,10 +60,38 @@ impl ToBytes for PeerResponse { impl FromBytes for PeerResponse { fn read_le(mut reader: R) -> io::Result { - let count = u8::read_le(&mut reader)?; + // Read the peer count if their heights aren't present; otherwise, interpret this value + // as the message version. It is a workaround for a currently missing version value. + // The worst-case scenario is if a node hasn't updated, and it gets a `PeerRequest` from + // its only peer who has; this would cause it to return a message that appears as if it + // contains heights (due to a leading `0`), but it would end up failing to deserialize. + // TODO: after a release or two, we should always be expecting the version to be present, + // simplifying the deserialization; also, remove the `empty_old_peerlist_handling` test. + let mut contains_heights = false; + let count_or_version = u8::read_le(&mut reader)?; + let count = if count_or_version == 0 { + // Version indicator found; this message will contain optional heights. + contains_heights = true; + // If the first value is a zero, the next u8 is the peer count. + u8::read_le(&mut reader)? + } else { + // A non-zero value indicates that this is the "old" PeerResponse without heights. + count_or_version + }; + let mut peers = Vec::with_capacity(count as usize); for _ in 0..count { - peers.push(SocketAddr::read_le(&mut reader)?); + let addr = SocketAddr::read_le(&mut reader)?; + let height = if contains_heights { + match u8::read_le(&mut reader)? { + 1 => Some(u32::read_le(&mut reader)?), + 0 => None, + _ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid peer height".to_string())), + } + } else { + None + }; + peers.push((addr, height)); } Ok(Self { peers }) @@ -69,14 +108,19 @@ pub mod prop_tests { collection::vec, prelude::{BoxedStrategy, Strategy, any}, }; - use std::net::{IpAddr, SocketAddr}; + use std::{ + io, + net::{IpAddr, SocketAddr}, + }; use test_strategy::proptest; - pub fn any_valid_socket_addr() -> BoxedStrategy { - any::<(IpAddr, u16)>().prop_map(|(ip_addr, port)| SocketAddr::new(ip_addr, port)).boxed() + pub fn any_valid_socket_addr() -> BoxedStrategy<(SocketAddr, Option)> { + any::<(IpAddr, u16, Option)>() + .prop_map(|(ip_addr, port, height)| (SocketAddr::new(ip_addr, port), height)) + .boxed() } - pub fn any_vec() -> BoxedStrategy> { + pub fn any_vec() -> BoxedStrategy)>> { vec(any_valid_socket_addr(), 0..50).prop_map(|v| v).boxed() } @@ -91,4 +135,14 @@ pub mod prop_tests { let decoded = PeerResponse::read_le(&mut bytes.into_inner().reader()).unwrap(); assert_eq!(decoded, peer_response); } + + // The following test will be obsolete once all the nodes handle heights in the `PeerResponse`. + #[test] + fn empty_old_peerlist_handling() { + // An empty `PeerResponse` without heights contains a single 0u8. + let serialized = &[0u8]; + let deserialized = PeerResponse::read_le(&serialized[..]).unwrap_err(); + // Check for the expected error. + assert_eq!(deserialized.kind(), io::ErrorKind::UnexpectedEof); + } } diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index 81ae779c54..3a52b17554 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -239,9 +239,21 @@ pub trait Heartbeat: Outbound { // Initialize an RNG. let rng = &mut OsRng; - // Attempt to connect to more peers. - for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) { - self.router().connect(peer_ip); + // Attempt to connect to more peers, separately choosing from those at a greater block + // height, and those whose height is lower or unknown to us. + let own_height = self.router().ledger.latest_block_height(); + let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self + .router() + .get_candidate_peers() + .into_iter() + .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false)); + // We may not know of half of `num_deficient` candidates; account for it using `min`. + let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len()); + for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) { + self.router().connect(peer.listener_addr); + } + for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) { + self.router().connect(peer.listener_addr); } if self.router().allow_external_peers() { diff --git a/node/router/src/helpers/peer.rs b/node/router/src/helpers/peer.rs index 087fee3436..51762891c2 100644 --- a/node/router/src/helpers/peer.rs +++ b/node/router/src/helpers/peer.rs @@ -45,6 +45,8 @@ pub struct CandidatePeer { pub listener_addr: SocketAddr, /// Indicates whether the peer is considered trusted. pub trusted: bool, + /// The latest block height known to be associated with the peer. + pub last_height_seen: Option, } /// A fully connected peer. @@ -73,7 +75,7 @@ pub struct ConnectedPeer { impl Peer { /// Create a candidate peer. pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self { - Self::Candidate(CandidatePeer { listener_addr, trusted }) + Self::Candidate(CandidatePeer { listener_addr, trusted, last_height_seen: None }) } /// Create a connecting peer. @@ -114,7 +116,11 @@ impl Peer { /// Demote a peer to candidate status, marking it as disconnected. pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) { - *self = Self::new_candidate(listener_addr, self.is_trusted()); + *self = Self::Candidate(CandidatePeer { + listener_addr, + trusted: self.is_trusted(), + last_height_seen: self.last_height_seen(), + }); } /// Returns the type of the node (only applicable to connected peers). diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 8d7e8ccc46..5dee57e69e 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -341,7 +341,7 @@ pub trait Inbound: Reading + Outbound { // Truncate and convert to socket addrs. peers.truncate(MAX_PEERS_TO_SEND); - let peers = peers.into_iter().map(|peer| peer.listener_addr).collect(); + let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect(); // Send a `PeerResponse` message to the peer. self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers })); @@ -349,7 +349,7 @@ pub trait Inbound: Reading + Outbound { } /// Handles a `PeerResponse` message. - fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec) -> bool { + fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option)>) -> bool { // Check if the number of peers received is less than MAX_PEERS_TO_SEND. if peers.len() > MAX_PEERS_TO_SEND { return false; diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 33fed85735..8a505805e4 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -212,17 +212,27 @@ pub trait PeerPoolHandling: P2P { } /// Adds new candidate peers to the peer pool, ensuring their validity and following the - /// limit on the number of peers in the pool. - fn insert_candidate_peers(&self, mut listener_addrs: Vec) { + /// limit on the number of peers in the pool. The listener addresses may be paired with + /// the last known block height of the associated peer. + fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option)>) { // Hold a write guard from now on, so as not to accidentally slash multiple times // based on multiple batches of candidate peers, and to not overwrite any entries. let mut peer_pool = self.peer_pool().write(); - // Perform filtering to ensure candidate validity. - listener_addrs.retain(|&addr| { - !peer_pool.contains_key(&addr) - && !self.is_ip_banned(addr.ip()) + // Perform filtering to ensure candidate validity. Also count how many entries are updates. + let mut num_updates: usize = 0; + listener_addrs.retain(|&(addr, height)| { + !self.is_ip_banned(addr.ip()) && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) } + && peer_pool + .get(&addr) + .map(|peer| peer.is_candidate() && height.is_some()) + .inspect(|is_valid_update| { + if *is_valid_update { + num_updates += 1 + } + }) + .unwrap_or(true) }); // If we've managed to filter out every entry, there's nothing to do. @@ -231,7 +241,9 @@ pub trait PeerPoolHandling: P2P { } // If we're about to exceed the peer pool size limit, apply candidate slashing. - if self.number_of_peers() + listener_addrs.len() >= Self::MAXIMUM_POOL_SIZE && Self::PEER_SLASHING_COUNT != 0 { + if self.number_of_peers() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE + && Self::PEER_SLASHING_COUNT != 0 + { // Collect the addresses of prospect peers. let mut peers_to_slash = peer_pool .iter() @@ -266,9 +278,18 @@ pub trait PeerPoolHandling: P2P { return; } - // Insert new candidate peers. - for addr in listener_addrs { - peer_pool.insert(addr, Peer::new_candidate(addr, false)); + // Insert or update the applicable candidate peers. + for (addr, height) in listener_addrs { + match peer_pool.entry(addr) { + Entry::Vacant(entry) => { + entry.insert(Peer::new_candidate(addr, false)); + } + Entry::Occupied(mut entry) => { + if let Peer::Candidate(peer) = entry.get_mut() { + peer.last_height_seen = height; + } + } + } } } @@ -393,14 +414,11 @@ pub trait PeerPoolHandling: P2P { } /// Returns the list of candidate peers. - fn candidate_peers(&self) -> Vec { - let banned_ips = self.tcp().banned_peers().get_banned_ips(); + fn get_candidate_peers(&self) -> Vec { self.peer_pool() .read() - .iter() - .filter_map(|(addr, peer)| { - (matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr) - }) + .values() + .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None }) .collect() } diff --git a/node/tests/peering.rs b/node/tests/peering.rs index 3048540a77..85b38e933a 100644 --- a/node/tests/peering.rs +++ b/node/tests/peering.rs @@ -55,7 +55,10 @@ macro_rules! test_reject_unsolicited_peer_response { // Check the candidate peers. assert_eq!(node.router().number_of_candidate_peers(), 0); - let peers = vec!["1.1.1.1:1111".parse().unwrap(), "2.2.2.2:2222".parse().unwrap()]; + let peers = vec![ + ("1.1.1.1:1111".parse().unwrap(), None), + ("2.2.2.2:2222".parse().unwrap(), None), + ]; // Send a `PeerResponse` to the node. assert!( @@ -71,8 +74,9 @@ macro_rules! test_reject_unsolicited_peer_response { deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 0); // Make sure the sent addresses weren't inserted in the candidate peers. - for peer in peers { - assert!(!node.router().candidate_peers().contains(&peer)); + let candidate_peer_addrs = node.router().get_candidate_peers().into_iter().map(|peer| peer.listener_addr).collect::>(); + for (peer, _) in peers { + assert!(!candidate_peer_addrs.contains(&peer)); } } }