summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/stream_ext.rs2
-rw-r--r--src/stream_ext/collect.rs6
-rw-r--r--src/stream_ext/then.rs2
-rw-r--r--src/stream_ext/throttle.rs4
-rw-r--r--src/stream_ext/timeout.rs2
-rw-r--r--src/wrappers/broadcast.rs2
-rw-r--r--src/wrappers/watch.rs34
7 files changed, 39 insertions, 13 deletions
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 6cea7b5..52d3202 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -982,6 +982,8 @@ pub trait StreamExt: Stream {
/// Slows down a stream by enforcing a delay between items.
///
+ /// The underlying timer behind this utility has a granularity of one millisecond.
+ ///
/// # Example
///
/// Create a throttled stream.
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 4b157a9..8548b74 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -195,11 +195,7 @@ where
} else {
let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
- if let Err(err) = res {
- Err(err)
- } else {
- unreachable!();
- }
+ Err(res.map(drop).unwrap_err())
}
}
}
diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs
index 7f6b5a2..cc7caa7 100644
--- a/src/stream_ext/then.rs
+++ b/src/stream_ext/then.rs
@@ -72,7 +72,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs
index f36c66a..5000139 100644
--- a/src/stream_ext/throttle.rs
+++ b/src/stream_ext/throttle.rs
@@ -4,7 +4,6 @@ use crate::Stream;
use tokio::time::{Duration, Instant, Sleep};
use std::future::Future;
-use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
@@ -41,8 +40,7 @@ pin_project! {
}
}
-// XXX: are these safe if `T: !Unpin`?
-impl<T: Unpin> Throttle<T> {
+impl<T> Throttle<T> {
/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index 98d7cd5..a440d20 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -24,7 +24,7 @@ pin_project! {
}
/// Error returned by `Timeout`.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());
impl<S: Stream> Timeout<S> {
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index 10184bf..7110664 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -18,7 +18,7 @@ pub struct BroadcastStream<T> {
}
/// An error returned from the inner stream of a [`BroadcastStream`].
-#[derive(Debug, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStreamRecvError {
/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index c682c9c..ec8ead0 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
-/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardless of whether it was the initial value or sent afterwards.
+/// This stream will start by yielding the current value when the WatchStream is polled,
+/// regardless of whether it was the initial value or sent afterwards,
+/// unless you use [`WatchStream<T>::from_changes`].
///
/// # Examples
///
@@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError;
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::new(rx);
///
+/// // existing rx output with "hello" is ignored here
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// Example with [`WatchStream<T>::from_changes`]:
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::future::FutureExt;
+/// use tokio::sync::watch;
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::from_changes(rx);
+///
+/// // no output from rx is available at this point - let's check this:
+/// assert!(rx.next().now_or_never().is_none());
+///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
@@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
+
+ /// Create a new `WatchStream` that waits for the value to be changed.
+ pub fn from_changes(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
}
impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {