aboutsummaryrefslogtreecommitdiff
path: root/src/sync/mpsc/bounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mpsc/bounded.rs')
-rw-r--r--src/sync/mpsc/bounded.rs137
1 files changed, 130 insertions, 7 deletions
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index 5a2bfa6..8babdc7 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -1,3 +1,4 @@
+use crate::loom::sync::Arc;
use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
@@ -22,6 +23,40 @@ pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
}
+/// A sender that does not prevent the channel from being closed.
+///
+/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
+/// instances remain, the channel is closed.
+///
+/// In order to send messages, the `WeakSender` needs to be upgraded using
+/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
+/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
+///
+/// [`Sender`]: Sender
+/// [`WeakSender::upgrade`]: WeakSender::upgrade
+///
+/// #Examples
+///
+/// ```
+/// use tokio::sync::mpsc::channel;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, _rx) = channel::<i32>(15);
+/// let tx_weak = tx.downgrade();
+///
+/// // Upgrading will succeed because `tx` still exists.
+/// assert!(tx_weak.upgrade().is_some());
+///
+/// // If we drop `tx`, then it will fail.
+/// drop(tx);
+/// assert!(tx_weak.clone().upgrade().is_none());
+/// }
+/// ```
+pub struct WeakSender<T> {
+ chan: Arc<chan::Chan<T, Semaphore>>,
+}
+
/// Permits to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
@@ -105,9 +140,13 @@ pub struct Receiver<T> {
/// }
/// }
/// ```
+#[track_caller]
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
- let semaphore = (semaphore::Semaphore::new(buffer), buffer);
+ let semaphore = Semaphore {
+ semaphore: semaphore::Semaphore::new(buffer),
+ bound: buffer,
+ };
let (tx, rx) = chan::channel(semaphore);
let tx = Sender::new(tx);
@@ -118,7 +157,11 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
/// representing the channel bound.
-type Semaphore = (semaphore::Semaphore, usize);
+#[derive(Debug)]
+pub(crate) struct Semaphore {
+ pub(crate) semaphore: semaphore::Semaphore,
+ pub(crate) bound: usize,
+}
impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
@@ -281,6 +324,7 @@ impl<T> Receiver<T> {
/// sync_code.join().unwrap()
/// }
/// ```
+ #[track_caller]
#[cfg(feature = "sync")]
pub fn blocking_recv(&mut self) -> Option<T> {
crate::future::block_on(self.recv())
@@ -535,7 +579,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
- match self.chan.semaphore().0.try_acquire(1) {
+ match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(_) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
@@ -563,6 +607,11 @@ impl<T> Sender<T> {
/// [`close`]: Receiver::close
/// [`Receiver`]: Receiver
///
+ /// # Panics
+ ///
+ /// This function panics if it is called outside the context of a Tokio
+ /// runtime [with time enabled](crate::runtime::Builder::enable_time).
+ ///
/// # Examples
///
/// In the following example, each call to `send_timeout` will block until the
@@ -645,6 +694,7 @@ impl<T> Sender<T> {
/// sync_code.join().unwrap()
/// }
/// ```
+ #[track_caller]
#[cfg(feature = "sync")]
pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
crate::future::block_on(self.send(value))
@@ -809,7 +859,7 @@ impl<T> Sender<T> {
}
async fn reserve_inner(&self) -> Result<(), SendError<()>> {
- match self.chan.semaphore().0.acquire(1).await {
+ match self.chan.semaphore().semaphore.acquire(1).await {
Ok(_) => Ok(()),
Err(_) => Err(SendError(())),
}
@@ -859,7 +909,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
- match self.chan.semaphore().0.try_acquire(1) {
+ match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(_) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
@@ -924,7 +974,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
- match self.chan.semaphore().0.try_acquire(1) {
+ match self.chan.semaphore().semaphore.try_acquire(1) {
Ok(_) => {}
Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
@@ -955,6 +1005,8 @@ impl<T> Sender<T> {
///
/// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
/// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
+ /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
+ /// specified when calling [`channel`]
///
/// # Examples
///
@@ -980,8 +1032,56 @@ impl<T> Sender<T> {
///
/// [`send`]: Sender::send
/// [`reserve`]: Sender::reserve
+ /// [`channel`]: channel
+ /// [`max_capacity`]: Sender::max_capacity
pub fn capacity(&self) -> usize {
- self.chan.semaphore().0.available_permits()
+ self.chan.semaphore().semaphore.available_permits()
+ }
+
+ /// Converts the `Sender` to a [`WeakSender`] that does not count
+ /// towards RAII semantics, i.e. if all `Sender` instances of the
+ /// channel were dropped and only `WeakSender` instances remain,
+ /// the channel is closed.
+ pub fn downgrade(&self) -> WeakSender<T> {
+ WeakSender {
+ chan: self.chan.downgrade(),
+ }
+ }
+
+ /// Returns the maximum buffer capacity of the channel.
+ ///
+ /// The maximum capacity is the buffer capacity initially specified when calling
+ /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
+ /// available buffer capacity: as messages are sent and received, the
+ /// value returned by [`capacity`] will go up or down, whereas the value
+ /// returned by `max_capacity` will remain constant.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, _rx) = mpsc::channel::<()>(5);
+ ///
+ /// // both max capacity and capacity are the same at first
+ /// assert_eq!(tx.max_capacity(), 5);
+ /// assert_eq!(tx.capacity(), 5);
+ ///
+ /// // Making a reservation doesn't change the max capacity.
+ /// let permit = tx.reserve().await.unwrap();
+ /// assert_eq!(tx.max_capacity(), 5);
+ /// // but drops the capacity by one
+ /// assert_eq!(tx.capacity(), 4);
+ /// }
+ /// ```
+ ///
+ /// [`channel`]: channel
+ /// [`max_capacity`]: Sender::max_capacity
+ /// [`capacity`]: Sender::capacity
+ pub fn max_capacity(&self) -> usize {
+ self.chan.semaphore().bound
}
}
@@ -1001,6 +1101,29 @@ impl<T> fmt::Debug for Sender<T> {
}
}
+impl<T> Clone for WeakSender<T> {
+ fn clone(&self) -> Self {
+ WeakSender {
+ chan: self.chan.clone(),
+ }
+ }
+}
+
+impl<T> WeakSender<T> {
+ /// Tries to convert a WeakSender into a [`Sender`]. This will return `Some`
+ /// if there are other `Sender` instances alive and the channel wasn't
+ /// previously dropped, otherwise `None` is returned.
+ pub fn upgrade(&self) -> Option<Sender<T>> {
+ chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
+ }
+}
+
+impl<T> fmt::Debug for WeakSender<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("WeakSender").finish()
+ }
+}
+
// ===== impl Permit =====
impl<T> Permit<'_, T> {