summaryrefslogtreecommitdiff
path: root/src/wrappers/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wrappers/watch.rs')
-rw-r--r--src/wrappers/watch.rs14
1 files changed, 10 insertions, 4 deletions
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index 0ffd1b8..bd3a18a 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardles of whether it was the initial value or sent afterwards.
+/// regardless of whether it was the initial value or sent afterwards.
///
/// # Examples
///
@@ -59,7 +59,7 @@ async fn make_future<T: Clone + Send + Sync>(
(result, rx)
}
-impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
+impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
@@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (result, rx) = ready!(self.inner.poll(cx));
+ let (result, mut rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
- let received = (*rx.borrow()).clone();
+ let received = (*rx.borrow_and_update()).clone();
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}
@@ -94,3 +94,9 @@ impl<T> fmt::Debug for WatchStream<T> {
f.debug_struct("WatchStream").finish()
}
}
+
+impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}