Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

- Fix `panic!("cannot extract twice")` in `Connection::poll` by selecting
a `Waiting` `SubstreamRequested` explicitly instead of the first entry,
which could be a stale `Done` left behind by a prior extraction.
See [PR 6427](https://github.com/libp2p/rust-libp2p/pull/6427).

## 0.47.1

- Replace `lru::LruCache` with `hashlink::LruCache`.
Expand Down
40 changes: 39 additions & 1 deletion swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,15 @@ where
}
}

if let Some(requested_substream) = requested_substreams.iter_mut().next() {
// Skip `Done` entries: `extract()` leaves them behind until the
// entry's `Future::poll` removes them on the next `poll_next`.
// If `extract()` runs before that future has stored its waker,
// the `Done` entry can persist and a plain `iter_mut().next()`
// would re-extract it and panic with "cannot extract twice".
if let Some(requested_substream) = requested_substreams
.iter_mut()
.find(|r| matches!(r, SubstreamRequested::Waiting { .. }))
{
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could just make extract return an Option .
We could then

if let Some((user_data, timeout, upgrade)) = requested_substreams.iter_mut().next().and_then(|r| r.extract()) {

finding here for a SubstreamRequested::Waiting to then match again inside extract feels redundant.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. extract() now returns Option, and the connection path uses find_map(SubstreamRequested::extract) to skip stale Done entries instead of panicking. I kept a separate waiting-request precheck before polling the muxer so we do not open an outbound stream unless there is a request to pair with it.

match muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
Expand Down Expand Up @@ -855,6 +863,36 @@ mod tests {
))
}

// Regression test for `panic!("cannot extract twice")`: pushing two
// entries and calling `extract()` without ever polling them leaves
// a `Done` entry that `iter_mut().next()` would re-extract. The
// `find(Waiting)` filter must skip past it.
#[test]
fn iter_mut_skips_done_substream_requested_entries() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test manually creates the race condition scenario by directly manipulating the internal state, it doesn't actually prove the panic can happen in real-world usage.
Do you have an actual Minimal, Reproducible Example?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I replaced the direct FuturesUnordered<SubstreamRequested<_>> state test with a Connection::poll-level test using a mock handler that queues outbound requests and a muxer that returns outbound streams immediately. This now covers the real connection polling path instead of calling extract() directly. One caveat: when I back-applied this two-request test to the current base, it did not reproduce the panic deterministically, so I am treating it as path coverage for the stale-Done handling rather than a standalone deterministic MRE for the original downstream contention failure.

let mut requested: FuturesUnordered<SubstreamRequested<(), DeniedUpgrade>> =
FuturesUnordered::new();
for _ in 0..2 {
requested.push(SubstreamRequested::new(
(),
Duration::from_secs(60),
DeniedUpgrade,
));
}

for _ in 0..2 {
let _ = requested
.iter_mut()
.find(|r| matches!(r, SubstreamRequested::Waiting { .. }))
.unwrap()
.extract();
}
assert!(
requested
.iter_mut()
.all(|r| matches!(r, SubstreamRequested::Done))
);
}

#[test]
fn propagates_changes_to_supported_inbound_protocols() {
let mut connection = Connection::new(
Expand Down
Loading