diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index b5578f90a38..c7cc8c23520 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,6 +6,11 @@ - Raise MSRV to 1.88.0. See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273). +- Fix `panic!("cannot extract twice")` in `Connection::poll` by selecting + a `Waiting` `SubstreamRequested` explicitly instead of the first entry, + which could be a stale `Done` left behind by a prior extraction. + See [PR 6427](https://github.com/libp2p/rust-libp2p/pull/6427). + ## 0.47.1 - Replace `lru::LruCache` with `hashlink::LruCache`. diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 0a1a634d8b8..dffb93e197b 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -411,12 +411,13 @@ where } } - if let Some(requested_substream) = requested_substreams.iter_mut().next() { - match muxing.poll_outbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { - let (user_data, timeout, upgrade) = requested_substream.extract(); - + match muxing.poll_outbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + if let Some((user_data, timeout, upgrade)) = requested_substreams + .iter_mut() + .find_map(SubstreamRequested::extract) + { negotiating_out.push(StreamUpgrade::new_outbound( substream, user_data, @@ -425,11 +426,11 @@ where *substream_upgrade_protocol_override, stream_counter.clone(), )); - - // Go back to the top, - // handler can potentially make progress again. - continue; } + + // Go back to the top, + // handler can potentially make progress again. + continue; } } @@ -691,7 +692,7 @@ impl SubstreamRequested { } } - fn extract(&mut self) -> (UserData, Delay, Upgrade) { + fn extract(&mut self) -> Option<(UserData, Delay, Upgrade)> { match mem::replace(self, Self::Done) { SubstreamRequested::Waiting { user_data, @@ -703,9 +704,9 @@ impl SubstreamRequested { waker.wake(); } - (user_data, timeout, upgrade) + Some((user_data, timeout, upgrade)) } - SubstreamRequested::Done => panic!("cannot extract twice"), + SubstreamRequested::Done => None, } } } @@ -855,6 +856,23 @@ mod tests { )) } + /// Regression test for "cannot extract twice". + #[test] + fn connection_poll_skips_done_substream_requested_entries() { + let mut connection = Connection::new( + StreamMuxerBox::new(ReadyOutboundStreamMuxer { remaining: 2 }), + MockConnectionHandler::new(Duration::from_secs(10)), + None, + 0, + Duration::ZERO, + ); + + connection.handler.open_outbound_substreams(2); + + let _ = connection.poll_noop_waker(); + let _ = connection.poll_noop_waker(); + } + #[test] fn propagates_changes_to_supported_inbound_protocols() { let mut connection = Connection::new( @@ -1098,6 +1116,47 @@ mod tests { } } + /// A [`StreamMuxer`] which immediately returns outbound streams. + struct ReadyOutboundStreamMuxer { + remaining: usize, + } + + impl StreamMuxer for ReadyOutboundStreamMuxer { + type Substream = PendingSubstream; + type Error = Infallible; + + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_outbound( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + if self.remaining == 0 { + return Poll::Pending; + } + + self.remaining -= 1; + + Poll::Ready(Ok(PendingSubstream { _weak: Weak::new() })) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + struct PendingSubstream { _weak: Weak<()>, } @@ -1131,7 +1190,7 @@ mod tests { } struct MockConnectionHandler { - outbound_requested: bool, + outbound_requested: usize, error: Option>, upgrade_timeout: Duration, } @@ -1139,14 +1198,18 @@ mod tests { impl MockConnectionHandler { fn new(upgrade_timeout: Duration) -> Self { Self { - outbound_requested: false, + outbound_requested: 0, error: None, upgrade_timeout, } } fn open_new_outbound(&mut self) { - self.outbound_requested = true; + self.open_outbound_substreams(1); + } + + fn open_outbound_substreams(&mut self, count: usize) { + self.outbound_requested += count; } } @@ -1231,8 +1294,8 @@ mod tests { &mut self, _: &mut Context<'_>, ) -> Poll> { - if self.outbound_requested { - self.outbound_requested = false; + if self.outbound_requested > 0 { + self.outbound_requested -= 1; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(DeniedUpgrade, ()) .with_timeout(self.upgrade_timeout),