aboutsummaryrefslogtreecommitdiff
path: root/src/sync/mpsc/bounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/mpsc/bounded.rs')
-rw-r--r--src/sync/mpsc/bounded.rs301
1 files changed, 295 insertions, 6 deletions
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index 1f670bf..ce857d7 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -33,6 +33,22 @@ pub struct Permit<'a, T> {
chan: &'a chan::Tx<T, Semaphore>,
}
+/// Owned permit to send one value into the channel.
+///
+/// This is identical to the [`Permit`] type, except that it moves the sender
+/// rather than borrowing it.
+///
+/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
+/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
+/// before generating a message to send.
+///
+/// [`Permit`]: Permit
+/// [`Sender::reserve_owned()`]: Sender::reserve_owned
+/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
+pub struct OwnedPermit<T> {
+ chan: Option<chan::Tx<T, Semaphore>>,
+}
+
/// Receive values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
@@ -229,10 +245,11 @@ impl<T> Receiver<T> {
///
/// To guarantee that no messages are dropped, after calling `close()`,
/// `recv()` must be called until `None` is returned. If there are
- /// outstanding [`Permit`] values, the `recv` method will not return `None`
- /// until those are released.
+ /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
+ /// not return `None` until those are released.
///
/// [`Permit`]: Permit
+ /// [`OwnedPermit`]: OwnedPermit
///
/// # Examples
///
@@ -624,12 +641,96 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
+ self.reserve_inner().await?;
+ Ok(Permit { chan: &self.chan })
+ }
+
+ /// Wait for channel capacity, moving the `Sender` and returning an owned
+ /// permit. Once capacity to send one message is available, it is reserved
+ /// for the caller.
+ ///
+ /// This moves the sender _by value_, and returns an owned permit that can
+ /// be used to send a message into the channel. Unlike [`Sender::reserve`],
+ /// this method may be used in cases where the permit must be valid for the
+ /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
+ /// essentially a reference count increment, comparable to [`Arc::clone`]),
+ /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
+ /// moved, it can be cloned prior to calling `reserve_owned`.
+ ///
+ /// If the channel is full, the function waits for the number of unreceived
+ /// messages to become less than the channel capacity. Capacity to send one
+ /// message is reserved for the caller. An [`OwnedPermit`] is returned to
+ /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
+ /// consumes the reserved capacity.
+ ///
+ /// Dropping the [`OwnedPermit`] without sending a message releases the
+ /// capacity back to the channel.
+ ///
+ /// # Examples
+ /// Sending a message using an [`OwnedPermit`]:
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity, moving the sender.
+ /// let permit = tx.reserve_owned().await.unwrap();
+ ///
+ /// // Send a message, consuming the permit and returning
+ /// // the moved sender.
+ /// let tx = permit.send(123);
+ ///
+ /// // The value sent on the permit is received.
+ /// assert_eq!(rx.recv().await.unwrap(), 123);
+ ///
+ /// // The sender can now be used again.
+ /// tx.send(456).await.unwrap();
+ /// }
+ /// ```
+ ///
+ /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
+ /// by value, it can be inexpensively cloned before calling `reserve_owned`:
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Clone the sender and reserve capacity.
+ /// let permit = tx.clone().reserve_owned().await.unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Sending on the permit succeeds.
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ /// }
+ /// ```
+ ///
+ /// [`Sender::reserve`]: Sender::reserve
+ /// [`OwnedPermit`]: OwnedPermit
+ /// [`send`]: OwnedPermit::send
+ /// [`Arc::clone`]: std::sync::Arc::clone
+ pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
+ self.reserve_inner().await?;
+ Ok(OwnedPermit {
+ chan: Some(self.chan),
+ })
+ }
+
+ async fn reserve_inner(&self) -> Result<(), SendError<()>> {
match self.chan.semaphore().0.acquire(1).await {
- Ok(_) => {}
- Err(_) => return Err(SendError(())),
+ Ok(_) => Ok(()),
+ Err(_) => Err(SendError(())),
}
-
- Ok(Permit { chan: &self.chan })
}
/// Try to acquire a slot in the channel without waiting for the slot to become
@@ -684,6 +785,72 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}
+ /// Try to acquire a slot in the channel without waiting for the slot to become
+ /// available, returning an owned permit.
+ ///
+ /// This moves the sender _by value_, and returns an owned permit that can
+ /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
+ /// this method may be used in cases where the permit must be valid for the
+ /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
+ /// essentially a reference count increment, comparable to [`Arc::clone`]),
+ /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
+ /// moved, it can be cloned prior to calling `try_reserve_owned`.
+ ///
+ /// If the channel is full this function will return a [`TrySendError`].
+ /// Since the sender is taken by value, the `TrySendError` returned in this
+ /// case contains the sender, so that it may be used again. Otherwise, if
+ /// there is a slot available, this method will return an [`OwnedPermit`]
+ /// that can then be used to [`send`] on the channel with a guaranteed slot.
+ /// This function is similar to [`reserve_owned`] except it does not await
+ /// for the slot to become available.
+ ///
+ /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
+ /// to the channel.
+ ///
+ /// [`OwnedPermit`]: OwnedPermit
+ /// [`send`]: OwnedPermit::send
+ /// [`reserve_owned`]: Sender::reserve_owned
+ /// [`Arc::clone`]: std::sync::Arc::clone
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.clone().try_reserve_owned().unwrap();
+ ///
+ /// // Trying to send directly on the `tx` will fail due to no
+ /// // available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Trying to reserve an additional slot on the `tx` will
+ /// // fail because there is no capacity.
+ /// assert!(tx.try_reserve().is_err());
+ ///
+ /// // Sending on the permit succeeds
+ /// permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// }
+ /// ```
+ pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
+ match self.chan.semaphore().0.try_acquire(1) {
+ Ok(_) => {}
+ Err(_) => return Err(TrySendError::Full(self)),
+ }
+
+ Ok(OwnedPermit {
+ chan: Some(self.chan),
+ })
+ }
+
/// Returns `true` if senders belong to the same channel.
///
/// # Examples
@@ -804,6 +971,8 @@ impl<T> Drop for Permit<'_, T> {
// Add the permit back to the semaphore
semaphore.add_permit();
+ // If this is the last sender for this channel, wake the receiver so
+ // that it can be notified that the channel is closed.
if semaphore.is_closed() && semaphore.is_idle() {
self.chan.wake_rx();
}
@@ -817,3 +986,123 @@ impl<T> fmt::Debug for Permit<'_, T> {
.finish()
}
}
+
+// ===== impl Permit =====
+
+impl<T> OwnedPermit<T> {
+ /// Sends a value using the reserved capacity.
+ ///
+ /// Capacity for the message has already been reserved. The message is sent
+ /// to the receiver and the permit is consumed. The operation will succeed
+ /// even if the receiver half has been closed. See [`Receiver::close`] for
+ /// more details on performing a clean shutdown.
+ ///
+ /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
+ /// the `OwnedPermit` was reserved.
+ ///
+ /// [`Receiver::close`]: Receiver::close
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, mut rx) = mpsc::channel(1);
+ ///
+ /// // Reserve capacity
+ /// let permit = tx.reserve_owned().await.unwrap();
+ ///
+ /// // Send a message on the permit, returning the sender.
+ /// let tx = permit.send(456);
+ ///
+ /// // The value sent on the permit is received
+ /// assert_eq!(rx.recv().await.unwrap(), 456);
+ ///
+ /// // We may now reuse `tx` to send another message.
+ /// tx.send(789).await.unwrap();
+ /// }
+ /// ```
+ pub fn send(mut self, value: T) -> Sender<T> {
+ let chan = self.chan.take().unwrap_or_else(|| {
+ unreachable!("OwnedPermit channel is only taken when the permit is moved")
+ });
+ chan.send(value);
+
+ Sender { chan }
+ }
+
+ /// Release the reserved capacity *without* sending a message, returning the
+ /// [`Sender`].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx) = mpsc::channel(1);
+ ///
+ /// // Clone the sender and reserve capacity
+ /// let permit = tx.clone().reserve_owned().await.unwrap();
+ ///
+ /// // Trying to send on the original `tx` will fail, since the `permit`
+ /// // has reserved all the available capacity.
+ /// assert!(tx.try_send(123).is_err());
+ ///
+ /// // Release the permit without sending a message, returning the clone
+ /// // of the sender.
+ /// let tx2 = permit.release();
+ ///
+ /// // We may now reuse `tx` to send another message.
+ /// tx.send(789).await.unwrap();
+ /// # drop(rx); drop(tx2);
+ /// }
+ /// ```
+ ///
+ /// [`Sender`]: Sender
+ pub fn release(mut self) -> Sender<T> {
+ use chan::Semaphore;
+
+ let chan = self.chan.take().unwrap_or_else(|| {
+ unreachable!("OwnedPermit channel is only taken when the permit is moved")
+ });
+
+ // Add the permit back to the semaphore
+ chan.semaphore().add_permit();
+ Sender { chan }
+ }
+}
+
+impl<T> Drop for OwnedPermit<T> {
+ fn drop(&mut self) {
+ use chan::Semaphore;
+
+ // Are we still holding onto the sender?
+ if let Some(chan) = self.chan.take() {
+ let semaphore = chan.semaphore();
+
+ // Add the permit back to the semaphore
+ semaphore.add_permit();
+
+ // If this `OwnedPermit` is holding the last sender for this
+ // channel, wake the receiver so that it can be notified that the
+ // channel is closed.
+ if semaphore.is_closed() && semaphore.is_idle() {
+ chan.wake_rx();
+ }
+ }
+
+ // Otherwise, do nothing.
+ }
+}
+
+impl<T> fmt::Debug for OwnedPermit<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("OwnedPermit")
+ .field("chan", &self.chan)
+ .finish()
+ }
+}