From 2d47f531989b9f563b3994d9643f0b575d9f3d57 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 27 May 2026 12:33:55 +0200 Subject: [PATCH 1/2] feat(gossipsub): Check whether published partials are stale --- .../gossipsub/src/behaviour/tests/partial.rs | 207 +++++++++++++----- .../src/extensions/partial_messages.rs | 44 +++- 2 files changed, 184 insertions(+), 67 deletions(-) diff --git a/protocols/gossipsub/src/behaviour/tests/partial.rs b/protocols/gossipsub/src/behaviour/tests/partial.rs index 907d30e3477..7fb4d328c8b 100644 --- a/protocols/gossipsub/src/behaviour/tests/partial.rs +++ b/protocols/gossipsub/src/behaviour/tests/partial.rs @@ -834,26 +834,28 @@ fn test_peer_disconnect_cleanup() { supports_partial: true, }, ); - // Publish again - if peer2's metadata was preserved, we'd only send - // missing parts (0b10101010). Since it was cleaned up on disconnect, - // we send all parts (0b11111111). - let actions = state1 - .handle_publish(topic_hash.clone(), message, HashSet::from([peer2])) - .expect("Publish should succeed"); - assert_eq!(actions.len(), 1); - let PublishAction::SendMessage { - peer_id, - rpc: RpcOut::PartialMessage(partial_msg), - } = &actions[0] - else { - panic!("Expected SendMessage with PartialMessage"); - }; - assert_eq!(*peer_id, peer2); - // Should send all parts since peer2's metadata was wiped on disconnect + // Re-publishing the identical partial would be skipped as stale, so verify cleanup via + // heartbeat re-gossip instead: since peer2's metadata was wiped on disconnect, heartbeat + // should re-send all parts. (If the state had been preserved, heartbeat would send nothing + // because publish_action would find nothing new for peer2.) + let empty_mesh = HashMap::new(); + let empty_fanout = HashMap::new(); + let actions = state1.heartbeat(&empty_mesh, &empty_fanout, 10, 1.0, 100); + let partial_msg = actions + .iter() + .find_map(|action| match action { + PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(pm), + } if *peer_id == peer2 => Some(pm), + _ => None, + }) + .expect("Reconnected peer should be re-gossiped the partial via heartbeat"); + // Should send all parts since peer2's metadata was wiped on disconnect. let body = partial_msg.body.as_ref().expect("Should have body"); assert_eq!( body[0], 0b11111111, - "Should send all parts after reconnect (peer metadata was cleaned up)" + "Should re-send all parts after reconnect (peer metadata was cleaned up)" ); } @@ -901,28 +903,28 @@ fn test_peer_unsubscribed_cleanup() { supports_partial: true, }, ); - // Publish again - if peer2's metadata was preserved, we'd only send - // missing parts (0b10101010). Since it was cleaned up on unsubscribe, - // we send all parts (0b11111111). - let mut new_message = Bitmap::new(group_id); - new_message.fill_parts(0b11111111); - let actions = state1 - .handle_publish(topic_hash.clone(), new_message, HashSet::from([peer2])) - .expect("Publish should succeed"); - assert_eq!(actions.len(), 1); - let PublishAction::SendMessage { - peer_id, - rpc: RpcOut::PartialMessage(partial_msg), - } = &actions[0] - else { - panic!("Expected SendMessage with PartialMessage"); - }; - assert_eq!(*peer_id, peer2); - // Should send all parts since peer2's metadata was wiped on unsubscribe + // Re-publishing the identical partial would be skipped as stale, so verify cleanup via + // heartbeat re-gossip instead: since peer2's metadata was wiped on unsubscribe, heartbeat + // should re-send all parts. (If the state had been preserved, heartbeat would send nothing + // because publish_action would find nothing new for peer2.) + let empty_mesh = HashMap::new(); + let empty_fanout = HashMap::new(); + let actions = state1.heartbeat(&empty_mesh, &empty_fanout, 10, 1.0, 100); + let partial_msg = actions + .iter() + .find_map(|action| match action { + PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(pm), + } if *peer_id == peer2 => Some(pm), + _ => None, + }) + .expect("Re-subscribed peer should be re-gossiped the partial via heartbeat"); + // Should send all parts since peer2's metadata was wiped on unsubscribe. let body = partial_msg.body.as_ref().expect("Should have body"); assert_eq!( body[0], 0b11111111, - "Should send all parts after peer re-subscribes (peer metadata was cleaned up)" + "Should re-send all parts after peer re-subscribes (peer metadata was cleaned up)" ); } @@ -979,8 +981,7 @@ fn test_peer_unsubscribed_preserves_other_topics() { let _ = state1.handle_received(peer2, peer2_partial_topic2); // Peer2 unsubscribes from topic1 only state1.peer_unsubscribed(peer2, &topic1); - // Publish on topic1 - peer2's state was cleared, should send all parts - // Re-subscribe peer2 to topic1 first + // Re-subscribe peer2 to topic1. state1.peer_subscribed( &peer2, topic1.clone(), @@ -989,23 +990,9 @@ fn test_peer_unsubscribed_preserves_other_topics() { supports_partial: true, }, ); - let actions1 = state1 - .handle_publish(topic1.clone(), message.clone(), HashSet::from([peer2])) - .expect("Publish should succeed"); - let PublishAction::SendMessage { - rpc: RpcOut::PartialMessage(partial_msg1), - .. - } = &actions1[0] - else { - panic!("Expected SendMessage with PartialMessage"); - }; - // Topic1: Should send all parts (state was cleared) - let body1 = partial_msg1.body.as_ref().expect("Should have body"); - assert_eq!( - body1[0], 0b11111111, - "Topic1: Should send all parts (peer metadata was cleared on unsubscribe)" - ); - // Publish on topic2 - peer2's state was preserved, should send only missing parts + // Publish on topic2 first - peer2's state there was preserved, so we should send only the + // missing parts. Done before the topic1 heartbeat check so the heartbeat does not alter + // peer2's topic2 state. let actions2 = state1 .handle_publish(topic2.clone(), message, HashSet::from([peer2])) .expect("Publish should succeed"); @@ -1022,6 +1009,28 @@ fn test_peer_unsubscribed_preserves_other_topics() { body2[0], 0b10101010, "Topic2: Should send only missing parts (peer metadata preserved)" ); + // Topic1: peer2's state was cleared on unsubscribe. Re-publishing the identical partial + // would be skipped as stale, so verify cleanup via heartbeat re-gossip instead: peer2 + // should be re-sent all parts on topic1. + let empty_mesh = HashMap::new(); + let empty_fanout = HashMap::new(); + let actions1 = state1.heartbeat(&empty_mesh, &empty_fanout, 10, 1.0, 100); + let partial_msg1 = actions1 + .iter() + .find_map(|action| match action { + PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(pm), + } if *peer_id == peer2 && pm.topic_hash == topic1 => Some(pm), + _ => None, + }) + .expect("topic1 should be re-gossiped to the re-subscribed peer via heartbeat"); + // Topic1: Should re-send all parts (state was cleared on unsubscribe). + let body1 = partial_msg1.body.as_ref().expect("Should have body"); + assert_eq!( + body1[0], 0b11111111, + "Topic1: Should re-send all parts (peer metadata was cleared on unsubscribe)" + ); } /// Verifies that: @@ -2021,3 +2030,91 @@ fn test_ihave_not_sent_to_partial_peers() { "Peer with requestsPartial=false should receive IHAVE messages" ); } + +/// Verifies that: +/// - Republishing a partial that adds new parts is not stale and is published normally. +/// - Republishing a partial whose parts are a subset of the cached one adds nothing new, +/// so it is detected as stale: `handle_publish` returns no actions. +/// - A stale republish does NOT overwrite the cached partial: a peer that subsequently +/// requests the message is served the full cached partial, not the stale subset. +#[test] +fn test_handle_publish_skips_stale_partial() { + let topic_hash = TopicHash::from_raw("test-topic"); + let group_id: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8]; + let peer = PeerId::random(); + let requester = PeerId::random(); + let mut state = State::default(); + state.enable_partials_for_topic(topic_hash.clone(), true); + for p in [&peer, &requester] { + state.peer_subscribed( + p, + topic_hash.clone(), + SubscriptionOpts { + requests_partial: true, + supports_partial: true, + }, + ); + } + + // Publish a partial holding the lower four parts. This caches it locally. + let mut message = Bitmap::new(group_id); + message.fill_parts(0b00001111); + let actions = state + .handle_publish(topic_hash.clone(), message, HashSet::from([peer])) + .expect("Publish should succeed"); + assert_eq!(actions.len(), 1, "Initial publish should send one message"); + + // Republish a partial that adds the upper four parts. Not stale, should be sent. + let mut superset = Bitmap::new(group_id); + superset.fill_parts(0b11111111); + let actions = state + .handle_publish(topic_hash.clone(), superset, HashSet::from([peer])) + .expect("Publish should succeed"); + assert_eq!( + actions.len(), + 1, + "Republishing a partial with new parts should not be considered stale" + ); + + // Republish a partial that is a strict subset of what was already published. + // It adds nothing new, so it is stale: no actions are returned. + let mut stale = Bitmap::new(group_id); + stale.fill_parts(0b00000011); + let actions = state + .handle_publish(topic_hash.clone(), stale, HashSet::from([peer])) + .expect("Publish should succeed"); + assert!( + actions.is_empty(), + "Republishing a stale partial (subset of cached parts) should produce no actions" + ); + + // A fresh peer announces it has no parts. We should respond from the cached partial. + // If the stale republish had overwritten the cache, the response would only carry + // 0b00000011; instead it must carry the full 0b11111111 that is still cached. + let request = PartialMessage { + group_id: group_id.to_vec(), + topic_hash: topic_hash.clone(), + body: None, + metadata: Some(vec![0b00000000]), + }; + let mut actions = state.handle_received(requester, request); + assert_eq!(actions.len(), 1, "Should respond with the cached partial"); + let ReceivedAction::Publish(PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(response), + }) = actions.remove(0) + else { + panic!("Expected a Publish(SendMessage) response"); + }; + assert_eq!(peer_id, requester); + assert_eq!( + response.metadata, + Some(vec![0b11111111]), + "A stale republish must not overwrite the cached partial" + ); + assert_eq!( + response.body.expect("Response should carry a body")[0], + 0b11111111, + "Response should serve all cached parts, not the stale subset" + ); +} diff --git a/protocols/gossipsub/src/extensions/partial_messages.rs b/protocols/gossipsub/src/extensions/partial_messages.rs index 9ce7b1c8936..f92ec01ac0e 100644 --- a/protocols/gossipsub/src/extensions/partial_messages.rs +++ b/protocols/gossipsub/src/extensions/partial_messages.rs @@ -20,7 +20,7 @@ use std::{ cmp::max, - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeSet, HashMap, HashSet, hash_map::Entry}, fmt::{self, Debug}, }; @@ -493,13 +493,42 @@ impl State { )); }; + // Check whether this partial holds more parts than the previously cached one. + let group_id = partial_message.group_id(); + let metadata = partial_message.metadata(); + let cached = topic_partials.partial_messages.entry(group_id.clone()); + if let Entry::Occupied(cached) = &cached { + let mut old_metadata = cached.get().content.metadata(); + if !old_metadata + .update(metadata.as_slice()) + .map_err(PublishError::Partial)? + { + tracing::debug!( + ?metadata, + ?old_metadata, + ?group_id, + ?topic_hash, + "Partial was not updated due to stale data; skipping publish" + ); + // Metadata was not updated, so we do not need to send anything and keep the + // previously sent message in cache. + return Ok(vec![]); + } + } + + // Cache the sent partial before publishing, so the local node retains what it + // published even when there are currently no recipients to send it to. + let partial_message = &cached.insert_entry(LocalPartial { + content: Box::new(partial_message), + ttl: DEFAULT_PARTIAL_TTL, + }); + if recipients.is_empty() { tracing::debug!(topic = %topic_hash, "Recipient list for publishing partial message is empty"); return Err(PublishError::NoPeersSubscribedToTopic); } let mut actions = vec![]; - let group_id = partial_message.group_id(); let Some(topic_peers) = self.peer_subscriptions.get_mut(&topic_hash) else { tracing::error!(topic = %topic_hash, "No peers subscribed to topic"); return Err(PublishError::NoPeersSubscribedToTopic); @@ -516,7 +545,7 @@ impl State { peer_id, &topic_hash, &group_id, - &partial_message, + partial_message.get().content.as_ref(), remote_subscription, ) { Some(message @ PublishAction::SendMessage { .. }) => { @@ -530,15 +559,6 @@ impl State { } } - // Cache the sent partial - topic_partials.partial_messages.insert( - partial_message.group_id(), - LocalPartial { - content: Box::new(partial_message), - ttl: DEFAULT_PARTIAL_TTL, - }, - ); - Ok(actions) } From 8ffa4be54193ed79fa0223f53db3aa7ba15c70a2 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 27 May 2026 15:18:09 +0200 Subject: [PATCH 2/2] fmt --- protocols/gossipsub/src/behaviour/tests/partial.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/protocols/gossipsub/src/behaviour/tests/partial.rs b/protocols/gossipsub/src/behaviour/tests/partial.rs index 7fb4d328c8b..0144e536816 100644 --- a/protocols/gossipsub/src/behaviour/tests/partial.rs +++ b/protocols/gossipsub/src/behaviour/tests/partial.rs @@ -2033,10 +2033,10 @@ fn test_ihave_not_sent_to_partial_peers() { /// Verifies that: /// - Republishing a partial that adds new parts is not stale and is published normally. -/// - Republishing a partial whose parts are a subset of the cached one adds nothing new, -/// so it is detected as stale: `handle_publish` returns no actions. -/// - A stale republish does NOT overwrite the cached partial: a peer that subsequently -/// requests the message is served the full cached partial, not the stale subset. +/// - Republishing a partial whose parts are a subset of the cached one adds nothing new, so it is +/// detected as stale: `handle_publish` returns no actions. +/// - A stale republish does NOT overwrite the cached partial: a peer that subsequently requests the +/// message is served the full cached partial, not the stale subset. #[test] fn test_handle_publish_skips_stale_partial() { let topic_hash = TopicHash::from_raw("test-topic");