Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,48 @@ impl<T> Receiver<T> {
ReceiverFlavor::Never(_chan) => 0,
}
}

/// Returns a new sender that communicates with this
/// receiver. This can be used to revive a receiver
/// even when there are no senders currently alive.
///
/// # Example
///
/// ```
/// use crossbeam_channel::{unbounded, RecvError};
/// let (s, r) = unbounded::<i32>();
/// s.send(1).unwrap();
/// drop(s); // After this point no sender is alive
/// assert_eq!(r.recv(), Ok(1));
/// // Here the channel is empty, but the rcv call doesn't block
/// // as there are no live senders capable of sending messages.
/// assert_eq!(r.recv(), Err(RecvError));
/// // We can spawn a new sender
/// let s = r.new_sender();
/// s.send(1).unwrap();
/// assert_eq!(r.recv(), Ok(1));
/// // Another recv call here would block as there exists a live sender.
/// ```
///
/// # Panics
///
/// If this receiver was created from [tick], [at], or [never()].
/// Those channel flavors do not support explicit receivers.
#[track_caller]
pub fn new_sender(&self) -> Sender<T> {
match &self.flavor {
ReceiverFlavor::Array(chan) => Sender {
flavor: SenderFlavor::Array(chan.new_sender(|c| c.reconnect_senders())),
},
ReceiverFlavor::List(chan) => Sender {
flavor: SenderFlavor::List(chan.new_sender(|c| c.reconnect_senders())),
},
ReceiverFlavor::Zero(chan) => Sender {
flavor: SenderFlavor::Zero(chan.new_sender(|c| c.reconnect_senders())),
},
_ => panic!("new_sender cannot be called with a Receiver created by at/never/tick"),
}
}
}

impl<T> Drop for Receiver<T> {
Expand Down
12 changes: 12 additions & 0 deletions crossbeam-channel/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ impl<C> Receiver<C> {
pub(crate) fn addr(&self) -> usize {
self.counter.as_ptr() as usize
}

pub(crate) fn new_sender(&self, reconnect: impl FnOnce(&C) -> bool) -> Sender<C> {
if 0 == self.counter().senders.fetch_add(1, Ordering::SeqCst) {
// we're the first sender to be created
reconnect(&self.counter().chan);
self.counter().destroy.store(false, Ordering::Relaxed);
}

Sender {
counter: self.counter,
}
}
}

impl<C> ops::Deref for Receiver<C> {
Expand Down
11 changes: 11 additions & 0 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,17 @@ impl<T> Channel<T> {
}
}

/// Reconnects senders. There is no need to notify threads
/// as nobody is currently blocking, since we were in a phase
/// where no senders are alive.
///
/// Returns `true` if this call reconnected the channel.
pub(crate) fn reconnect_senders(&self) -> bool {
let tail = self.tail.fetch_and(!self.mark_bit, Ordering::SeqCst);

tail & self.mark_bit == 0
}

/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
Expand Down
11 changes: 11 additions & 0 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,17 @@ impl<T> Channel<T> {
}
}

/// Reconnects senders. There is no need to notify threads
/// as nobody is currently blocking, since we were in a phase
/// where no senders are alive.
///
/// Returns `true` if this call reconnected the channel.
pub(crate) fn reconnect_senders(&self) -> bool {
let tail = self.tail.index.fetch_and(!MARK_BIT, Ordering::SeqCst);

tail & MARK_BIT == 0
}

/// Disconnects receivers.
///
/// Returns `true` if this call disconnected the channel.
Expand Down
15 changes: 15 additions & 0 deletions crossbeam-channel/src/flavors/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,21 @@ impl<T> Channel<T> {
})
}

/// Reconnects senders. There is no need to notify threads
/// as nobody is currently blocking, since we were in a phase
/// where no senders are alive.
///
/// Returns `true` if this call reconnected the channel.
pub(crate) fn reconnect_senders(&self) -> bool {
let mut inner = self.inner.lock().unwrap();
if inner.is_disconnected {
inner.is_disconnected = false;
true
} else {
false
}
}

/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
Expand Down
7 changes: 7 additions & 0 deletions crossbeam-channel/tests/after.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,10 @@ fn fairness_duplicates() {
assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
}

#[test]
#[should_panic]
fn new_sender() {
let r = after(Duration::from_micros(20));
r.new_sender();
}
17 changes: 17 additions & 0 deletions crossbeam-channel/tests/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,20 @@ fn panic_on_drop() {
// Elements after the panicked element will leak.
assert!(!b);
}

#[test]
fn zero_sender_revival() {
let (s, r) = bounded::<i32>(1);

s.send(1).unwrap();

drop(s);

assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Err(RecvError));

let s = r.new_sender();

s.send(1).unwrap();
assert_eq!(r.recv(), Ok(1));
}
17 changes: 17 additions & 0 deletions crossbeam-channel/tests/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,23 @@ fn recv_after_disconnect() {
assert_eq!(r.recv(), Err(RecvError));
}

#[test]
fn zero_sender_revival() {
let (s, r) = unbounded();

s.send(1).unwrap();

drop(s);

assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Err(RecvError));

let s = r.new_sender();

s.send(1).unwrap();
assert_eq!(r.recv(), Ok(1));
}

#[test]
fn len() {
let (s, r) = unbounded();
Expand Down
7 changes: 7 additions & 0 deletions crossbeam-channel/tests/never.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,10 @@ fn recv_timeout() {
assert!(now - start >= ms(200));
assert!(now - start <= ms(250));
}

#[test]
#[should_panic]
fn new_sender() {
let r = never::<i32>();
r.new_sender();
}
7 changes: 7 additions & 0 deletions crossbeam-channel/tests/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,10 @@ fn fairness_duplicates() {
assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
}
}

#[test]
#[should_panic]
fn new_sender() {
let r = tick(Duration::from_micros(20));
r.new_sender();
}
46 changes: 46 additions & 0 deletions crossbeam-channel/tests/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,49 @@ fn channel_through_channel() {
})
.unwrap();
}

#[test]
fn zero_sender_revival() {
let (s, r) = bounded::<i32>(0);

let rref = &r;

scope(|scope| {
scope.spawn(move |_| {
let s = s;

s.send(1).unwrap();
// sender is dropped
});

scope.spawn(move |_| {
let r = rref;

assert_eq!(r.recv(), Ok(1));
thread::sleep(Duration::from_millis(100));
assert_eq!(r.recv(), Err(RecvError)); // non blocking

let mut msg = r.recv();
assert_eq!(msg, Err(RecvError)); // no senders

while msg == Err(RecvError) {
// Yield to the other thread
thread::sleep(ms(30));
msg = r.recv();
}

assert_eq!(msg, Ok(2)); // received from second sender thread
});

scope.spawn(move |_| {
thread::sleep(ms(200)); // start with a delay

let r = rref;

let s = r.new_sender();

s.send(2).unwrap();
});
})
.unwrap();
}