diff options
Diffstat (limited to 'src/sync')
-rw-r--r-- | src/sync/mod.rs | 5 | ||||
-rw-r--r-- | src/sync/mpsc/bounded.rs | 6 | ||||
-rw-r--r-- | src/sync/mpsc/error.rs | 4 | ||||
-rw-r--r-- | src/sync/notify.rs | 2 | ||||
-rw-r--r-- | src/sync/semaphore.rs | 219 | ||||
-rw-r--r-- | src/sync/task/atomic_waker.rs | 2 | ||||
-rw-r--r-- | src/sync/watch.rs | 22 |
7 files changed, 253 insertions, 7 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 5f97c1a..457e6ab 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -428,6 +428,11 @@ //! bounding of any kind. cfg_sync! { + /// Named future types. + pub mod futures { + pub use super::notify::Notified; + } + mod barrier; pub use barrier::{Barrier, BarrierWaitResult}; diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs index ce857d7..cfd8da0 100644 --- a/src/sync/mpsc/bounded.rs +++ b/src/sync/mpsc/bounded.rs @@ -65,7 +65,7 @@ pub struct Receiver<T> { /// with backpressure. /// /// The channel will buffer up to the provided number of messages. Once the -/// buffer is full, attempts to `send` new messages will wait until a message is +/// buffer is full, attempts to send new messages will wait until a message is /// received from the channel. The provided buffer capacity must be at least 1. /// /// All data sent on `Sender` will become available on `Receiver` in the same @@ -76,7 +76,7 @@ pub struct Receiver<T> { /// /// If the `Receiver` is disconnected while trying to `send`, the `send` method /// will return a `SendError`. Similarly, if `Sender` is disconnected while -/// trying to `recv`, the `recv` method will return a `RecvError`. +/// trying to `recv`, the `recv` method will return `None`. /// /// # Panics /// @@ -887,7 +887,7 @@ impl<T> Sender<T> { /// let permit = tx.reserve().await.unwrap(); /// assert_eq!(tx.capacity(), 4); /// - /// // Sending and receiving a value increases the caapcity by one. + /// // Sending and receiving a value increases the capacity by one. /// permit.send(()); /// rx.recv().await.unwrap(); /// assert_eq!(tx.capacity(), 5); diff --git a/src/sync/mpsc/error.rs b/src/sync/mpsc/error.rs index a2d2824..0d25ad3 100644 --- a/src/sync/mpsc/error.rs +++ b/src/sync/mpsc/error.rs @@ -55,14 +55,18 @@ impl<T> From<SendError<T>> for TrySendError<T> { /// Error returned by `Receiver`. #[derive(Debug)] +#[doc(hidden)] +#[deprecated(note = "This type is unused because recv returns an Option.")] pub struct RecvError(()); +#[allow(deprecated)] impl fmt::Display for RecvError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { write!(fmt, "channel closed") } } +#[allow(deprecated)] impl Error for RecvError {} cfg_time! { diff --git a/src/sync/notify.rs b/src/sync/notify.rs index 5d2132f..07be759 100644 --- a/src/sync/notify.rs +++ b/src/sync/notify.rs @@ -140,7 +140,7 @@ struct Waiter { _p: PhantomPinned, } -/// Future returned from `notified()` +/// Future returned from [`Notify::notified()`] #[derive(Debug)] pub struct Notified<'a> { /// The `Notify` being received on. diff --git a/src/sync/semaphore.rs b/src/sync/semaphore.rs index af75042..5d42d1c 100644 --- a/src/sync/semaphore.rs +++ b/src/sync/semaphore.rs @@ -24,7 +24,55 @@ use std::sync::Arc; /// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`] /// utility. /// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// use tokio::sync::{Semaphore, TryAcquireError}; +/// +/// #[tokio::main] +/// async fn main() { +/// let semaphore = Semaphore::new(3); +/// +/// let a_permit = semaphore.acquire().await.unwrap(); +/// let two_permits = semaphore.acquire_many(2).await.unwrap(); +/// +/// assert_eq!(semaphore.available_permits(), 0); +/// +/// let permit_attempt = semaphore.try_acquire(); +/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits)); +/// } +/// ``` +/// +/// Use [`Semaphore::acquire_owned`] to move permits across tasks: +/// +/// ``` +/// use std::sync::Arc; +/// use tokio::sync::Semaphore; +/// +/// #[tokio::main] +/// async fn main() { +/// let semaphore = Arc::new(Semaphore::new(3)); +/// let mut join_handles = Vec::new(); +/// +/// for _ in 0..5 { +/// let permit = semaphore.clone().acquire_owned().await.unwrap(); +/// join_handles.push(tokio::spawn(async move { +/// // perform task... +/// // explicitly own `permit` in the task +/// drop(permit); +/// })); +/// } +/// +/// for handle in join_handles { +/// handle.await.unwrap(); +/// } +/// } +/// ``` +/// /// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html +/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned #[derive(Debug)] pub struct Semaphore { /// The low level semaphore @@ -79,6 +127,15 @@ impl Semaphore { } /// Creates a new semaphore with the initial number of permits. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Semaphore; + /// + /// static SEM: Semaphore = Semaphore::const_new(10); + /// ``` + /// #[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_new(permits: usize) -> Self { @@ -105,6 +162,26 @@ impl Semaphore { /// Otherwise, this returns a [`SemaphorePermit`] representing the /// acquired permit. /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Semaphore; + /// + /// #[tokio::main] + /// async fn main() { + /// let semaphore = Semaphore::new(2); + /// + /// let permit_1 = semaphore.acquire().await.unwrap(); + /// assert_eq!(semaphore.available_permits(), 1); + /// + /// let permit_2 = semaphore.acquire().await.unwrap(); + /// assert_eq!(semaphore.available_permits(), 0); + /// + /// drop(permit_1); + /// assert_eq!(semaphore.available_permits(), 1); + /// } + /// ``` + /// /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> { @@ -121,6 +198,20 @@ impl Semaphore { /// Otherwise, this returns a [`SemaphorePermit`] representing the /// acquired permits. /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Semaphore; + /// + /// #[tokio::main] + /// async fn main() { + /// let semaphore = Semaphore::new(5); + /// + /// let permit = semaphore.acquire_many(3).await.unwrap(); + /// assert_eq!(semaphore.available_permits(), 2); + /// } + /// ``` + /// /// [`AcquireError`]: crate::sync::AcquireError /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> { @@ -137,6 +228,25 @@ impl Semaphore { /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, /// this returns a [`SemaphorePermit`] representing the acquired permits. /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Semaphore, TryAcquireError}; + /// + /// # fn main() { + /// let semaphore = Semaphore::new(2); + /// + /// let permit_1 = semaphore.try_acquire().unwrap(); + /// assert_eq!(semaphore.available_permits(), 1); + /// + /// let permit_2 = semaphore.try_acquire().unwrap(); + /// assert_eq!(semaphore.available_permits(), 0); + /// + /// let permit_3 = semaphore.try_acquire(); + /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits)); + /// # } + /// ``` + /// /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits /// [`SemaphorePermit`]: crate::sync::SemaphorePermit @@ -153,8 +263,24 @@ impl Semaphore { /// Tries to acquire `n` permits from the semaphore. /// /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] - /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, - /// this returns a [`SemaphorePermit`] representing the acquired permits. + /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left. + /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{Semaphore, TryAcquireError}; + /// + /// # fn main() { + /// let semaphore = Semaphore::new(4); + /// + /// let permit_1 = semaphore.try_acquire_many(3).unwrap(); + /// assert_eq!(semaphore.available_permits(), 1); + /// + /// let permit_2 = semaphore.try_acquire_many(2); + /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits)); + /// # } + /// ``` /// /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits @@ -176,6 +302,32 @@ impl Semaphore { /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the /// acquired permit. /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::Semaphore; + /// + /// #[tokio::main] + /// async fn main() { + /// let semaphore = Arc::new(Semaphore::new(3)); + /// let mut join_handles = Vec::new(); + /// + /// for _ in 0..5 { + /// let permit = semaphore.clone().acquire_owned().await.unwrap(); + /// join_handles.push(tokio::spawn(async move { + /// // perform task... + /// // explicitly own `permit` in the task + /// drop(permit); + /// })); + /// } + /// + /// for handle in join_handles { + /// handle.await.unwrap(); + /// } + /// } + /// ``` + /// /// [`Arc`]: std::sync::Arc /// [`AcquireError`]: crate::sync::AcquireError /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit @@ -194,6 +346,32 @@ impl Semaphore { /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the /// acquired permit. /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::Semaphore; + /// + /// #[tokio::main] + /// async fn main() { + /// let semaphore = Arc::new(Semaphore::new(10)); + /// let mut join_handles = Vec::new(); + /// + /// for _ in 0..5 { + /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap(); + /// join_handles.push(tokio::spawn(async move { + /// // perform task... + /// // explicitly own `permit` in the task + /// drop(permit); + /// })); + /// } + /// + /// for handle in join_handles { + /// handle.await.unwrap(); + /// } + /// } + /// ``` + /// /// [`Arc`]: std::sync::Arc /// [`AcquireError`]: crate::sync::AcquireError /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit @@ -216,6 +394,26 @@ impl Semaphore { /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the /// acquired permit. /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::{Semaphore, TryAcquireError}; + /// + /// # fn main() { + /// let semaphore = Arc::new(Semaphore::new(2)); + /// + /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap(); + /// assert_eq!(semaphore.available_permits(), 1); + /// + /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap(); + /// assert_eq!(semaphore.available_permits(), 0); + /// + /// let permit_3 = semaphore.try_acquire_owned(); + /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits)); + /// # } + /// ``` + /// /// [`Arc`]: std::sync::Arc /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits @@ -238,6 +436,23 @@ impl Semaphore { /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the /// acquired permit. /// + /// # Examples + /// + /// ``` + /// use std::sync::Arc; + /// use tokio::sync::{Semaphore, TryAcquireError}; + /// + /// # fn main() { + /// let semaphore = Arc::new(Semaphore::new(4)); + /// + /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap(); + /// assert_eq!(semaphore.available_permits(), 1); + /// + /// let permit_2 = semaphore.try_acquire_many_owned(2); + /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits)); + /// # } + /// ``` + /// /// [`Arc`]: std::sync::Arc /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits diff --git a/src/sync/task/atomic_waker.rs b/src/sync/task/atomic_waker.rs index 5917204..8616007 100644 --- a/src/sync/task/atomic_waker.rs +++ b/src/sync/task/atomic_waker.rs @@ -29,7 +29,7 @@ pub(crate) struct AtomicWaker { // `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell // stores a `Waker` value produced by calls to `register` and many threads can -// race to take the waker by calling `wake. +// race to take the waker by calling `wake`. // // If a new `Waker` instance is produced by calling `register` before an existing // one is consumed, then the existing one is overwritten. diff --git a/src/sync/watch.rs b/src/sync/watch.rs index db65e5a..42d417a 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -417,6 +417,28 @@ impl<T> Sender<T> { Receiver::from_shared(version, shared) } } + + /// Returns the number of receivers that currently exist + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx1) = watch::channel("hello"); + /// + /// assert_eq!(1, tx.receiver_count()); + /// + /// let mut _rx2 = rx1.clone(); + /// + /// assert_eq!(2, tx.receiver_count()); + /// } + /// ``` + pub fn receiver_count(&self) -> usize { + self.shared.ref_count_rx.load(Relaxed) + } } impl<T> Drop for Sender<T> { |