aboutsummaryrefslogtreecommitdiff
path: root/src/sync/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/watch.rs')
-rw-r--r--src/sync/watch.rs95
1 files changed, 77 insertions, 18 deletions
diff --git a/src/sync/watch.rs b/src/sync/watch.rs
index 96d1d16..b5da218 100644
--- a/src/sync/watch.rs
+++ b/src/sync/watch.rs
@@ -123,9 +123,7 @@ pub mod error {
/// Error produced when sending a value fails.
#[derive(Debug)]
- pub struct SendError<T> {
- pub(crate) inner: T,
- }
+ pub struct SendError<T>(pub T);
// ===== impl SendError =====
@@ -165,6 +163,9 @@ mod state {
/// Snapshot of the state. The first bit is used as the CLOSED bit.
/// The remaining bits are used as the version.
+ ///
+ /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
+ /// receivers does not set it.
#[derive(Copy, Clone, Debug)]
pub(super) struct StateSnapshot(usize);
@@ -427,8 +428,8 @@ impl<T> Sender<T> {
/// every receiver has been dropped.
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
// This is pretty much only useful as a hint anyway, so synchronization isn't critical.
- if 0 == self.shared.ref_count_rx.load(Relaxed) {
- return Err(error::SendError { inner: value });
+ if 0 == self.receiver_count() {
+ return Err(error::SendError(value));
}
{
@@ -484,7 +485,7 @@ impl<T> Sender<T> {
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
- self.shared.ref_count_rx.load(Relaxed) == 0
+ self.receiver_count() == 0
}
/// Completes when all receivers have dropped.
@@ -517,23 +518,81 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
- let notified = self.shared.notify_tx.notified();
+ while self.receiver_count() > 0 {
+ let notified = self.shared.notify_tx.notified();
- if self.shared.ref_count_rx.load(Relaxed) == 0 {
- return;
- }
+ if self.receiver_count() == 0 {
+ return;
+ }
- notified.await;
- debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed));
+ notified.await;
+ // The channel could have been reopened in the meantime by calling
+ // `subscribe`, so we loop again.
+ }
}
- cfg_signal_internal! {
- pub(crate) fn subscribe(&self) -> Receiver<T> {
- let shared = self.shared.clone();
- let version = shared.state.load().version();
+ /// Creates a new [`Receiver`] connected to this `Sender`.
+ ///
+ /// All messages sent before this call to `subscribe` are initially marked
+ /// as seen by the new `Receiver`.
+ ///
+ /// This method can be called even if there are no other receivers. In this
+ /// case, the channel is reopened.
+ ///
+ /// # Examples
+ ///
+ /// The new channel will receive messages sent on this `Sender`.
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, _rx) = watch::channel(0u64);
+ ///
+ /// tx.send(5).unwrap();
+ ///
+ /// let rx = tx.subscribe();
+ /// assert_eq!(5, *rx.borrow());
+ ///
+ /// tx.send(10).unwrap();
+ /// assert_eq!(10, *rx.borrow());
+ /// }
+ /// ```
+ ///
+ /// The most recent message is considered seen by the channel, so this test
+ /// is guaranteed to pass.
+ ///
+ /// ```
+ /// use tokio::sync::watch;
+ /// use tokio::time::Duration;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (tx, _rx) = watch::channel(0u64);
+ /// tx.send(5).unwrap();
+ /// let mut rx = tx.subscribe();
+ ///
+ /// tokio::spawn(async move {
+ /// // by spawning and sleeping, the message is sent after `main`
+ /// // hits the call to `changed`.
+ /// # if false {
+ /// tokio::time::sleep(Duration::from_millis(10)).await;
+ /// # }
+ /// tx.send(100).unwrap();
+ /// });
+ ///
+ /// rx.changed().await.unwrap();
+ /// assert_eq!(100, *rx.borrow());
+ /// }
+ /// ```
+ pub fn subscribe(&self) -> Receiver<T> {
+ let shared = self.shared.clone();
+ let version = shared.state.load().version();
- Receiver::from_shared(version, shared)
- }
+ // The CLOSED bit in the state tracks only whether the sender is
+ // dropped, so we do not need to unset it if this reopens the channel.
+ Receiver::from_shared(version, shared)
}
/// Returns the number of receivers that currently exist