diff options
Diffstat (limited to 'src/sync/watch.rs')
-rw-r--r-- | src/sync/watch.rs | 95 |
1 files changed, 77 insertions, 18 deletions
diff --git a/src/sync/watch.rs b/src/sync/watch.rs index 96d1d16..b5da218 100644 --- a/src/sync/watch.rs +++ b/src/sync/watch.rs @@ -123,9 +123,7 @@ pub mod error { /// Error produced when sending a value fails. #[derive(Debug)] - pub struct SendError<T> { - pub(crate) inner: T, - } + pub struct SendError<T>(pub T); // ===== impl SendError ===== @@ -165,6 +163,9 @@ mod state { /// Snapshot of the state. The first bit is used as the CLOSED bit. /// The remaining bits are used as the version. + /// + /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all + /// receivers does not set it. #[derive(Copy, Clone, Debug)] pub(super) struct StateSnapshot(usize); @@ -427,8 +428,8 @@ impl<T> Sender<T> { /// every receiver has been dropped. pub fn send(&self, value: T) -> Result<(), error::SendError<T>> { // This is pretty much only useful as a hint anyway, so synchronization isn't critical. - if 0 == self.shared.ref_count_rx.load(Relaxed) { - return Err(error::SendError { inner: value }); + if 0 == self.receiver_count() { + return Err(error::SendError(value)); } { @@ -484,7 +485,7 @@ impl<T> Sender<T> { /// assert!(tx.is_closed()); /// ``` pub fn is_closed(&self) -> bool { - self.shared.ref_count_rx.load(Relaxed) == 0 + self.receiver_count() == 0 } /// Completes when all receivers have dropped. @@ -517,23 +518,81 @@ impl<T> Sender<T> { /// } /// ``` pub async fn closed(&self) { - let notified = self.shared.notify_tx.notified(); + while self.receiver_count() > 0 { + let notified = self.shared.notify_tx.notified(); - if self.shared.ref_count_rx.load(Relaxed) == 0 { - return; - } + if self.receiver_count() == 0 { + return; + } - notified.await; - debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); + notified.await; + // The channel could have been reopened in the meantime by calling + // `subscribe`, so we loop again. + } } - cfg_signal_internal! { - pub(crate) fn subscribe(&self) -> Receiver<T> { - let shared = self.shared.clone(); - let version = shared.state.load().version(); + /// Creates a new [`Receiver`] connected to this `Sender`. + /// + /// All messages sent before this call to `subscribe` are initially marked + /// as seen by the new `Receiver`. + /// + /// This method can be called even if there are no other receivers. In this + /// case, the channel is reopened. + /// + /// # Examples + /// + /// The new channel will receive messages sent on this `Sender`. + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, _rx) = watch::channel(0u64); + /// + /// tx.send(5).unwrap(); + /// + /// let rx = tx.subscribe(); + /// assert_eq!(5, *rx.borrow()); + /// + /// tx.send(10).unwrap(); + /// assert_eq!(10, *rx.borrow()); + /// } + /// ``` + /// + /// The most recent message is considered seen by the channel, so this test + /// is guaranteed to pass. + /// + /// ``` + /// use tokio::sync::watch; + /// use tokio::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, _rx) = watch::channel(0u64); + /// tx.send(5).unwrap(); + /// let mut rx = tx.subscribe(); + /// + /// tokio::spawn(async move { + /// // by spawning and sleeping, the message is sent after `main` + /// // hits the call to `changed`. + /// # if false { + /// tokio::time::sleep(Duration::from_millis(10)).await; + /// # } + /// tx.send(100).unwrap(); + /// }); + /// + /// rx.changed().await.unwrap(); + /// assert_eq!(100, *rx.borrow()); + /// } + /// ``` + pub fn subscribe(&self) -> Receiver<T> { + let shared = self.shared.clone(); + let version = shared.state.load().version(); - Receiver::from_shared(version, shared) - } + // The CLOSED bit in the state tracks only whether the sender is + // dropped, so we do not need to unset it if this reopens the channel. + Receiver::from_shared(version, shared) } /// Returns the number of receivers that currently exist |