Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 152 additions & 55 deletions protocols/gossipsub/src/behaviour/tests/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,26 +834,28 @@ fn test_peer_disconnect_cleanup() {
supports_partial: true,
},
);
// Publish again - if peer2's metadata was preserved, we'd only send
// missing parts (0b10101010). Since it was cleaned up on disconnect,
// we send all parts (0b11111111).
let actions = state1
.handle_publish(topic_hash.clone(), message, HashSet::from([peer2]))
.expect("Publish should succeed");
assert_eq!(actions.len(), 1);
let PublishAction::SendMessage {
peer_id,
rpc: RpcOut::PartialMessage(partial_msg),
} = &actions[0]
else {
panic!("Expected SendMessage with PartialMessage");
};
assert_eq!(*peer_id, peer2);
// Should send all parts since peer2's metadata was wiped on disconnect
// Re-publishing the identical partial would be skipped as stale, so verify cleanup via
// heartbeat re-gossip instead: since peer2's metadata was wiped on disconnect, heartbeat
// should re-send all parts. (If the state had been preserved, heartbeat would send nothing
// because publish_action would find nothing new for peer2.)
let empty_mesh = HashMap::new();
let empty_fanout = HashMap::new();
let actions = state1.heartbeat(&empty_mesh, &empty_fanout, 10, 1.0, 100);
let partial_msg = actions
.iter()
.find_map(|action| match action {
PublishAction::SendMessage {
peer_id,
rpc: RpcOut::PartialMessage(pm),
} if *peer_id == peer2 => Some(pm),
_ => None,
})
.expect("Reconnected peer should be re-gossiped the partial via heartbeat");
// Should send all parts since peer2's metadata was wiped on disconnect.
let body = partial_msg.body.as_ref().expect("Should have body");
assert_eq!(
body[0], 0b11111111,
"Should send all parts after reconnect (peer metadata was cleaned up)"
"Should re-send all parts after reconnect (peer metadata was cleaned up)"
);
}

Expand Down Expand Up @@ -901,28 +903,28 @@ fn test_peer_unsubscribed_cleanup() {
supports_partial: true,
},
);
// Publish again - if peer2's metadata was preserved, we'd only send
// missing parts (0b10101010). Since it was cleaned up on unsubscribe,
// we send all parts (0b11111111).
let mut new_message = Bitmap::new(group_id);
new_message.fill_parts(0b11111111);
let actions = state1
.handle_publish(topic_hash.clone(), new_message, HashSet::from([peer2]))
.expect("Publish should succeed");
assert_eq!(actions.len(), 1);
let PublishAction::SendMessage {
peer_id,
rpc: RpcOut::PartialMessage(partial_msg),
} = &actions[0]
else {
panic!("Expected SendMessage with PartialMessage");
};
assert_eq!(*peer_id, peer2);
// Should send all parts since peer2's metadata was wiped on unsubscribe
// Re-publishing the identical partial would be skipped as stale, so verify cleanup via
// heartbeat re-gossip instead: since peer2's metadata was wiped on unsubscribe, heartbeat
// should re-send all parts. (If the state had been preserved, heartbeat would send nothing
// because publish_action would find nothing new for peer2.)
let empty_mesh = HashMap::new();
let empty_fanout = HashMap::new();
let actions = state1.heartbeat(&empty_mesh, &empty_fanout, 10, 1.0, 100);
let partial_msg = actions
.iter()
.find_map(|action| match action {
PublishAction::SendMessage {
peer_id,
rpc: RpcOut::PartialMessage(pm),
} if *peer_id == peer2 => Some(pm),
_ => None,
})
.expect("Re-subscribed peer should be re-gossiped the partial via heartbeat");
// Should send all parts since peer2's metadata was wiped on unsubscribe.
let body = partial_msg.body.as_ref().expect("Should have body");
assert_eq!(
body[0], 0b11111111,
"Should send all parts after peer re-subscribes (peer metadata was cleaned up)"
"Should re-send all parts after peer re-subscribes (peer metadata was cleaned up)"
);
}

Expand Down Expand Up @@ -979,8 +981,7 @@ fn test_peer_unsubscribed_preserves_other_topics() {
let _ = state1.handle_received(peer2, peer2_partial_topic2);
// Peer2 unsubscribes from topic1 only
state1.peer_unsubscribed(peer2, &topic1);
// Publish on topic1 - peer2's state was cleared, should send all parts
// Re-subscribe peer2 to topic1 first
// Re-subscribe peer2 to topic1.
state1.peer_subscribed(
&peer2,
topic1.clone(),
Expand All @@ -989,23 +990,9 @@ fn test_peer_unsubscribed_preserves_other_topics() {
supports_partial: true,
},
);
let actions1 = state1
.handle_publish(topic1.clone(), message.clone(), HashSet::from([peer2]))
.expect("Publish should succeed");
let PublishAction::SendMessage {
rpc: RpcOut::PartialMessage(partial_msg1),
..
} = &actions1[0]
else {
panic!("Expected SendMessage with PartialMessage");
};
// Topic1: Should send all parts (state was cleared)
let body1 = partial_msg1.body.as_ref().expect("Should have body");
assert_eq!(
body1[0], 0b11111111,
"Topic1: Should send all parts (peer metadata was cleared on unsubscribe)"
);
// Publish on topic2 - peer2's state was preserved, should send only missing parts
// Publish on topic2 first - peer2's state there was preserved, so we should send only the
// missing parts. Done before the topic1 heartbeat check so the heartbeat does not alter
// peer2's topic2 state.
let actions2 = state1
.handle_publish(topic2.clone(), message, HashSet::from([peer2]))
.expect("Publish should succeed");
Expand All @@ -1022,6 +1009,28 @@ fn test_peer_unsubscribed_preserves_other_topics() {
body2[0], 0b10101010,
"Topic2: Should send only missing parts (peer metadata preserved)"
);
// Topic1: peer2's state was cleared on unsubscribe. Re-publishing the identical partial
// would be skipped as stale, so verify cleanup via heartbeat re-gossip instead: peer2
// should be re-sent all parts on topic1.
let empty_mesh = HashMap::new();
let empty_fanout = HashMap::new();
let actions1 = state1.heartbeat(&empty_mesh, &empty_fanout, 10, 1.0, 100);
let partial_msg1 = actions1
.iter()
.find_map(|action| match action {
PublishAction::SendMessage {
peer_id,
rpc: RpcOut::PartialMessage(pm),
} if *peer_id == peer2 && pm.topic_hash == topic1 => Some(pm),
_ => None,
})
.expect("topic1 should be re-gossiped to the re-subscribed peer via heartbeat");
// Topic1: Should re-send all parts (state was cleared on unsubscribe).
let body1 = partial_msg1.body.as_ref().expect("Should have body");
assert_eq!(
body1[0], 0b11111111,
"Topic1: Should re-send all parts (peer metadata was cleared on unsubscribe)"
);
}

/// Verifies that:
Expand Down Expand Up @@ -2021,3 +2030,91 @@ fn test_ihave_not_sent_to_partial_peers() {
"Peer with requestsPartial=false should receive IHAVE messages"
);
}

/// Verifies that:
/// - Republishing a partial that adds new parts is not stale and is published normally.
/// - Republishing a partial whose parts are a subset of the cached one adds nothing new, so it is
/// detected as stale: `handle_publish` returns no actions.
/// - A stale republish does NOT overwrite the cached partial: a peer that subsequently requests the
/// message is served the full cached partial, not the stale subset.
#[test]
fn test_handle_publish_skips_stale_partial() {
let topic_hash = TopicHash::from_raw("test-topic");
let group_id: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
let peer = PeerId::random();
let requester = PeerId::random();
let mut state = State::default();
state.enable_partials_for_topic(topic_hash.clone(), true);
for p in [&peer, &requester] {
state.peer_subscribed(
p,
topic_hash.clone(),
SubscriptionOpts {
requests_partial: true,
supports_partial: true,
},
);
}

// Publish a partial holding the lower four parts. This caches it locally.
let mut message = Bitmap::new(group_id);
message.fill_parts(0b00001111);
let actions = state
.handle_publish(topic_hash.clone(), message, HashSet::from([peer]))
.expect("Publish should succeed");
assert_eq!(actions.len(), 1, "Initial publish should send one message");

// Republish a partial that adds the upper four parts. Not stale, should be sent.
let mut superset = Bitmap::new(group_id);
superset.fill_parts(0b11111111);
let actions = state
.handle_publish(topic_hash.clone(), superset, HashSet::from([peer]))
.expect("Publish should succeed");
assert_eq!(
actions.len(),
1,
"Republishing a partial with new parts should not be considered stale"
);

// Republish a partial that is a strict subset of what was already published.
// It adds nothing new, so it is stale: no actions are returned.
let mut stale = Bitmap::new(group_id);
stale.fill_parts(0b00000011);
let actions = state
.handle_publish(topic_hash.clone(), stale, HashSet::from([peer]))
.expect("Publish should succeed");
assert!(
actions.is_empty(),
"Republishing a stale partial (subset of cached parts) should produce no actions"
);

// A fresh peer announces it has no parts. We should respond from the cached partial.
// If the stale republish had overwritten the cache, the response would only carry
// 0b00000011; instead it must carry the full 0b11111111 that is still cached.
let request = PartialMessage {
group_id: group_id.to_vec(),
topic_hash: topic_hash.clone(),
body: None,
metadata: Some(vec![0b00000000]),
};
let mut actions = state.handle_received(requester, request);
assert_eq!(actions.len(), 1, "Should respond with the cached partial");
let ReceivedAction::Publish(PublishAction::SendMessage {
peer_id,
rpc: RpcOut::PartialMessage(response),
}) = actions.remove(0)
else {
panic!("Expected a Publish(SendMessage) response");
};
assert_eq!(peer_id, requester);
assert_eq!(
response.metadata,
Some(vec![0b11111111]),
"A stale republish must not overwrite the cached partial"
);
assert_eq!(
response.body.expect("Response should carry a body")[0],
0b11111111,
"Response should serve all cached parts, not the stale subset"
);
}
44 changes: 32 additions & 12 deletions protocols/gossipsub/src/extensions/partial_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use std::{
cmp::max,
collections::{BTreeSet, HashMap, HashSet},
collections::{BTreeSet, HashMap, HashSet, hash_map::Entry},
fmt::{self, Debug},
};

Expand Down Expand Up @@ -493,13 +493,42 @@ impl State {
));
};

// Check whether this partial holds more parts than the previously cached one.
let group_id = partial_message.group_id();
let metadata = partial_message.metadata();
let cached = topic_partials.partial_messages.entry(group_id.clone());
if let Entry::Occupied(cached) = &cached {
let mut old_metadata = cached.get().content.metadata();
if !old_metadata
.update(metadata.as_slice())
.map_err(PublishError::Partial)?
{
tracing::debug!(
?metadata,
?old_metadata,
?group_id,
?topic_hash,
"Partial was not updated due to stale data; skipping publish"
);
// Metadata was not updated, so we do not need to send anything and keep the
// previously sent message in cache.
return Ok(vec![]);
}
}

// Cache the sent partial before publishing, so the local node retains what it
// published even when there are currently no recipients to send it to.
let partial_message = &cached.insert_entry(LocalPartial {
content: Box::new(partial_message),
ttl: DEFAULT_PARTIAL_TTL,
});

if recipients.is_empty() {
tracing::debug!(topic = %topic_hash, "Recipient list for publishing partial message is empty");
return Err(PublishError::NoPeersSubscribedToTopic);
}

let mut actions = vec![];
let group_id = partial_message.group_id();
let Some(topic_peers) = self.peer_subscriptions.get_mut(&topic_hash) else {
tracing::error!(topic = %topic_hash, "No peers subscribed to topic");
return Err(PublishError::NoPeersSubscribedToTopic);
Expand All @@ -516,7 +545,7 @@ impl State {
peer_id,
&topic_hash,
&group_id,
&partial_message,
partial_message.get().content.as_ref(),
remote_subscription,
) {
Some(message @ PublishAction::SendMessage { .. }) => {
Expand All @@ -530,15 +559,6 @@ impl State {
}
}

// Cache the sent partial
topic_partials.partial_messages.insert(
partial_message.group_id(),
LocalPartial {
content: Box::new(partial_message),
ttl: DEFAULT_PARTIAL_TTL,
},
);

Ok(actions)
}

Expand Down
Loading