diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/stream_ext.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 6 | ||||
-rw-r--r-- | src/stream_ext/then.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/throttle.rs | 4 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 2 | ||||
-rw-r--r-- | src/wrappers/broadcast.rs | 2 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 34 |
7 files changed, 39 insertions, 13 deletions
diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 6cea7b5..52d3202 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -982,6 +982,8 @@ pub trait StreamExt: Stream { /// Slows down a stream by enforcing a delay between items. /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// /// # Example /// /// Create a throttled stream. diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index 4b157a9..8548b74 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -195,11 +195,7 @@ where } else { let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); - if let Err(err) = res { - Err(err) - } else { - unreachable!(); - } + Err(res.map(drop).unwrap_err()) } } } diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs index 7f6b5a2..cc7caa7 100644 --- a/src/stream_ext/then.rs +++ b/src/stream_ext/then.rs @@ -72,7 +72,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs index f36c66a..5000139 100644 --- a/src/stream_ext/throttle.rs +++ b/src/stream_ext/throttle.rs @@ -4,7 +4,6 @@ use crate::Stream; use tokio::time::{Duration, Instant, Sleep}; use std::future::Future; -use std::marker::Unpin; use std::pin::Pin; use std::task::{self, Poll}; @@ -41,8 +40,7 @@ pin_project! { } } -// XXX: are these safe if `T: !Unpin`? -impl<T: Unpin> Throttle<T> { +impl<T> Throttle<T> { /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &T { diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index 98d7cd5..a440d20 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -24,7 +24,7 @@ pin_project! { } /// Error returned by `Timeout`. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); impl<S: Stream> Timeout<S> { diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs index 10184bf..7110664 100644 --- a/src/wrappers/broadcast.rs +++ b/src/wrappers/broadcast.rs @@ -18,7 +18,7 @@ pub struct BroadcastStream<T> { } /// An error returned from the inner stream of a [`BroadcastStream`]. -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStreamRecvError { /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index c682c9c..ec8ead0 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. /// /// # Examples /// @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } @@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { |