From 5e643d35f0fa64ac86e06925e500dd7a86df9045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Mon, 11 Oct 2021 23:48:11 +0200 Subject: [PATCH 1/4] Add ability to spawn new senders from the receiver. --- crossbeam-channel/src/channel.rs | 75 +++++++++++++++++++++++++++ crossbeam-channel/src/counter.rs | 12 +++++ crossbeam-channel/src/flavors/list.rs | 11 ++++ crossbeam-channel/src/lib.rs | 3 +- crossbeam-channel/tests/list.rs | 19 ++++++- 5 files changed, 118 insertions(+), 2 deletions(-) diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index a81a2735e..6e34e6c64 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -1561,3 +1561,78 @@ pub(crate) unsafe fn read(r: &Receiver, token: &mut Token) -> Result() -> (Sender, Receiver) { + let (s, r) = super::unbounded(); + (s, Receiver(r)) + } + + /// A [receiver](super::Receiver) that can survive periods + /// of time where no [Sender]s are alive. New senders may + /// be created from the receiver directly ([Self::new_sender]). + /// The channel is only deallocated when the last receiver dies. + /// If there are live senders at that point, they start producing + /// [SendError]s as usual. + #[derive(Debug)] + pub struct Receiver(NormalReceiver); + + impl Receiver { + /// Returns a new sender that communicates with this + /// receiver. + pub fn new_sender(&self) -> Sender { + match &self.0.flavor { + ReceiverFlavor::Array(chan) => Sender { + flavor: SenderFlavor::Array( + chan.new_sender(|_| unimplemented!("but unreachable")), + ), + }, + ReceiverFlavor::List(chan) => Sender { + flavor: SenderFlavor::List(chan.new_sender(|c| c.reconnect_senders())), + }, + ReceiverFlavor::Zero(chan) => Sender { + flavor: SenderFlavor::Zero( + chan.new_sender(|_| unimplemented!("but unreachable")), + ), + }, + _ => unreachable!("This type cannot be built with at/never/tick"), + } + } + } + + impl Deref for self::Receiver { + type Target = NormalReceiver; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } +} diff --git a/crossbeam-channel/src/counter.rs b/crossbeam-channel/src/counter.rs index 1b9c1df19..86fec2633 100644 --- a/crossbeam-channel/src/counter.rs +++ b/crossbeam-channel/src/counter.rs @@ -138,6 +138,18 @@ impl Receiver { pub(crate) fn addr(&self) -> usize { self.counter.as_ptr() as usize } + + pub(crate) fn new_sender(&self, reconnect: impl FnOnce(&C) -> bool) -> Sender { + if 0 == self.counter().senders.fetch_add(1, Ordering::SeqCst) { + // we're the first sender to be created + reconnect(&self.counter().chan); + self.counter().destroy.store(false, Ordering::Relaxed); + } + + Sender { + counter: self.counter, + } + } } impl ops::Deref for Receiver { diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 5e4f51f9d..3f110f89e 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -569,6 +569,17 @@ impl Channel { } } + /// Reconnects senders. There is no need to notify threads + /// as nobody is currently blocking, since we were in an + /// idle phase. + /// + /// Returns `true` if this call reconnected the channel. + pub(crate) fn reconnect_senders(&self) -> bool { + let tail = self.tail.index.fetch_and(!MARK_BIT, Ordering::SeqCst); + + tail & MARK_BIT == 0 + } + /// Disconnects receivers. /// /// Returns `true` if this call disconnected the channel. diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index cecf5c71c..8deb82a61 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -377,7 +377,8 @@ pub mod internal { #[cfg(feature = "std")] pub use crate::{ channel::{ - IntoIter, Iter, Receiver, Sender, TryIter, after, at, bounded, never, tick, unbounded, + IntoIter, Iter, Receiver, Sender, TryIter, after, at, bounded, never, reconnectable, tick, + unbounded, }, err::{ ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError, diff --git a/crossbeam-channel/tests/list.rs b/crossbeam-channel/tests/list.rs index 33216ef44..29746fad6 100644 --- a/crossbeam-channel/tests/list.rs +++ b/crossbeam-channel/tests/list.rs @@ -9,7 +9,7 @@ use std::{ use crossbeam_channel::{ Receiver, RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError, - select, unbounded, + reconnectable, select, unbounded, }; use crossbeam_utils::thread::scope; @@ -208,6 +208,23 @@ fn recv_after_disconnect() { assert_eq!(r.recv(), Err(RecvError)); } +#[test] +fn zero_receiver_revival() { + let (s, r) = reconnectable::unbounded(); + + s.send(1).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Err(RecvError)); + + let s = r.new_sender(); + + s.send(1).unwrap(); + assert_eq!(r.recv(), Ok(1)); +} + #[test] fn len() { let (s, r) = unbounded(); From c2770be5044354bc183d0c739c01bc0b7fed911c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Wed, 25 Feb 2026 13:18:52 +0100 Subject: [PATCH 2/4] Delete separate module Just put the new_sender function on regular Receiver. --- crossbeam-channel/src/channel.rs | 117 +++++++++++-------------------- crossbeam-channel/src/lib.rs | 3 +- crossbeam-channel/tests/after.rs | 7 ++ crossbeam-channel/tests/list.rs | 4 +- crossbeam-channel/tests/never.rs | 7 ++ crossbeam-channel/tests/tick.rs | 7 ++ 6 files changed, 66 insertions(+), 79 deletions(-) diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 6e34e6c64..95cd8388b 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -1177,6 +1177,48 @@ impl Receiver { ReceiverFlavor::Never(_chan) => 0, } } + + /// Returns a new sender that communicates with this + /// receiver. This can be used to revive a receiver + /// even when there are no senders currently alive. + /// + /// # Example + /// + /// ``` + /// use crossbeam_channel::{unbounded, RecvError}; + /// let (s, r) = unbounded::(); + /// s.send(1).unwrap(); + /// drop(s); // After this point no sender is alive + /// assert_eq!(r.recv(), Ok(1)); + /// // Here the channel is empty, but the rcv call doesn't block + /// // as there are no live senders capable of sending messages. + /// assert_eq!(r.recv(), Err(RecvError)); + /// // We can spawn a new sender + /// let s = r.new_sender(); + /// s.send(1).unwrap(); + /// assert_eq!(r.recv(), Ok(1)); + /// // Another recv call here would block as there exists a live sender. + /// ``` + /// + /// # Panics + /// + /// If this receiver was created from [tick], [at], or [never]. + /// Those channel flavors do not support explicit receivers. + #[track_caller] + pub fn new_sender(&self) -> Sender { + match &self.flavor { + ReceiverFlavor::Array(chan) => Sender { + flavor: SenderFlavor::Array(chan.new_sender(|_| unimplemented!("but unreachable"))), + }, + ReceiverFlavor::List(chan) => Sender { + flavor: SenderFlavor::List(chan.new_sender(|c| c.reconnect_senders())), + }, + ReceiverFlavor::Zero(chan) => Sender { + flavor: SenderFlavor::Zero(chan.new_sender(|_| unimplemented!("but unreachable"))), + }, + _ => panic!("new_sender cannot be called with a Receiver created by at/never/tick"), + } + } } impl Drop for Receiver { @@ -1561,78 +1603,3 @@ pub(crate) unsafe fn read(r: &Receiver, token: &mut Token) -> Result() -> (Sender, Receiver) { - let (s, r) = super::unbounded(); - (s, Receiver(r)) - } - - /// A [receiver](super::Receiver) that can survive periods - /// of time where no [Sender]s are alive. New senders may - /// be created from the receiver directly ([Self::new_sender]). - /// The channel is only deallocated when the last receiver dies. - /// If there are live senders at that point, they start producing - /// [SendError]s as usual. - #[derive(Debug)] - pub struct Receiver(NormalReceiver); - - impl Receiver { - /// Returns a new sender that communicates with this - /// receiver. - pub fn new_sender(&self) -> Sender { - match &self.0.flavor { - ReceiverFlavor::Array(chan) => Sender { - flavor: SenderFlavor::Array( - chan.new_sender(|_| unimplemented!("but unreachable")), - ), - }, - ReceiverFlavor::List(chan) => Sender { - flavor: SenderFlavor::List(chan.new_sender(|c| c.reconnect_senders())), - }, - ReceiverFlavor::Zero(chan) => Sender { - flavor: SenderFlavor::Zero( - chan.new_sender(|_| unimplemented!("but unreachable")), - ), - }, - _ => unreachable!("This type cannot be built with at/never/tick"), - } - } - } - - impl Deref for self::Receiver { - type Target = NormalReceiver; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } -} diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index 8deb82a61..cecf5c71c 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -377,8 +377,7 @@ pub mod internal { #[cfg(feature = "std")] pub use crate::{ channel::{ - IntoIter, Iter, Receiver, Sender, TryIter, after, at, bounded, never, reconnectable, tick, - unbounded, + IntoIter, Iter, Receiver, Sender, TryIter, after, at, bounded, never, tick, unbounded, }, err::{ ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError, diff --git a/crossbeam-channel/tests/after.rs b/crossbeam-channel/tests/after.rs index 331727612..43c3af9ca 100644 --- a/crossbeam-channel/tests/after.rs +++ b/crossbeam-channel/tests/after.rs @@ -339,3 +339,10 @@ fn fairness_duplicates() { assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); } } + +#[test] +#[should_panic] +fn new_sender() { + let r = after(Duration::from_micros(20)); + r.new_sender(); +} diff --git a/crossbeam-channel/tests/list.rs b/crossbeam-channel/tests/list.rs index 29746fad6..9b9af8abd 100644 --- a/crossbeam-channel/tests/list.rs +++ b/crossbeam-channel/tests/list.rs @@ -9,7 +9,7 @@ use std::{ use crossbeam_channel::{ Receiver, RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError, - reconnectable, select, unbounded, + select, unbounded, }; use crossbeam_utils::thread::scope; @@ -210,7 +210,7 @@ fn recv_after_disconnect() { #[test] fn zero_receiver_revival() { - let (s, r) = reconnectable::unbounded(); + let (s, r) = unbounded(); s.send(1).unwrap(); diff --git a/crossbeam-channel/tests/never.rs b/crossbeam-channel/tests/never.rs index 1e4a81384..4d5ee367b 100644 --- a/crossbeam-channel/tests/never.rs +++ b/crossbeam-channel/tests/never.rs @@ -96,3 +96,10 @@ fn recv_timeout() { assert!(now - start >= ms(200)); assert!(now - start <= ms(250)); } + +#[test] +#[should_panic] +fn new_sender() { + let r = never::(); + r.new_sender(); +} diff --git a/crossbeam-channel/tests/tick.rs b/crossbeam-channel/tests/tick.rs index 310c95df2..c90a8575e 100644 --- a/crossbeam-channel/tests/tick.rs +++ b/crossbeam-channel/tests/tick.rs @@ -358,3 +358,10 @@ fn fairness_duplicates() { assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); } } + +#[test] +#[should_panic] +fn new_sender() { + let r = tick(Duration::from_micros(20)); + r.new_sender(); +} From 04cc29d3384655639a2b75c3ec5d98f470a8fb94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Wed, 25 Feb 2026 14:05:23 +0100 Subject: [PATCH 3/4] Implement new_sender for Zero and Array channel --- crossbeam-channel/src/channel.rs | 6 ++-- crossbeam-channel/src/flavors/array.rs | 11 ++++++ crossbeam-channel/src/flavors/list.rs | 4 +-- crossbeam-channel/src/flavors/zero.rs | 15 ++++++++ crossbeam-channel/tests/array.rs | 17 ++++++++++ crossbeam-channel/tests/list.rs | 2 +- crossbeam-channel/tests/zero.rs | 47 ++++++++++++++++++++++++++ 7 files changed, 96 insertions(+), 6 deletions(-) diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 95cd8388b..e70c328b1 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -1202,19 +1202,19 @@ impl Receiver { /// /// # Panics /// - /// If this receiver was created from [tick], [at], or [never]. + /// If this receiver was created from [tick], [at], or [never()]. /// Those channel flavors do not support explicit receivers. #[track_caller] pub fn new_sender(&self) -> Sender { match &self.flavor { ReceiverFlavor::Array(chan) => Sender { - flavor: SenderFlavor::Array(chan.new_sender(|_| unimplemented!("but unreachable"))), + flavor: SenderFlavor::Array(chan.new_sender(|c| c.reconnect_senders())), }, ReceiverFlavor::List(chan) => Sender { flavor: SenderFlavor::List(chan.new_sender(|c| c.reconnect_senders())), }, ReceiverFlavor::Zero(chan) => Sender { - flavor: SenderFlavor::Zero(chan.new_sender(|_| unimplemented!("but unreachable"))), + flavor: SenderFlavor::Zero(chan.new_sender(|c| c.reconnect_senders())), }, _ => panic!("new_sender cannot be called with a Receiver created by at/never/tick"), } diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index e8c5f81da..70f951129 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -495,6 +495,17 @@ impl Channel { } } + /// Reconnects senders. There is no need to notify threads + /// as nobody is currently blocking, since we were in a phase + /// where no senders are alive. + /// + /// Returns `true` if this call reconnected the channel. + pub(crate) fn reconnect_senders(&self) -> bool { + let tail = self.tail.fetch_and(!self.mark_bit, Ordering::SeqCst); + + tail & self.mark_bit == 0 + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index 3f110f89e..96155c110 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -570,8 +570,8 @@ impl Channel { } /// Reconnects senders. There is no need to notify threads - /// as nobody is currently blocking, since we were in an - /// idle phase. + /// as nobody is currently blocking, since we were in a phase + /// where no senders are alive. /// /// Returns `true` if this call reconnected the channel. pub(crate) fn reconnect_senders(&self) -> bool { diff --git a/crossbeam-channel/src/flavors/zero.rs b/crossbeam-channel/src/flavors/zero.rs index 386530aac..3e6268acb 100644 --- a/crossbeam-channel/src/flavors/zero.rs +++ b/crossbeam-channel/src/flavors/zero.rs @@ -356,6 +356,21 @@ impl Channel { }) } + /// Reconnects senders. There is no need to notify threads + /// as nobody is currently blocking, since we were in a phase + /// where no senders are alive. + /// + /// Returns `true` if this call reconnected the channel. + pub(crate) fn reconnect_senders(&self) -> bool { + let mut inner = self.inner.lock().unwrap(); + if inner.is_disconnected { + inner.is_disconnected = false; + true + } else { + false + } + } + /// Disconnects the channel and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. diff --git a/crossbeam-channel/tests/array.rs b/crossbeam-channel/tests/array.rs index 080a391de..f5a12c7e7 100644 --- a/crossbeam-channel/tests/array.rs +++ b/crossbeam-channel/tests/array.rs @@ -707,3 +707,20 @@ fn panic_on_drop() { // Elements after the panicked element will leak. assert!(!b); } + +#[test] +fn zero_sender_revival() { + let (s, r) = bounded::(1); + + s.send(1).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Err(RecvError)); + + let s = r.new_sender(); + + s.send(1).unwrap(); + assert_eq!(r.recv(), Ok(1)); +} diff --git a/crossbeam-channel/tests/list.rs b/crossbeam-channel/tests/list.rs index 9b9af8abd..7eec6c681 100644 --- a/crossbeam-channel/tests/list.rs +++ b/crossbeam-channel/tests/list.rs @@ -209,7 +209,7 @@ fn recv_after_disconnect() { } #[test] -fn zero_receiver_revival() { +fn zero_sender_revival() { let (s, r) = unbounded(); s.send(1).unwrap(); diff --git a/crossbeam-channel/tests/zero.rs b/crossbeam-channel/tests/zero.rs index 5f6614299..85cf7bae7 100644 --- a/crossbeam-channel/tests/zero.rs +++ b/crossbeam-channel/tests/zero.rs @@ -556,3 +556,50 @@ fn channel_through_channel() { }) .unwrap(); } + +#[test] +fn zero_sender_revival() { + + let (s, r) = bounded::(0); + + let rref = &r; + + scope(|scope| { + scope.spawn(move |_| { + let s = s; + + s.send(1).unwrap(); + // sender is dropped + }); + + scope.spawn(move |_| { + let r = rref; + + assert_eq!(r.recv(), Ok(1)); + thread::sleep(Duration::from_millis(100)); + assert_eq!(r.recv(), Err(RecvError)); // non blocking + + let mut msg = r.recv(); + assert_eq!(msg, Err(RecvError)); // no senders + + while msg == Err(RecvError) { + // Yield to the other thread + thread::sleep(ms(30)); + msg = r.recv(); + } + + assert_eq!(msg, Ok(2)); // received from second sender thread + }); + + scope.spawn(move |_| { + thread::sleep(ms(200)); // start with a delay + + let r = rref; + + let s = r.new_sender(); + + s.send(2).unwrap(); + }); + }) + .unwrap(); +} From 1dcab1e8566a60ddd90f2ff54767ba696b61f2ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Fournier?= Date: Wed, 25 Feb 2026 14:29:20 +0100 Subject: [PATCH 4/4] Fmt --- crossbeam-channel/tests/zero.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crossbeam-channel/tests/zero.rs b/crossbeam-channel/tests/zero.rs index 85cf7bae7..fa10630c1 100644 --- a/crossbeam-channel/tests/zero.rs +++ b/crossbeam-channel/tests/zero.rs @@ -559,7 +559,6 @@ fn channel_through_channel() { #[test] fn zero_sender_revival() { - let (s, r) = bounded::(0); let rref = &r;