aboutsummaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/mod.rs5
-rw-r--r--src/sync/mpsc/bounded.rs6
-rw-r--r--src/sync/mpsc/error.rs4
-rw-r--r--src/sync/notify.rs2
-rw-r--r--src/sync/semaphore.rs219
-rw-r--r--src/sync/task/atomic_waker.rs2
-rw-r--r--src/sync/watch.rs22
7 files changed, 253 insertions, 7 deletions
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 5f97c1a..457e6ab 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -428,6 +428,11 @@
//! bounding of any kind.
cfg_sync! {
+ /// Named future types.
+ pub mod futures {
+ pub use super::notify::Notified;
+ }
+
mod barrier;
pub use barrier::{Barrier, BarrierWaitResult};
diff --git a/src/sync/mpsc/bounded.rs b/src/sync/mpsc/bounded.rs
index ce857d7..cfd8da0 100644
--- a/src/sync/mpsc/bounded.rs
+++ b/src/sync/mpsc/bounded.rs
@@ -65,7 +65,7 @@ pub struct Receiver<T> {
/// with backpressure.
///
/// The channel will buffer up to the provided number of messages. Once the
-/// buffer is full, attempts to `send` new messages will wait until a message is
+/// buffer is full, attempts to send new messages will wait until a message is
/// received from the channel. The provided buffer capacity must be at least 1.
///
/// All data sent on `Sender` will become available on `Receiver` in the same
@@ -76,7 +76,7 @@ pub struct Receiver<T> {
///
/// If the `Receiver` is disconnected while trying to `send`, the `send` method
/// will return a `SendError`. Similarly, if `Sender` is disconnected while
-/// trying to `recv`, the `recv` method will return a `RecvError`.
+/// trying to `recv`, the `recv` method will return `None`.
///
/// # Panics
///
@@ -887,7 +887,7 @@ impl<T> Sender<T> {
/// let permit = tx.reserve().await.unwrap();
/// assert_eq!(tx.capacity(), 4);
///
- /// // Sending and receiving a value increases the caapcity by one.
+ /// // Sending and receiving a value increases the capacity by one.
/// permit.send(());
/// rx.recv().await.unwrap();
/// assert_eq!(tx.capacity(), 5);
diff --git a/src/sync/mpsc/error.rs b/src/sync/mpsc/error.rs
index a2d2824..0d25ad3 100644
--- a/src/sync/mpsc/error.rs
+++ b/src/sync/mpsc/error.rs
@@ -55,14 +55,18 @@ impl<T> From<SendError<T>> for TrySendError<T> {
/// Error returned by `Receiver`.
#[derive(Debug)]
+#[doc(hidden)]
+#[deprecated(note = "This type is unused because recv returns an Option.")]
pub struct RecvError(());
+#[allow(deprecated)]
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
+#[allow(deprecated)]
impl Error for RecvError {}
cfg_time! {
diff --git a/src/sync/notify.rs b/src/sync/notify.rs
index 5d2132f..07be759 100644
--- a/src/sync/notify.rs
+++ b/src/sync/notify.rs
@@ -140,7 +140,7 @@ struct Waiter {
_p: PhantomPinned,
}
-/// Future returned from `notified()`
+/// Future returned from [`Notify::notified()`]
#[derive(Debug)]
pub struct Notified<'a> {
/// The `Notify` being received on.
diff --git a/src/sync/semaphore.rs b/src/sync/semaphore.rs
index af75042..5d42d1c 100644
--- a/src/sync/semaphore.rs
+++ b/src/sync/semaphore.rs
@@ -24,7 +24,55 @@ use std::sync::Arc;
/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
/// utility.
///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// use tokio::sync::{Semaphore, TryAcquireError};
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let semaphore = Semaphore::new(3);
+///
+/// let a_permit = semaphore.acquire().await.unwrap();
+/// let two_permits = semaphore.acquire_many(2).await.unwrap();
+///
+/// assert_eq!(semaphore.available_permits(), 0);
+///
+/// let permit_attempt = semaphore.try_acquire();
+/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
+/// }
+/// ```
+///
+/// Use [`Semaphore::acquire_owned`] to move permits across tasks:
+///
+/// ```
+/// use std::sync::Arc;
+/// use tokio::sync::Semaphore;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let semaphore = Arc::new(Semaphore::new(3));
+/// let mut join_handles = Vec::new();
+///
+/// for _ in 0..5 {
+/// let permit = semaphore.clone().acquire_owned().await.unwrap();
+/// join_handles.push(tokio::spawn(async move {
+/// // perform task...
+/// // explicitly own `permit` in the task
+/// drop(permit);
+/// }));
+/// }
+///
+/// for handle in join_handles {
+/// handle.await.unwrap();
+/// }
+/// }
+/// ```
+///
/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
+/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
#[derive(Debug)]
pub struct Semaphore {
/// The low level semaphore
@@ -79,6 +127,15 @@ impl Semaphore {
}
/// Creates a new semaphore with the initial number of permits.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ ///
+ /// static SEM: Semaphore = Semaphore::const_new(10);
+ /// ```
+ ///
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new(permits: usize) -> Self {
@@ -105,6 +162,26 @@ impl Semaphore {
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permit.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Semaphore::new(2);
+ ///
+ /// let permit_1 = semaphore.acquire().await.unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.acquire().await.unwrap();
+ /// assert_eq!(semaphore.available_permits(), 0);
+ ///
+ /// drop(permit_1);
+ /// assert_eq!(semaphore.available_permits(), 1);
+ /// }
+ /// ```
+ ///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
@@ -121,6 +198,20 @@ impl Semaphore {
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permits.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Semaphore::new(5);
+ ///
+ /// let permit = semaphore.acquire_many(3).await.unwrap();
+ /// assert_eq!(semaphore.available_permits(), 2);
+ /// }
+ /// ```
+ ///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
@@ -137,6 +228,25 @@ impl Semaphore {
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
/// this returns a [`SemaphorePermit`] representing the acquired permits.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Semaphore::new(2);
+ ///
+ /// let permit_1 = semaphore.try_acquire().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.try_acquire().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 0);
+ ///
+ /// let permit_3 = semaphore.try_acquire();
+ /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
@@ -153,8 +263,24 @@ impl Semaphore {
/// Tries to acquire `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
- /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
- /// this returns a [`SemaphorePermit`] representing the acquired permits.
+ /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
+ /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Semaphore::new(4);
+ ///
+ /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.try_acquire_many(2);
+ /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
@@ -176,6 +302,32 @@ impl Semaphore {
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(3));
+ /// let mut join_handles = Vec::new();
+ ///
+ /// for _ in 0..5 {
+ /// let permit = semaphore.clone().acquire_owned().await.unwrap();
+ /// join_handles.push(tokio::spawn(async move {
+ /// // perform task...
+ /// // explicitly own `permit` in the task
+ /// drop(permit);
+ /// }));
+ /// }
+ ///
+ /// for handle in join_handles {
+ /// handle.await.unwrap();
+ /// }
+ /// }
+ /// ```
+ ///
/// [`Arc`]: std::sync::Arc
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
@@ -194,6 +346,32 @@ impl Semaphore {
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::Semaphore;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(10));
+ /// let mut join_handles = Vec::new();
+ ///
+ /// for _ in 0..5 {
+ /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
+ /// join_handles.push(tokio::spawn(async move {
+ /// // perform task...
+ /// // explicitly own `permit` in the task
+ /// drop(permit);
+ /// }));
+ /// }
+ ///
+ /// for handle in join_handles {
+ /// handle.await.unwrap();
+ /// }
+ /// }
+ /// ```
+ ///
/// [`Arc`]: std::sync::Arc
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
@@ -216,6 +394,26 @@ impl Semaphore {
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(2));
+ ///
+ /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
+ /// assert_eq!(semaphore.available_permits(), 0);
+ ///
+ /// let permit_3 = semaphore.try_acquire_owned();
+ /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
@@ -238,6 +436,23 @@ impl Semaphore {
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use tokio::sync::{Semaphore, TryAcquireError};
+ ///
+ /// # fn main() {
+ /// let semaphore = Arc::new(Semaphore::new(4));
+ ///
+ /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
+ /// assert_eq!(semaphore.available_permits(), 1);
+ ///
+ /// let permit_2 = semaphore.try_acquire_many_owned(2);
+ /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
+ /// # }
+ /// ```
+ ///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
diff --git a/src/sync/task/atomic_waker.rs b/src/sync/task/atomic_waker.rs
index 5917204..8616007 100644
--- a/src/sync/task/atomic_waker.rs
+++ b/src/sync/task/atomic_waker.rs
@@ -29,7 +29,7 @@ pub(crate) struct AtomicWaker {
// `AtomicWaker` is a multi-consumer, single-producer transfer cell. The cell
// stores a `Waker` value produced by calls to `register` and many threads can
-// race to take the waker by calling `wake.
+// race to take the waker by calling `wake`.
//
// If a new `Waker` instance is produced by calling `register` before an existing
// one is consumed, then the existing one is overwritten.
diff --git a/src/sync/watch.rs b/src/sync/watch.rs
index db65e5a..42d417a 100644
--- a/src/sync/watch.rs
+++ b/src/sync/watch.rs
@@ -417,6 +417,28 @@ impl<T> Sender<T> {
Receiver::from_shared(version, shared)
}
}
+
+ /// Returns the number of receivers that currently exist
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, rx1) = watch::channel("hello");
+ ///
+ /// assert_eq!(1, tx.receiver_count());
+ ///
+ /// let mut _rx2 = rx1.clone();
+ ///
+ /// assert_eq!(2, tx.receiver_count());
+ /// }
+ /// ```
+ pub fn receiver_count(&self) -> usize {
+ self.shared.ref_count_rx.load(Relaxed)
+ }
}
impl<T> Drop for Sender<T> {