fix(swarm): avoid re-extracting completed outbound substreams#6427
fix(swarm): avoid re-extracting completed outbound substreams#6427caniko wants to merge 4 commits into
Conversation
`Connection::poll` selects the next outbound-ready `SubstreamRequested`
via `iter_mut().next()` and calls `.extract()` on it. `extract()` moves
the data out and replaces the variant with `Done`, then relies on
`extracted_waker.wake()` to schedule the cleanup poll that removes the
entry from `requested_substreams`.
The waker is only populated by the `Future::poll` impl when the entry
first observes a `Poll::Pending`. If the entry is extracted before that
first poll ever happens — which can occur when `FuturesUnordered`'s
ordering puts a freshly-pushed entry at the head of `iter_mut()`, or
when the muxer returns `poll_outbound = Ready` on the same loop
iteration as the request was pushed — the waker is `None`, no wake is
scheduled, and the `Done` entry persists.
The next outbound-ready iteration's `iter_mut().next()` lands on that
stale `Done` entry and `extract()` panics.
The intent at this site has always been "find a request waiting for a
substream"; making that explicit with `iter_mut().find(|r| matches!(r,
SubstreamRequested::Waiting { .. }))` removes the dependence on
`FuturesUnordered`'s scheduling. Eager removal of the `Done` entry is
not possible through `FuturesUnordered`'s public API while iterating
it, so the existing wake-driven cleanup stays in place. The number of
un-cleaned `Done` entries is bounded by outstanding outbound requests
and is drained naturally as later events trigger connection polls.
Adds a unit test reproducing the failure shape directly on a
`FuturesUnordered<SubstreamRequested<()>, DeniedUpgrade>>`.
panic!("cannot extract twice") in Connection::poll
|
Thank you for the PR @caniko. We appreciate all contributions, including ones that were created with the help of LLMs/ AI-tools. That said, the sheer amount of text in this PR (descriptions, code comments) makes it very time-consuming to review a change that appears to be just a one-liner.
|
|
Apologies @elenaf9, I usually let these sit and wait as draft, and improve the PR before asking for review. This PR seems to have skipped this step, anyway, it is in the past. I tried reducing the text, and hope it is good enough for review |
|
It seems this issue might have been automatically generated. To help us address it effectively, please provide additional details. We value the use of LLMs for code generation and welcome your contributions but please ensure your submission is of such quality that a maintainer will spend less time reviewing it than implementing it themselves. Verify the code functions correctly and meets our standards. If your change requires tests, kindly include them and ensure they pass. If no further information is provided, the issue will be automatically closed in 7 days. Thank you for your understanding and for aiding us in maintaining quality contributions! |
|
It seems this issue might have been automatically generated. To help us address it effectively, please provide additional details. We value the use of LLMs for code generation and welcome your contributions but please ensure your submission is of such quality that a maintainer will spend less time reviewing it than implementing it themselves. Verify the code functions correctly and meets our standards. If your change requires tests, kindly include them and ensure they pass. If no further information is provided, the issue will be automatically closed in 7 days. Thank you for your understanding and for aiding us in maintaining quality contributions! |
|
@elenaf9 I think there is something wrong with the bot |
jxs
left a comment
There was a problem hiding this comment.
Hi, and thanks for this! Left some comments
| if let Some(requested_substream) = requested_substreams | ||
| .iter_mut() | ||
| .find(|r| matches!(r, SubstreamRequested::Waiting { .. })) | ||
| { |
There was a problem hiding this comment.
we could just make extract return an Option .
We could then
if let Some((user_data, timeout, upgrade)) = requested_substreams.iter_mut().next().and_then(|r| r.extract()) {finding here for a SubstreamRequested::Waiting to then match again inside extract feels redundant.
There was a problem hiding this comment.
Done. extract() now returns Option, and the connection path uses find_map(SubstreamRequested::extract) to skip stale Done entries instead of panicking. I kept a separate waiting-request precheck before polling the muxer so we do not open an outbound stream unless there is a request to pair with it.
| // Regression test for `panic!("cannot extract twice")`: pushing two | ||
| // entries and calling `extract()` without ever polling them leaves | ||
| // a `Done` entry that `iter_mut().next()` would re-extract. The | ||
| // `find(Waiting)` filter must skip past it. | ||
| #[test] | ||
| fn iter_mut_skips_done_substream_requested_entries() { |
There was a problem hiding this comment.
this test manually creates the race condition scenario by directly manipulating the internal state, it doesn't actually prove the panic can happen in real-world usage.
Do you have an actual Minimal, Reproducible Example?
There was a problem hiding this comment.
Done. I replaced the direct FuturesUnordered<SubstreamRequested<_>> state test with a Connection::poll-level test using a mock handler that queues outbound requests and a muxer that returns outbound streams immediately. This now covers the real connection polling path instead of calling extract() directly. One caveat: when I back-applied this two-request test to the current base, it did not reproduce the panic deterministically, so I am treating it as path coverage for the stale-Done handling rather than a standalone deterministic MRE for the original downstream contention failure.
panic!("cannot extract twice") in Connection::poll|
Updated per review: Validation run locally with an ad hoc Nix Rust shell because the downstream project flake is currently blocked by a stale
|
| if requested_substreams | ||
| .iter_mut() | ||
| .any(|r| matches!(r, SubstreamRequested::Waiting { .. })) | ||
| { |
There was a problem hiding this comment.
if extract now return an option this becomes unnecessary right? We can leave the previous code
There was a problem hiding this comment.
Good call, I dropped the extra precheck and kept the outbound polling path in the previous shape, with extract() handling stale entries via Option.
What
Connection::pollcan panic withcannot extract twice(swarm/src/connection.rs:708on master) when an outboundSubstreamRequestedis extracted before itsFuture::pollimpl has stored a waker — typically when the muxer reportspoll_outbound = Readyon the same iteration the request was pushed. Without a stored waker,extract()cannot schedule the cleanup poll that would remove the resultingDoneentry, and the next outbound-ready iteration'siter_mut().next()re-extracts the staleDoneand panics.Fix
Select a
Waitingentry explicitly at the extraction site:requested_substreams .iter_mut() .find(|r| matches!(r, SubstreamRequested::Waiting { .. }))This makes the existing intent ("pick a waiting request") explicit and removes the dependence on
FuturesUnordered's internal poll-ordering.Doneentries are still cleaned up by the existing wake-driven path whenever the connection is polled again.Test
Added a unit test that reproduces the failure directly on a
FuturesUnordered<SubstreamRequested<…>>(push,extract()before any poll, then re-iterate). Without the fix it panics; with the fix the filter skips theDoneentry.Also verified against the downstream tournament mesh workload that originally surfaced the panic.