diff options
Diffstat (limited to 'src/wrappers/watch.rs')
-rw-r--r-- | src/wrappers/watch.rs | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index 0ffd1b8..bd3a18a 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// /// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardles of whether it was the initial value or sent afterwards. +/// regardless of whether it was the initial value or sent afterwards. /// /// # Examples /// @@ -59,7 +59,7 @@ async fn make_future<T: Clone + Send + Sync>( (result, rx) } -impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> { +impl<T: 'static + Clone + Send + Sync> WatchStream<T> { /// Create a new `WatchStream`. pub fn new(rx: Receiver<T>) -> Self { Self { @@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let (result, rx) = ready!(self.inner.poll(cx)); + let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { - let received = (*rx.borrow()).clone(); + let received = (*rx.borrow_and_update()).clone(); self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } @@ -94,3 +94,9 @@ impl<T> fmt::Debug for WatchStream<T> { f.debug_struct("WatchStream").finish() } } + +impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} |