aboutsummaryrefslogtreecommitdiff
path: root/src/sync/mpsc/unbounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mpsc/unbounded.rs')
-rw-r--r--src/sync/mpsc/unbounded.rs97
1 files changed, 93 insertions, 4 deletions
diff --git a/src/sync/mpsc/unbounded.rs b/src/sync/mpsc/unbounded.rs
index 1b2288a..fe882d5 100644
--- a/src/sync/mpsc/unbounded.rs
+++ b/src/sync/mpsc/unbounded.rs
@@ -47,7 +47,7 @@ impl<T> fmt::Debug for UnboundedReceiver<T> {
}
/// Creates an unbounded mpsc channel for communicating between asynchronous
-/// tasks.
+/// tasks without backpressure.
///
/// A `send` on this channel will always succeed as long as the receive half has
/// not been closed. If the receiver falls behind, messages will be arbitrarily
@@ -73,8 +73,7 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
- #[doc(hidden)] // TODO: doc
- pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -174,7 +173,97 @@ impl<T> UnboundedSender<T> {
/// [`close`]: UnboundedReceiver::close
/// [`UnboundedReceiver`]: UnboundedReceiver
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
- self.chan.send_unbounded(message)?;
+ if !self.inc_num_messages() {
+ return Err(SendError(message));
+ }
+
+ self.chan.send(message);
Ok(())
}
+
+ fn inc_num_messages(&self) -> bool {
+ use std::process;
+ use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+ let mut curr = self.chan.semaphore().load(Acquire);
+
+ loop {
+ if curr & 1 == 1 {
+ return false;
+ }
+
+ if curr == usize::MAX ^ 1 {
+ // Overflowed the ref count. There is no safe way to recover, so
+ // abort the process. In practice, this should never happen.
+ process::abort()
+ }
+
+ match self
+ .chan
+ .semaphore()
+ .compare_exchange(curr, curr + 2, AcqRel, Acquire)
+ {
+ Ok(_) => return true,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+
+ /// Completes when the receiver has dropped.
+ ///
+ /// This allows the producers to get notified when interest in the produced
+ /// values is canceled and immediately stop doing work.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ /// let tx2 = tx1.clone();
+ /// let tx3 = tx1.clone();
+ /// let tx4 = tx1.clone();
+ /// let tx5 = tx1.clone();
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// futures::join!(
+ /// tx1.closed(),
+ /// tx2.closed(),
+ /// tx3.closed(),
+ /// tx4.closed(),
+ /// tx5.closed()
+ /// );
+ //// println!("Receiver dropped");
+ /// }
+ /// ```
+ pub async fn closed(&self) {
+ self.chan.closed().await
+ }
+ /// Checks if the channel has been closed. This happens when the
+ /// [`UnboundedReceiver`] is dropped, or when the
+ /// [`UnboundedReceiver::close`] method is called.
+ ///
+ /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
+ /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
+ ///
+ /// ```
+ /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
+ /// assert!(!tx.is_closed());
+ ///
+ /// let tx2 = tx.clone();
+ /// assert!(!tx2.is_closed());
+ ///
+ /// drop(rx);
+ /// assert!(tx.is_closed());
+ /// assert!(tx2.is_closed());
+ /// ```
+ pub fn is_closed(&self) -> bool {
+ self.chan.is_closed()
+ }
}