From 9443583cf0e16d58f097aa7924af7dd67f49130e Mon Sep 17 00:00:00 2001 From: "Can H. Tartanoglu" Date: Thu, 7 May 2026 08:46:31 +0200 Subject: [PATCH 1/4] swarm: fix panic!("cannot extract twice") in Connection::poll MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Connection::poll` selects the next outbound-ready `SubstreamRequested` via `iter_mut().next()` and calls `.extract()` on it. `extract()` moves the data out and replaces the variant with `Done`, then relies on `extracted_waker.wake()` to schedule the cleanup poll that removes the entry from `requested_substreams`. The waker is only populated by the `Future::poll` impl when the entry first observes a `Poll::Pending`. If the entry is extracted before that first poll ever happens — which can occur when `FuturesUnordered`'s ordering puts a freshly-pushed entry at the head of `iter_mut()`, or when the muxer returns `poll_outbound = Ready` on the same loop iteration as the request was pushed — the waker is `None`, no wake is scheduled, and the `Done` entry persists. The next outbound-ready iteration's `iter_mut().next()` lands on that stale `Done` entry and `extract()` panics. The intent at this site has always been "find a request waiting for a substream"; making that explicit with `iter_mut().find(|r| matches!(r, SubstreamRequested::Waiting { .. }))` removes the dependence on `FuturesUnordered`'s scheduling. Eager removal of the `Done` entry is not possible through `FuturesUnordered`'s public API while iterating it, so the existing wake-driven cleanup stays in place. The number of un-cleaned `Done` entries is bounded by outstanding outbound requests and is drained naturally as later events trigger connection polls. Adds a unit test reproducing the failure shape directly on a `FuturesUnordered<SubstreamRequested<(), DeniedUpgrade>>`.
--- swarm/CHANGELOG.md | 9 ++++ swarm/src/connection.rs | 99 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index b5578f90a38..7df5ee567bb 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,6 +6,15 @@ - Raise MSRV to 1.88.0. See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273). +- Fix `panic!("cannot extract twice")` in `Connection::poll` when a + `SubstreamRequested` is extracted before its `Future::poll` impl has had + a chance to store its waker. Without a stored waker, `extract()` could + not schedule the cleanup poll that would remove the resulting `Done` + entry from `requested_substreams`, and the next outbound-ready iteration + would land on the same entry via `iter_mut().next()` and panic. The + outbound-extraction site now selects a `Waiting` entry explicitly via + `iter_mut().find(...)`, so a stale `Done` entry can never be re-extracted. + ## 0.47.1 - Replace `lru::LruCache` with `hashlink::LruCache`. diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 0a1a634d8b8..76734388de9 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -411,7 +411,44 @@ where } } - if let Some(requested_substream) = requested_substreams.iter_mut().next() { + // Find the next *Waiting* substream request to pair with a newly + // muxer-ready outbound stream. + // + // We must skip any `SubstreamRequested::Done` entries: `extract()` + // transitions the state to `Done` but does not remove the entry + // from `requested_substreams` directly — that removal is driven + // by the entry's own `Future::poll` returning `Ready(Ok(()))` on + // the next `requested_substreams.poll_next_unpin(cx)` call, after + // its waker is woken from inside `extract()`. + // + // That cleanup pass relies on `extracted_waker` being `Some` at + // the moment `extract()` runs. 
The waker is populated by the + // `Future::poll` impl when it observes `Poll::Pending` from the + // timeout (see the `Waiting` arm of `::poll` below), but extraction can happen *before* the + // future ever reaches a `Pending` poll: either because the muxer + // returned `poll_outbound = Ready` on the same loop iteration as + // the request was pushed, or — more subtly — because + // `FuturesUnordered`'s ordering for newly-pushed entries means + // `iter_mut().next()` can land on an entry that has not yet + // completed its initial `Pending` poll. In that case the + // `extracted_waker` is `None`, no wake is scheduled, and the + // `Done` entry persists until *some other* event drives a poll + // of this connection. + // + // Without this filter, the next outbound-ready iteration's + // `iter_mut().next()` would land on that stale `Done` entry and + // `extract()` would panic with "cannot extract twice". + // + // Filtering at extraction time is the precise expression of the + // intent ("find a waiting request to fulfil") and is robust + // regardless of `FuturesUnordered`'s internal poll-ordering. The + // alternative — removing `Done` entries eagerly — is not possible + // through `FuturesUnordered`'s public API while iterating it. + if let Some(requested_substream) = requested_substreams + .iter_mut() + .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) + { match muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { @@ -855,6 +892,66 @@ mod tests { )) } + /// Regression test for "cannot extract twice". + /// + /// Constructs the minimal failure shape directly on a `FuturesUnordered`: + /// push a `SubstreamRequested` and call `extract()` *before* the future + /// has been polled, so `extracted_waker` is `None`. The entry is now in + /// `Done` state but lacks the wake call that would let `poll_next` clean + /// it up. 
+ /// + /// Before the fix, `iter_mut().next()` would return that stale `Done` + /// entry on the next outbound-ready iteration of `Connection::poll` and + /// `extract()` would panic. After the fix, the iter site filters to + /// `Waiting` entries only — no panic, and the `Done` entry is allowed to + /// be cleaned up by a later `poll_next` whenever its waker (or + /// `FuturesUnordered`'s internal scheduling) drives it. + #[test] + fn iter_mut_skips_done_substream_requested_entries() { + let mut requested: FuturesUnordered> = + FuturesUnordered::new(); + + // Two pushes that are never `poll_next`'d, so neither future has + // stored a waker — this mirrors the bug condition where extraction + // races ahead of the future's first `Poll::Pending`. + requested.push(SubstreamRequested::new( + (), + Duration::from_secs(60), + DeniedUpgrade, + )); + requested.push(SubstreamRequested::new( + (), + Duration::from_secs(60), + DeniedUpgrade, + )); + + // First extraction: pick a Waiting entry and extract. + let first = requested + .iter_mut() + .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) + .expect("first call must find a Waiting entry"); + let _ = first.extract(); + + // After extract, one entry is Done. `iter_mut().next()` would + // surface that Done — the filter must skip past it to the remaining + // Waiting entry. + let second = requested + .iter_mut() + .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) + .expect("second call must skip the Done entry and find the other Waiting"); + let _ = second.extract(); + + // After both extracts, all entries are Done. `find(Waiting)` returns + // None. Critically, no panic. + assert!( + requested + .iter_mut() + .find(|r| matches!(r, SubstreamRequested::Waiting { .. 
})) + .is_none(), + "no Waiting entries remain; the filter must not surface Done entries" + ); + } + #[test] fn propagates_changes_to_supported_inbound_protocols() { let mut connection = Connection::new( From 4b11ee213bdec53c5a5604fa310ed40750453b79 Mon Sep 17 00:00:00 2001 From: "Can H. Tartanoglu" Date: Tue, 12 May 2026 13:20:08 +0200 Subject: [PATCH 2/4] swarm: trim docs and changelog for cannot-extract-twice fix --- swarm/CHANGELOG.md | 12 ++--- swarm/src/connection.rs | 107 +++++++++------------------------------- 2 files changed, 28 insertions(+), 91 deletions(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 7df5ee567bb..c7cc8c23520 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -6,14 +6,10 @@ - Raise MSRV to 1.88.0. See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273). -- Fix `panic!("cannot extract twice")` in `Connection::poll` when a - `SubstreamRequested` is extracted before its `Future::poll` impl has had - a chance to store its waker. Without a stored waker, `extract()` could - not schedule the cleanup poll that would remove the resulting `Done` - entry from `requested_substreams`, and the next outbound-ready iteration - would land on the same entry via `iter_mut().next()` and panic. The - outbound-extraction site now selects a `Waiting` entry explicitly via - `iter_mut().find(...)`, so a stale `Done` entry can never be re-extracted. +- Fix `panic!("cannot extract twice")` in `Connection::poll` by selecting + a `Waiting` `SubstreamRequested` explicitly instead of the first entry, + which could be a stale `Done` left behind by a prior extraction. + See [PR 6427](https://github.com/libp2p/rust-libp2p/pull/6427). ## 0.47.1 diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 76734388de9..897faa96d6c 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -411,40 +411,11 @@ where } } - // Find the next *Waiting* substream request to pair with a newly - // muxer-ready outbound stream. 
- // - // We must skip any `SubstreamRequested::Done` entries: `extract()` - // transitions the state to `Done` but does not remove the entry - // from `requested_substreams` directly — that removal is driven - // by the entry's own `Future::poll` returning `Ready(Ok(()))` on - // the next `requested_substreams.poll_next_unpin(cx)` call, after - // its waker is woken from inside `extract()`. - // - // That cleanup pass relies on `extracted_waker` being `Some` at - // the moment `extract()` runs. The waker is populated by the - // `Future::poll` impl when it observes `Poll::Pending` from the - // timeout (see the `Waiting` arm of `::poll` below), but extraction can happen *before* the - // future ever reaches a `Pending` poll: either because the muxer - // returned `poll_outbound = Ready` on the same loop iteration as - // the request was pushed, or — more subtly — because - // `FuturesUnordered`'s ordering for newly-pushed entries means - // `iter_mut().next()` can land on an entry that has not yet - // completed its initial `Pending` poll. In that case the - // `extracted_waker` is `None`, no wake is scheduled, and the - // `Done` entry persists until *some other* event drives a poll - // of this connection. - // - // Without this filter, the next outbound-ready iteration's - // `iter_mut().next()` would land on that stale `Done` entry and - // `extract()` would panic with "cannot extract twice". - // - // Filtering at extraction time is the precise expression of the - // intent ("find a waiting request to fulfil") and is robust - // regardless of `FuturesUnordered`'s internal poll-ordering. The - // alternative — removing `Done` entries eagerly — is not possible - // through `FuturesUnordered`'s public API while iterating it. + // Skip `Done` entries: `extract()` leaves them behind until the + // entry's `Future::poll` removes them on the next `poll_next`. 
+ // If `extract()` runs before that future has stored its waker, + // the `Done` entry can persist and a plain `iter_mut().next()` + // would re-extract it and panic with "cannot extract twice". if let Some(requested_substream) = requested_substreams .iter_mut() .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) @@ -892,63 +863,33 @@ mod tests { )) } - /// Regression test for "cannot extract twice". - /// - /// Constructs the minimal failure shape directly on a `FuturesUnordered`: - /// push a `SubstreamRequested` and call `extract()` *before* the future - /// has been polled, so `extracted_waker` is `None`. The entry is now in - /// `Done` state but lacks the wake call that would let `poll_next` clean - /// it up. - /// - /// Before the fix, `iter_mut().next()` would return that stale `Done` - /// entry on the next outbound-ready iteration of `Connection::poll` and - /// `extract()` would panic. After the fix, the iter site filters to - /// `Waiting` entries only — no panic, and the `Done` entry is allowed to - /// be cleaned up by a later `poll_next` whenever its waker (or - /// `FuturesUnordered`'s internal scheduling) drives it. + // Regression test for `panic!("cannot extract twice")`: pushing two + // entries and calling `extract()` without ever polling them leaves + // a `Done` entry that `iter_mut().next()` would re-extract. The + // `find(Waiting)` filter must skip past it. #[test] fn iter_mut_skips_done_substream_requested_entries() { let mut requested: FuturesUnordered> = FuturesUnordered::new(); + for _ in 0..2 { + requested.push(SubstreamRequested::new( + (), + Duration::from_secs(60), + DeniedUpgrade, + )); + } - // Two pushes that are never `poll_next`'d, so neither future has - // stored a waker — this mirrors the bug condition where extraction - // races ahead of the future's first `Poll::Pending`. 
- requested.push(SubstreamRequested::new( - (), - Duration::from_secs(60), - DeniedUpgrade, - )); - requested.push(SubstreamRequested::new( - (), - Duration::from_secs(60), - DeniedUpgrade, - )); - - // First extraction: pick a Waiting entry and extract. - let first = requested - .iter_mut() - .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) - .expect("first call must find a Waiting entry"); - let _ = first.extract(); - - // After extract, one entry is Done. `iter_mut().next()` would - // surface that Done — the filter must skip past it to the remaining - // Waiting entry. - let second = requested - .iter_mut() - .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) - .expect("second call must skip the Done entry and find the other Waiting"); - let _ = second.extract(); - - // After both extracts, all entries are Done. `find(Waiting)` returns - // None. Critically, no panic. + for _ in 0..2 { + let _ = requested + .iter_mut() + .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) + .unwrap() + .extract(); + } assert!( requested .iter_mut() - .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) - .is_none(), - "no Waiting entries remain; the filter must not surface Done entries" + .all(|r| matches!(r, SubstreamRequested::Done)) ); } From e56350120a5e095bdc333b6379c97983c24c147f Mon Sep 17 00:00:00 2001 From: "Can H. Tartanoglu" Date: Mon, 25 May 2026 06:57:53 +0200 Subject: [PATCH 3/4] swarm: address cannot-extract-twice review --- swarm/src/connection.rs | 134 +++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 51 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 897faa96d6c..6346305ec75 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -411,28 +411,28 @@ where } } - // Skip `Done` entries: `extract()` leaves them behind until the - // entry's `Future::poll` removes them on the next `poll_next`. 
- // If `extract()` runs before that future has stored its waker, - // the `Done` entry can persist and a plain `iter_mut().next()` - // would re-extract it and panic with "cannot extract twice". - if let Some(requested_substream) = requested_substreams + // `extract()` leaves `Done` entries behind until the next + // `poll_next` cleanup. Skip them when fulfilling a waiting request. + if requested_substreams .iter_mut() - .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) + .any(|r| matches!(r, SubstreamRequested::Waiting { .. })) { match muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let (user_data, timeout, upgrade) = requested_substream.extract(); - - negotiating_out.push(StreamUpgrade::new_outbound( - substream, - user_data, - timeout, - upgrade, - *substream_upgrade_protocol_override, - stream_counter.clone(), - )); + if let Some((user_data, timeout, upgrade)) = requested_substreams + .iter_mut() + .find_map(SubstreamRequested::extract) + { + negotiating_out.push(StreamUpgrade::new_outbound( + substream, + user_data, + timeout, + upgrade, + *substream_upgrade_protocol_override, + stream_counter.clone(), + )); + } // Go back to the top, // handler can potentially make progress again. @@ -699,7 +699,7 @@ impl SubstreamRequested { } } - fn extract(&mut self) -> (UserData, Delay, Upgrade) { + fn extract(&mut self) -> Option<(UserData, Delay, Upgrade)> { match mem::replace(self, Self::Done) { SubstreamRequested::Waiting { user_data, @@ -711,9 +711,9 @@ impl SubstreamRequested { waker.wake(); } - (user_data, timeout, upgrade) + Some((user_data, timeout, upgrade)) } - SubstreamRequested::Done => panic!("cannot extract twice"), + SubstreamRequested::Done => None, } } } @@ -863,34 +863,21 @@ mod tests { )) } - // Regression test for `panic!("cannot extract twice")`: pushing two - // entries and calling `extract()` without ever polling them leaves - // a `Done` entry that `iter_mut().next()` would re-extract. 
The - // `find(Waiting)` filter must skip past it. + /// Regression test for "cannot extract twice". #[test] - fn iter_mut_skips_done_substream_requested_entries() { - let mut requested: FuturesUnordered> = - FuturesUnordered::new(); - for _ in 0..2 { - requested.push(SubstreamRequested::new( - (), - Duration::from_secs(60), - DeniedUpgrade, - )); - } - - for _ in 0..2 { - let _ = requested - .iter_mut() - .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) - .unwrap() - .extract(); - } - assert!( - requested - .iter_mut() - .all(|r| matches!(r, SubstreamRequested::Done)) + fn connection_poll_skips_done_substream_requested_entries() { + let mut connection = Connection::new( + StreamMuxerBox::new(ReadyOutboundStreamMuxer { remaining: 2 }), + MockConnectionHandler::new(Duration::from_secs(10)), + None, + 0, + Duration::ZERO, ); + + connection.handler.open_outbound_substreams(2); + + let _ = connection.poll_noop_waker(); + let _ = connection.poll_noop_waker(); } #[test] @@ -1136,6 +1123,47 @@ mod tests { } } + /// A [`StreamMuxer`] which immediately returns outbound streams. 
+ struct ReadyOutboundStreamMuxer { + remaining: usize, + } + + impl StreamMuxer for ReadyOutboundStreamMuxer { + type Substream = PendingSubstream; + type Error = Infallible; + + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_outbound( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + if self.remaining == 0 { + return Poll::Pending; + } + + self.remaining -= 1; + + Poll::Ready(Ok(PendingSubstream { _weak: Weak::new() })) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + struct PendingSubstream { _weak: Weak<()>, } @@ -1169,7 +1197,7 @@ mod tests { } struct MockConnectionHandler { - outbound_requested: bool, + outbound_requested: usize, error: Option>, upgrade_timeout: Duration, } @@ -1177,14 +1205,18 @@ mod tests { impl MockConnectionHandler { fn new(upgrade_timeout: Duration) -> Self { Self { - outbound_requested: false, + outbound_requested: 0, error: None, upgrade_timeout, } } fn open_new_outbound(&mut self) { - self.outbound_requested = true; + self.open_outbound_substreams(1); + } + + fn open_outbound_substreams(&mut self, count: usize) { + self.outbound_requested += count; } } @@ -1269,8 +1301,8 @@ mod tests { &mut self, _: &mut Context<'_>, ) -> Poll> { - if self.outbound_requested { - self.outbound_requested = false; + if self.outbound_requested > 0 { + self.outbound_requested -= 1; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(DeniedUpgrade, ()) .with_timeout(self.upgrade_timeout), From f43eff81dbeb18215c3122eae61bfd268d3deb51 Mon Sep 17 00:00:00 2001 From: "Can H. 
Tartanoglu" Date: Tue, 26 May 2026 09:29:11 +0200 Subject: [PATCH 4/4] swarm: restore outbound request polling shape --- swarm/src/connection.rs | 45 +++++++++++++++++------------------------ 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 6346305ec75..dffb93e197b 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -411,33 +411,26 @@ where } } - // `extract()` leaves `Done` entries behind until the next - // `poll_next` cleanup. Skip them when fulfilling a waiting request. - if requested_substreams - .iter_mut() - .any(|r| matches!(r, SubstreamRequested::Waiting { .. })) - { - match muxing.poll_outbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { - if let Some((user_data, timeout, upgrade)) = requested_substreams - .iter_mut() - .find_map(SubstreamRequested::extract) - { - negotiating_out.push(StreamUpgrade::new_outbound( - substream, - user_data, - timeout, - upgrade, - *substream_upgrade_protocol_override, - stream_counter.clone(), - )); - } - - // Go back to the top, - // handler can potentially make progress again. - continue; + match muxing.poll_outbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + if let Some((user_data, timeout, upgrade)) = requested_substreams + .iter_mut() + .find_map(SubstreamRequested::extract) + { + negotiating_out.push(StreamUpgrade::new_outbound( + substream, + user_data, + timeout, + upgrade, + *substream_upgrade_protocol_override, + stream_counter.clone(), + )); } + + // Go back to the top, + // handler can potentially make progress again. + continue; } }