diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/stream_ext.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 4 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 13 | ||||
-rw-r--r-- | src/stream_map.rs | 6 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 6 |
6 files changed, 22 insertions, 11 deletions
@@ -10,7 +10,7 @@ unreachable_pub )] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, deny(broken_intra_doc_links))] +#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 51532ee..1157c9e 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -515,7 +515,7 @@ pub trait StreamExt: Stream { /// Skip elements from the underlying stream while the provided predicate /// resolves to `true`. /// - /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// This function, like [`Iterator::skip_while`], will ignore elements from the /// stream until the predicate `f` resolves to `false`. Once one element /// returns false, the rest of the elements will be yielded. /// diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index 23f48b0..a33a6d6 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -113,7 +113,7 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { } fn finalize(_: sealed::Internal, collection: &mut String) -> String { - mem::replace(collection, String::new()) + mem::take(collection) } } @@ -132,7 +132,7 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> { } fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { - mem::replace(collection, vec![]) + mem::take(collection) } } diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index de17dc0..98d7cd5 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -69,7 +69,18 @@ impl<S: Stream> Stream for Timeout<S> { } fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() + let (lower, upper) = self.stream.size_hint(); + + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. + + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option<usize>) -> Option<usize> { + value?.checked_mul(2)?.checked_add(1) + } + + (lower, twice_plus_one(upper)) } } diff --git a/src/stream_map.rs b/src/stream_map.rs index 7fc136f..9dc529a 100644 --- a/src/stream_map.rs +++ b/src/stream_map.rs @@ -364,11 +364,11 @@ impl<K, V> StreamMap<K, V> { /// # Examples /// /// ``` - /// use std::collections::HashMap; + /// use tokio_stream::{StreamMap, pending}; /// - /// let mut a = HashMap::new(); + /// let mut a = StreamMap::new(); /// assert!(a.is_empty()); - /// a.insert(1, "a"); + /// a.insert(1, pending::<i32>()); /// assert!(!a.is_empty()); /// ``` pub fn is_empty(&self) -> bool { diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index 0ffd1b8..1daca10 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// /// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardles of whether it was the initial value or sent afterwards. +/// regardless of whether it was the initial value or sent afterwards. /// /// # Examples /// @@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let (result, rx) = ready!(self.inner.poll(cx)); + let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { - let received = (*rx.borrow()).clone(); + let received = (*rx.borrow_and_update()).clone(); self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } |