Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

- Fix `panic!("cannot extract twice")` in `Connection::poll` by selecting
a `Waiting` `SubstreamRequested` explicitly instead of the first entry,
which could be a stale `Done` left behind by a prior extraction.
See [PR 6427](https://github.com/libp2p/rust-libp2p/pull/6427).

## 0.47.1

- Replace `lru::LruCache` with `hashlink::LruCache`.
Expand Down
108 changes: 89 additions & 19 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,20 +411,28 @@ where
}
}

if let Some(requested_substream) = requested_substreams.iter_mut().next() {
// `extract()` leaves `Done` entries behind until the next
// `poll_next` cleanup. Skip them when fulfilling a waiting request.
if requested_substreams
.iter_mut()
.any(|r| matches!(r, SubstreamRequested::Waiting { .. }))
{
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if extract now return an option this becomes unnecessary right? We can leave the previous code

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, I dropped the extra precheck and kept the outbound polling path in the previous shape, with extract() handling stale entries via Option.

match muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
let (user_data, timeout, upgrade) = requested_substream.extract();

negotiating_out.push(StreamUpgrade::new_outbound(
substream,
user_data,
timeout,
upgrade,
*substream_upgrade_protocol_override,
stream_counter.clone(),
));
if let Some((user_data, timeout, upgrade)) = requested_substreams
.iter_mut()
.find_map(SubstreamRequested::extract)
{
negotiating_out.push(StreamUpgrade::new_outbound(
substream,
user_data,
timeout,
upgrade,
*substream_upgrade_protocol_override,
stream_counter.clone(),
));
}

// Go back to the top,
// handler can potentially make progress again.
Expand Down Expand Up @@ -691,7 +699,7 @@ impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
}
}

fn extract(&mut self) -> (UserData, Delay, Upgrade) {
fn extract(&mut self) -> Option<(UserData, Delay, Upgrade)> {
match mem::replace(self, Self::Done) {
SubstreamRequested::Waiting {
user_data,
Expand All @@ -703,9 +711,9 @@ impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
waker.wake();
}

(user_data, timeout, upgrade)
Some((user_data, timeout, upgrade))
}
SubstreamRequested::Done => panic!("cannot extract twice"),
SubstreamRequested::Done => None,
}
}
}
Expand Down Expand Up @@ -855,6 +863,23 @@ mod tests {
))
}

/// Regression test for "cannot extract twice".
#[test]
fn connection_poll_skips_done_substream_requested_entries() {
let mut connection = Connection::new(
StreamMuxerBox::new(ReadyOutboundStreamMuxer { remaining: 2 }),
MockConnectionHandler::new(Duration::from_secs(10)),
None,
0,
Duration::ZERO,
);

connection.handler.open_outbound_substreams(2);

let _ = connection.poll_noop_waker();
let _ = connection.poll_noop_waker();
}

#[test]
fn propagates_changes_to_supported_inbound_protocols() {
let mut connection = Connection::new(
Expand Down Expand Up @@ -1098,6 +1123,47 @@ mod tests {
}
}

/// A [`StreamMuxer`] which immediately returns outbound streams.
struct ReadyOutboundStreamMuxer {
remaining: usize,
}

impl StreamMuxer for ReadyOutboundStreamMuxer {
type Substream = PendingSubstream;
type Error = Infallible;

fn poll_inbound(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
Poll::Pending
}

fn poll_outbound(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
if self.remaining == 0 {
return Poll::Pending;
}

self.remaining -= 1;

Poll::Ready(Ok(PendingSubstream { _weak: Weak::new() }))
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Pending
}

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}
}

struct PendingSubstream {
_weak: Weak<()>,
}
Expand Down Expand Up @@ -1131,22 +1197,26 @@ mod tests {
}

struct MockConnectionHandler {
outbound_requested: bool,
outbound_requested: usize,
error: Option<StreamUpgradeError<Infallible>>,
upgrade_timeout: Duration,
}

impl MockConnectionHandler {
fn new(upgrade_timeout: Duration) -> Self {
Self {
outbound_requested: false,
outbound_requested: 0,
error: None,
upgrade_timeout,
}
}

fn open_new_outbound(&mut self) {
self.outbound_requested = true;
self.open_outbound_substreams(1);
}

fn open_outbound_substreams(&mut self, count: usize) {
self.outbound_requested += count;
}
}

Expand Down Expand Up @@ -1231,8 +1301,8 @@ mod tests {
&mut self,
_: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
if self.outbound_requested {
self.outbound_requested = false;
if self.outbound_requested > 0 {
self.outbound_requested -= 1;
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(DeniedUpgrade, ())
.with_timeout(self.upgrade_timeout),
Expand Down
Loading