summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs2
-rw-r--r--src/stream_ext.rs2
-rw-r--r--src/stream_ext/collect.rs4
-rw-r--r--src/stream_ext/timeout.rs13
-rw-r--r--src/stream_map.rs6
-rw-r--r--src/wrappers/watch.rs6
6 files changed, 22 insertions, 11 deletions
diff --git a/src/lib.rs b/src/lib.rs
index af99488..b7f232f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,7 @@
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
-#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 51532ee..1157c9e 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -515,7 +515,7 @@ pub trait StreamExt: Stream {
/// Skip elements from the underlying stream while the provided predicate
/// resolves to `true`.
///
- /// This function, like [`Iterator::skip_while`], will ignore elemets from the
+ /// This function, like [`Iterator::skip_while`], will ignore elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false, the rest of the elements will be yielded.
///
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 23f48b0..a33a6d6 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -113,7 +113,7 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
}
fn finalize(_: sealed::Internal, collection: &mut String) -> String {
- mem::replace(collection, String::new())
+ mem::take(collection)
}
}
@@ -132,7 +132,7 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> {
}
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
- mem::replace(collection, vec![])
+ mem::take(collection)
}
}
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index de17dc0..98d7cd5 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -69,7 +69,18 @@ impl<S: Stream> Stream for Timeout<S> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
+ let (lower, upper) = self.stream.size_hint();
+
+ // The timeout stream may insert an error before and after each message
+ // from the underlying stream, but no more than one error between each
+ // message. Hence the upper bound is computed as 2x+1.
+
+ // Using a helper function to enable use of question mark operator.
+ fn twice_plus_one(value: Option<usize>) -> Option<usize> {
+ value?.checked_mul(2)?.checked_add(1)
+ }
+
+ (lower, twice_plus_one(upper))
}
}
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 7fc136f..9dc529a 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -364,11 +364,11 @@ impl<K, V> StreamMap<K, V> {
/// # Examples
///
/// ```
- /// use std::collections::HashMap;
+ /// use tokio_stream::{StreamMap, pending};
///
- /// let mut a = HashMap::new();
+ /// let mut a = StreamMap::new();
/// assert!(a.is_empty());
- /// a.insert(1, "a");
+ /// a.insert(1, pending::<i32>());
/// assert!(!a.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index 0ffd1b8..1daca10 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardles of whether it was the initial value or sent afterwards.
+/// regardless of whether it was the initial value or sent afterwards.
///
/// # Examples
///
@@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (result, rx) = ready!(self.inner.poll(cx));
+ let (result, mut rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
- let received = (*rx.borrow()).clone();
+ let received = (*rx.borrow_and_update()).clone();
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}