diff options
Diffstat (limited to 'src/sync/mpsc/unbounded.rs')
-rw-r--r-- | src/sync/mpsc/unbounded.rs | 97 |
1 files changed, 93 insertions, 4 deletions
diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs index 1b2288a..fe882d5 100644 --- a/src/sync/mpsc/unbounded.rs +++ b/src/sync/mpsc/unbounded.rs @@ -47,7 +47,7 @@ impl<T> fmt::Debug for UnboundedReceiver<T> { } /// Creates an unbounded mpsc channel for communicating between asynchronous -/// tasks. +/// tasks without backpressure. /// /// A `send` on this channel will always succeed as long as the receive half has /// not been closed. If the receiver falls behind, messages will be arbitrarily @@ -73,8 +73,7 @@ impl<T> UnboundedReceiver<T> { UnboundedReceiver { chan } } - #[doc(hidden)] // TODO: doc - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -174,7 +173,97 @@ impl<T> UnboundedSender<T> { /// [`close`]: UnboundedReceiver::close /// [`UnboundedReceiver`]: UnboundedReceiver pub fn send(&self, message: T) -> Result<(), SendError<T>> { - self.chan.send_unbounded(message)?; + if !self.inc_num_messages() { + return Err(SendError(message)); + } + + self.chan.send(message); Ok(()) } + + fn inc_num_messages(&self) -> bool { + use std::process; + use std::sync::atomic::Ordering::{AcqRel, Acquire}; + + let mut curr = self.chan.semaphore().load(Acquire); + + loop { + if curr & 1 == 1 { + return false; + } + + if curr == usize::MAX ^ 1 { + // Overflowed the ref count. There is no safe way to recover, so + // abort the process. In practice, this should never happen. + process::abort() + } + + match self + .chan + .semaphore() + .compare_exchange(curr, curr + 2, AcqRel, Acquire) + { + Ok(_) => return true, + Err(actual) => { + curr = actual; + } + } + } + } + + /// Completes when the receiver has dropped. + /// + /// This allows the producers to get notified when interest in the produced + /// values is canceled and immediately stop doing work. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); + /// let tx2 = tx1.clone(); + /// let tx3 = tx1.clone(); + /// let tx4 = tx1.clone(); + /// let tx5 = tx1.clone(); + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// futures::join!( + /// tx1.closed(), + /// tx2.closed(), + /// tx3.closed(), + /// tx4.closed(), + /// tx5.closed() + /// ); + //// println!("Receiver dropped"); + /// } + /// ``` + pub async fn closed(&self) { + self.chan.closed().await + } + /// Checks if the channel has been closed. This happens when the + /// [`UnboundedReceiver`] is dropped, or when the + /// [`UnboundedReceiver::close`] method is called. + /// + /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver + /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close + /// + /// ``` + /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + /// assert!(!tx.is_closed()); + /// + /// let tx2 = tx.clone(); + /// assert!(!tx2.is_closed()); + /// + /// drop(rx); + /// assert!(tx.is_closed()); + /// assert!(tx2.is_closed()); + /// ``` + pub fn is_closed(&self) -> bool { + self.chan.is_closed() + } } |