From ae36afa8ece4478f976c8a8711bd4cf3b8408a3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Mon, 23 Feb 2026 23:43:48 +0100 Subject: [PATCH] Add is_closed and has_message to AsyncReceiver --- CHANGELOG.md | 2 ++ src/receiver.rs | 40 ++++++++++++++++++++++++++++++++++++++-- tests/async.rs | 18 ++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b91ea7..3849400 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +### Added +- Add `is_closed` and `has_message` methods to `AsyncReceiver`. ## [0.2.0] - 2026-02-23 diff --git a/src/receiver.rs b/src/receiver.rs index 21b6897..c86a7dc 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -461,8 +461,8 @@ impl Receiver { }) } - /// Returns true if the associated [`Sender`](crate::Sender) was dropped before sending a message. Or if - /// the message has already been received. + /// Returns true if the associated [`Sender`](crate::Sender) was dropped before sending a + /// message. Or if the message has already been received. /// /// If `true` is returned, all future calls to receive methods are guaranteed to return /// a disconnected error. And future calls to this method is guaranteed to also return `true`. @@ -616,6 +616,42 @@ impl Receiver { } } +#[cfg(feature = "async")] +impl AsyncReceiver { + /// Returns true if the associated [`Sender`](crate::Sender) was dropped before sending a + /// message. Or if the message has already been received. + pub fn is_closed(&self) -> bool { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: We *chose* a Relaxed ordering here as it is sufficient to + // enforce the method's contract. Once true has been observed, it will remain true. + // However, if false is observed, the sender might have just disconnected but this thread + // has not observed it yet. + channel.state().load(Relaxed) == DISCONNECTED + } + + /// Returns true if there is a message in the channel, ready to be received. + /// + /// If `true` is returned, the next poll on the future is guaranteed to return + /// a message. + pub fn has_message(&self) -> bool { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered + // before this one. This upholds the contract that if true is returned, the next call to + // a receive method is guaranteed to also observe the `MESSAGE` state and return a message. + channel.state().load(Acquire) == MESSAGE + } +} + #[cfg(feature = "async")] impl core::future::IntoFuture for Receiver { type Output = Result; diff --git a/tests/async.rs b/tests/async.rs index b8b0841..4431ae0 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -98,3 +98,21 @@ async fn poll_receiver_then_drop_it() { // Make sure the receiver has been dropped by the runtime. assert!(sender.send(()).is_err()); } + +#[test] +fn dropping_sender_disconnects_async_receiver() { + let (sender, receiver) = oneshot::async_channel::<()>(); + assert!(!sender.is_closed()); + assert!(!receiver.is_closed()); + drop(sender); + assert!(receiver.is_closed()); +} + +#[test] +fn async_receiver_has_message() { + let (sender, receiver) = oneshot::async_channel(); + assert!(!receiver.has_message()); + assert!(sender.send(19i128).is_ok()); + + assert!(receiver.has_message()); +}