diff options
Diffstat (limited to 'src/sync/mpsc/bounded.rs')
-rw-r--r-- | src/sync/mpsc/bounded.rs | 137 |
1 files changed, 130 insertions, 7 deletions
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index 5a2bfa6..8babdc7 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -1,3 +1,4 @@ +use crate::loom::sync::Arc; use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; @@ -22,6 +23,40 @@ pub struct Sender<T> { chan: chan::Tx<T, Semaphore>, } +/// A sender that does not prevent the channel from being closed. +/// +/// If all [`Sender`] instances of a channel were dropped and only `WeakSender` +/// instances remain, the channel is closed. +/// +/// In order to send messages, the `WeakSender` needs to be upgraded using +/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None` +/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`. +/// +/// [`Sender`]: Sender +/// [`WeakSender::upgrade`]: WeakSender::upgrade +/// +/// #Examples +/// +/// ``` +/// use tokio::sync::mpsc::channel; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx, _rx) = channel::<i32>(15); +/// let tx_weak = tx.downgrade(); +/// +/// // Upgrading will succeed because `tx` still exists. +/// assert!(tx_weak.upgrade().is_some()); +/// +/// // If we drop `tx`, then it will fail. +/// drop(tx); +/// assert!(tx_weak.clone().upgrade().is_none()); +/// } +/// ``` +pub struct WeakSender<T> { + chan: Arc<chan::Chan<T, Semaphore>>, +} + /// Permits to send one value into the channel. /// /// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`] @@ -105,9 +140,13 @@ pub struct Receiver<T> { /// } /// } /// ``` +#[track_caller] pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); - let semaphore = (semaphore::Semaphore::new(buffer), buffer); + let semaphore = Semaphore { + semaphore: semaphore::Semaphore::new(buffer), + bound: buffer, + }; let (tx, rx) = chan::channel(semaphore); let tx = Sender::new(tx); @@ -118,7 +157,11 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { /// Channel semaphore is a tuple of the semaphore implementation and a `usize` /// representing the channel bound. -type Semaphore = (semaphore::Semaphore, usize); +#[derive(Debug)] +pub(crate) struct Semaphore { + pub(crate) semaphore: semaphore::Semaphore, + pub(crate) bound: usize, +} impl<T> Receiver<T> { pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> { @@ -281,6 +324,7 @@ impl<T> Receiver<T> { /// sync_code.join().unwrap() /// } /// ``` + #[track_caller] #[cfg(feature = "sync")] pub fn blocking_recv(&mut self) -> Option<T> { crate::future::block_on(self.recv()) @@ -535,7 +579,7 @@ impl<T> Sender<T> { /// } /// ``` pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> { - match self.chan.semaphore().0.try_acquire(1) { + match self.chan.semaphore().semaphore.try_acquire(1) { Ok(_) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)), @@ -563,6 +607,11 @@ impl<T> Sender<T> { /// [`close`]: Receiver::close /// [`Receiver`]: Receiver /// + /// # Panics + /// + /// This function panics if it is called outside the context of a Tokio + /// runtime [with time enabled](crate::runtime::Builder::enable_time). + /// /// # Examples /// /// In the following example, each call to `send_timeout` will block until the @@ -645,6 +694,7 @@ impl<T> Sender<T> { /// sync_code.join().unwrap() /// } /// ``` + #[track_caller] #[cfg(feature = "sync")] pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> { crate::future::block_on(self.send(value)) @@ -809,7 +859,7 @@ impl<T> Sender<T> { } async fn reserve_inner(&self) -> Result<(), SendError<()>> { - match self.chan.semaphore().0.acquire(1).await { + match self.chan.semaphore().semaphore.acquire(1).await { Ok(_) => Ok(()), Err(_) => Err(SendError(())), } @@ -859,7 +909,7 @@ impl<T> Sender<T> { /// } /// ``` pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> { - match self.chan.semaphore().0.try_acquire(1) { + match self.chan.semaphore().semaphore.try_acquire(1) { Ok(_) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())), @@ -924,7 +974,7 @@ impl<T> Sender<T> { /// } /// ``` pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> { - match self.chan.semaphore().0.try_acquire(1) { + match self.chan.semaphore().semaphore.try_acquire(1) { Ok(_) => {} Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)), Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)), @@ -955,6 +1005,8 @@ impl<T> Sender<T> { /// /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`]. + /// This is distinct from [`max_capacity`], which always returns buffer capacity initially + /// specified when calling [`channel`] /// /// # Examples /// @@ -980,8 +1032,56 @@ impl<T> Sender<T> { /// /// [`send`]: Sender::send /// [`reserve`]: Sender::reserve + /// [`channel`]: channel + /// [`max_capacity`]: Sender::max_capacity pub fn capacity(&self) -> usize { - self.chan.semaphore().0.available_permits() + self.chan.semaphore().semaphore.available_permits() + } + + /// Converts the `Sender` to a [`WeakSender`] that does not count + /// towards RAII semantics, i.e. if all `Sender` instances of the + /// channel were dropped and only `WeakSender` instances remain, + /// the channel is closed. + pub fn downgrade(&self) -> WeakSender<T> { + WeakSender { + chan: self.chan.downgrade(), + } + } + + /// Returns the maximum buffer capacity of the channel. + /// + /// The maximum capacity is the buffer capacity initially specified when calling + /// [`channel`]. This is distinct from [`capacity`], which returns the *current* + /// available buffer capacity: as messages are sent and received, the + /// value returned by [`capacity`] will go up or down, whereas the value + /// returned by `max_capacity` will remain constant. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, _rx) = mpsc::channel::<()>(5); + /// + /// // both max capacity and capacity are the same at first + /// assert_eq!(tx.max_capacity(), 5); + /// assert_eq!(tx.capacity(), 5); + /// + /// // Making a reservation doesn't change the max capacity. + /// let permit = tx.reserve().await.unwrap(); + /// assert_eq!(tx.max_capacity(), 5); + /// // but drops the capacity by one + /// assert_eq!(tx.capacity(), 4); + /// } + /// ``` + /// + /// [`channel`]: channel + /// [`max_capacity`]: Sender::max_capacity + /// [`capacity`]: Sender::capacity + pub fn max_capacity(&self) -> usize { + self.chan.semaphore().bound } } @@ -1001,6 +1101,29 @@ impl<T> fmt::Debug for Sender<T> { } } +impl<T> Clone for WeakSender<T> { + fn clone(&self) -> Self { + WeakSender { + chan: self.chan.clone(), + } + } +} + +impl<T> WeakSender<T> { + /// Tries to convert a WeakSender into a [`Sender`]. This will return `Some` + /// if there are other `Sender` instances alive and the channel wasn't + /// previously dropped, otherwise `None` is returned. + pub fn upgrade(&self) -> Option<Sender<T>> { + chan::Tx::upgrade(self.chan.clone()).map(Sender::new) + } +} + +impl<T> fmt::Debug for WeakSender<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("WeakSender").finish() + } +} + // ===== impl Permit ===== impl<T> Permit<'_, T> { |