Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.


## [Unreleased]
### Added
- Add `is_closed` and `has_message` methods to `AsyncReceiver`.


## [0.2.0] - 2026-02-23
Expand Down
40 changes: 38 additions & 2 deletions src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,8 @@ impl<T> Receiver<T> {
})
}

/// Returns true if the associated [`Sender`](crate::Sender) was dropped before sending a message. Or if
/// the message has already been received.
/// Returns true if the associated [`Sender`](crate::Sender) was dropped before sending a
/// message. Or if the message has already been received.
///
/// If `true` is returned, all future calls to receive methods are guaranteed to return
/// a disconnected error. And future calls to this method is guaranteed to also return `true`.
Expand Down Expand Up @@ -616,6 +616,42 @@ impl<T> Receiver<T> {
}
}

#[cfg(feature = "async")]
impl<T> AsyncReceiver<T> {
/// Returns true if the associated [`Sender`](crate::Sender) was dropped before sending a
/// message. Or if the message has already been received.
pub fn is_closed(&self) -> bool {
// SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
// is still alive, meaning that even if the sender was dropped then it would have observed
// the fact that we're still alive and left the responsibility of deallocating the
// channel to us, so `self.channel` is valid
let channel = unsafe { self.channel_ptr.as_ref() };

// ORDERING: We *chose* a Relaxed ordering here as it is sufficient to
// enforce the method's contract. Once true has been observed, it will remain true.
// However, if false is observed, the sender might have just disconnected but this thread
// has not observed it yet.
channel.state().load(Relaxed) == DISCONNECTED
}

/// Returns true if there is a message in the channel, ready to be received.
///
/// If `true` is returned, the next poll on the future is guaranteed to return
/// a message.
pub fn has_message(&self) -> bool {
// SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
// is still alive, meaning that even if the sender was dropped then it would have observed
// the fact that we're still alive and left the responsibility of deallocating the
// channel to us, so `self.channel` is valid
let channel = unsafe { self.channel_ptr.as_ref() };

// ORDERING: An acquire ordering is used to guarantee no subsequent loads is reordered
// before this one. This upholds the contract that if true is returned, the next call to
// a receive method is guaranteed to also observe the `MESSAGE` state and return a message.
channel.state().load(Acquire) == MESSAGE
}
}

#[cfg(feature = "async")]
impl<T> core::future::IntoFuture for Receiver<T> {
type Output = Result<T, RecvError>;
Expand Down
18 changes: 18 additions & 0 deletions tests/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,21 @@ async fn poll_receiver_then_drop_it() {
// Make sure the receiver has been dropped by the runtime.
assert!(sender.send(()).is_err());
}

#[test]
fn dropping_sender_disconnects_async_receiver() {
let (sender, receiver) = oneshot::async_channel::<()>();
assert!(!sender.is_closed());
assert!(!receiver.is_closed());
drop(sender);
assert!(receiver.is_closed());
}

#[test]
fn async_receiver_has_message() {
let (sender, receiver) = oneshot::async_channel();
assert!(!receiver.has_message());
assert!(sender.send(19i128).is_ok());

assert!(receiver.has_message());
}