Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

- Fix `panic!("cannot extract twice")` in `Connection::poll` by selecting
a `Waiting` `SubstreamRequested` explicitly instead of the first entry,
which could be a stale `Done` left behind by a prior extraction.
See [PR 6427](https://github.com/libp2p/rust-libp2p/pull/6427).

## 0.47.1

- Replace `lru::LruCache` with `hashlink::LruCache`.
Expand Down
99 changes: 81 additions & 18 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,13 @@ where
}
}

if let Some(requested_substream) = requested_substreams.iter_mut().next() {
match muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
let (user_data, timeout, upgrade) = requested_substream.extract();

match muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
if let Some((user_data, timeout, upgrade)) = requested_substreams
.iter_mut()
.find_map(SubstreamRequested::extract)
{
negotiating_out.push(StreamUpgrade::new_outbound(
substream,
user_data,
Expand All @@ -425,11 +426,11 @@ where
*substream_upgrade_protocol_override,
stream_counter.clone(),
));

// Go back to the top,
// handler can potentially make progress again.
continue;
}

// Go back to the top,
// handler can potentially make progress again.
continue;
}
}

Expand Down Expand Up @@ -691,7 +692,7 @@ impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
}
}

fn extract(&mut self) -> (UserData, Delay, Upgrade) {
fn extract(&mut self) -> Option<(UserData, Delay, Upgrade)> {
match mem::replace(self, Self::Done) {
SubstreamRequested::Waiting {
user_data,
Expand All @@ -703,9 +704,9 @@ impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
waker.wake();
}

(user_data, timeout, upgrade)
Some((user_data, timeout, upgrade))
}
SubstreamRequested::Done => panic!("cannot extract twice"),
SubstreamRequested::Done => None,
}
}
}
Expand Down Expand Up @@ -855,6 +856,23 @@ mod tests {
))
}

/// Regression test for "cannot extract twice".
#[test]
fn connection_poll_skips_done_substream_requested_entries() {
let mut connection = Connection::new(
StreamMuxerBox::new(ReadyOutboundStreamMuxer { remaining: 2 }),
MockConnectionHandler::new(Duration::from_secs(10)),
None,
0,
Duration::ZERO,
);

connection.handler.open_outbound_substreams(2);

let _ = connection.poll_noop_waker();
let _ = connection.poll_noop_waker();
}

#[test]
fn propagates_changes_to_supported_inbound_protocols() {
let mut connection = Connection::new(
Expand Down Expand Up @@ -1098,6 +1116,47 @@ mod tests {
}
}

/// A [`StreamMuxer`] which immediately returns outbound streams.
struct ReadyOutboundStreamMuxer {
remaining: usize,
}

impl StreamMuxer for ReadyOutboundStreamMuxer {
type Substream = PendingSubstream;
type Error = Infallible;

fn poll_inbound(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
Poll::Pending
}

fn poll_outbound(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
if self.remaining == 0 {
return Poll::Pending;
}

self.remaining -= 1;

Poll::Ready(Ok(PendingSubstream { _weak: Weak::new() }))
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Pending
}

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}
}

struct PendingSubstream {
_weak: Weak<()>,
}
Expand Down Expand Up @@ -1131,22 +1190,26 @@ mod tests {
}

struct MockConnectionHandler {
outbound_requested: bool,
outbound_requested: usize,
error: Option<StreamUpgradeError<Infallible>>,
upgrade_timeout: Duration,
}

impl MockConnectionHandler {
fn new(upgrade_timeout: Duration) -> Self {
Self {
outbound_requested: false,
outbound_requested: 0,
error: None,
upgrade_timeout,
}
}

fn open_new_outbound(&mut self) {
self.outbound_requested = true;
self.open_outbound_substreams(1);
}

fn open_outbound_substreams(&mut self, count: usize) {
self.outbound_requested += count;
}
}

Expand Down Expand Up @@ -1231,8 +1294,8 @@ mod tests {
&mut self,
_: &mut Context<'_>,
) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
if self.outbound_requested {
self.outbound_requested = false;
if self.outbound_requested > 0 {
self.outbound_requested -= 1;
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(DeniedUpgrade, ())
.with_timeout(self.upgrade_timeout),
Expand Down