summaryrefslogtreecommitdiff
path: root/src/wrappers/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wrappers/watch.rs')
-rw-r--r--src/wrappers/watch.rs37
1 files changed, 36 insertions, 1 deletions
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index a98a72c..0ffd1b8 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -10,6 +10,41 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
+/// This stream will always start by yielding the current value when the WatchStream is polled,
+/// regardles of whether it was the initial value or sent afterwards.
+///
+/// # Examples
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// assert_eq!(rx.next().await, Some("hello"));
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
@@ -28,7 +63,7 @@ impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
- inner: ReusableBoxFuture::new(make_future(rx)),
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
}