diff options
author | Joel Galenson <jgalenson@google.com> | 2021-08-09 10:50:19 -0700 |
---|---|---|
committer | Joel Galenson <jgalenson@google.com> | 2021-08-09 10:50:19 -0700 |
commit | bd64f25cb3cb0c116dd655ee5512a1c00f05ca37 (patch) | |
tree | b059d9c53f27d43cffdf23ece636b1e62411292a | |
parent | 56b11130b7e9da8f46cfb9ff1c63172da9d63b7b (diff) | |
download | tokio-stream-bd64f25cb3cb0c116dd655ee5512a1c00f05ca37.tar.gz |
Upgrade rust/crates/tokio-stream to 0.1.7
Test: make
Change-Id: I75d4af0b6011d83c434385fe323ec8694cbe6f28
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 22 | ||||
-rw-r--r-- | CHANGELOG.md | 10 | ||||
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | Cargo.toml.orig | 6 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/stream_ext.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 4 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 13 | ||||
-rw-r--r-- | src/stream_map.rs | 6 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 6 | ||||
-rw-r--r-- | tests/async_send_sync.rs | 2 | ||||
-rw-r--r-- | tests/stream_chain.rs | 4 | ||||
-rw-r--r-- | tests/stream_merge.rs | 4 | ||||
-rw-r--r-- | tests/watch.rs | 27 |
16 files changed, 87 insertions, 37 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 44a2cd9..749a519 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "aaa150d211a4f5c621369746da0804a3b786b861" + "sha1": "5d61c997e9281e00f35c7ea426786996945f47f5" } } @@ -41,24 +41,24 @@ rust_library { // autocfg-1.0.1 // bytes-1.0.1 "default,std" // cfg-if-1.0.0 -// futures-core-0.3.15 "alloc,default,std" -// instant-0.1.9 -// libc-0.2.94 "default,std" +// futures-core-0.3.16 "alloc,default,std" +// instant-0.1.10 +// libc-0.2.98 "default,std" // lock_api-0.4.4 // log-0.4.14 // memchr-2.4.0 "default,std" -// mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds" +// mio-0.7.13 "default,net,os-ext,os-poll,os-util,tcp,udp,uds" // num_cpus-1.13.0 -// once_cell-1.7.2 "alloc,default,race,std" +// once_cell-1.8.0 "alloc,default,race,std" // parking_lot-0.11.1 "default" // parking_lot_core-0.8.3 -// pin-project-lite-0.2.6 -// proc-macro2-1.0.26 "default,proc-macro" +// pin-project-lite-0.2.7 +// proc-macro2-1.0.28 "default,proc-macro" // quote-1.0.9 "default,proc-macro" // scopeguard-1.1.0 -// signal-hook-registry-1.3.0 +// signal-hook-registry-1.4.0 // smallvec-1.6.1 -// syn-1.0.72 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut" -// tokio-1.6.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" -// tokio-macros-1.2.0 +// syn-1.0.74 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut" +// tokio-1.9.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" +// tokio-macros-1.3.0 // unicode-xid-0.2.2 "default" diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c59804..a0cdef0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +# 0.1.7 (July 7, 2021) + +### Fixed + +- sync: fix watch wrapper ([#3914]) +- time: fix `Timeout::size_hint` ([#3902]) + +[#3902]: https://github.com/tokio-rs/tokio/pull/3902 +[#3914]: https://github.com/tokio-rs/tokio/pull/3914 + # 0.1.6 (May 14, 2021) ### Added @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio-stream" -version = "0.1.6" +version = "0.1.7" authors = ["Tokio Contributors <team@tokio.rs>"] description = "Utilities to work with `Stream` and `tokio`.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" @@ -31,7 +31,7 @@ version = "0.3.0" version = "0.2.0" [dependencies.tokio] -version = "1.2.0" +version = "1.8.0" features = ["sync"] [dependencies.tokio-util] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 5c95917..911657c 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -6,13 +6,13 @@ name = "tokio-stream" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.6" +version = "0.1.7" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ @@ -30,7 +30,7 @@ signal = ["tokio/signal"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" -tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] } +tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] } tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true } [dev-dependencies] @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.6.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.7.crate" } - version: "0.1.6" + version: "0.1.7" license_type: NOTICE last_upgrade_date { year: 2021 - month: 5 - day: 19 + month: 8 + day: 9 } } @@ -10,7 +10,7 @@ unreachable_pub )] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, deny(broken_intra_doc_links))] +#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 51532ee..1157c9e 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -515,7 +515,7 @@ pub trait StreamExt: Stream { /// Skip elements from the underlying stream while the provided predicate /// resolves to `true`. /// - /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// This function, like [`Iterator::skip_while`], will ignore elements from the /// stream until the predicate `f` resolves to `false`. Once one element /// returns false, the rest of the elements will be yielded. /// diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index 23f48b0..a33a6d6 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -113,7 +113,7 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { } fn finalize(_: sealed::Internal, collection: &mut String) -> String { - mem::replace(collection, String::new()) + mem::take(collection) } } @@ -132,7 +132,7 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> { } fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { - mem::replace(collection, vec![]) + mem::take(collection) } } diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index de17dc0..98d7cd5 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -69,7 +69,18 @@ impl<S: Stream> Stream for Timeout<S> { } fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() + let (lower, upper) = self.stream.size_hint(); + + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. + + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option<usize>) -> Option<usize> { + value?.checked_mul(2)?.checked_add(1) + } + + (lower, twice_plus_one(upper)) } } diff --git a/src/stream_map.rs b/src/stream_map.rs index 7fc136f..9dc529a 100644 --- a/src/stream_map.rs +++ b/src/stream_map.rs @@ -364,11 +364,11 @@ impl<K, V> StreamMap<K, V> { /// # Examples /// /// ``` - /// use std::collections::HashMap; + /// use tokio_stream::{StreamMap, pending}; /// - /// let mut a = HashMap::new(); + /// let mut a = StreamMap::new(); /// assert!(a.is_empty()); - /// a.insert(1, "a"); + /// a.insert(1, pending::<i32>()); /// assert!(!a.is_empty()); /// ``` pub fn is_empty(&self) -> bool { diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index 0ffd1b8..1daca10 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// /// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardles of whether it was the initial value or sent afterwards. +/// regardless of whether it was the initial value or sent afterwards. /// /// # Examples /// @@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let (result, rx) = ready!(self.inner.poll(cx)); + let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { - let received = (*rx.borrow()).clone(); + let received = (*rx.borrow_and_update()).clone(); self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs index c06bebd..f1c8b4e 100644 --- a/tests/async_send_sync.rs +++ b/tests/async_send_sync.rs @@ -1,3 +1,5 @@ +#![allow(clippy::diverging_sub_expression)] + use std::rc::Rc; #[allow(dead_code)] diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs index 759de30..f3b7edb 100644 --- a/tests/stream_chain.rs +++ b/tests/stream_chain.rs @@ -89,12 +89,12 @@ fn size_overflow() { } fn size_hint(&self) -> (usize, Option<usize>) { - (usize::max_value(), Some(usize::max_value())) + (usize::MAX, Some(usize::MAX)) } } let m1 = Monster; let m2 = Monster; let m = m1.chain(m2); - assert_eq!(m.size_hint(), (usize::max_value(), None)); + assert_eq!(m.size_hint(), (usize::MAX, None)); } diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs index 69cd568..f603bcc 100644 --- a/tests/stream_merge.rs +++ b/tests/stream_merge.rs @@ -72,12 +72,12 @@ fn size_overflow() { } fn size_hint(&self) -> (usize, Option<usize>) { - (usize::max_value(), Some(usize::max_value())) + (usize::MAX, Some(usize::MAX)) } } let m1 = Monster; let m2 = Monster; let m = m1.merge(m2); - assert_eq!(m.size_hint(), (usize::max_value(), None)); + assert_eq!(m.size_hint(), (usize::MAX, None)); } diff --git a/tests/watch.rs b/tests/watch.rs new file mode 100644 index 0000000..92bcdf4 --- /dev/null +++ b/tests/watch.rs @@ -0,0 +1,27 @@ +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::StreamExt; + +#[tokio::test] +async fn message_not_twice() { + let (tx, rx) = watch::channel("hello"); + + let mut counter = 0; + let mut stream = WatchStream::new(rx).map(move |payload| { + println!("{}", payload); + if payload == "goodbye" { + counter += 1; + } + if counter >= 2 { + panic!("too many goodbyes"); + } + }); + + let task = tokio::spawn(async move { while stream.next().await.is_some() {} }); + + // Send goodbye just once + tx.send("goodbye").unwrap(); + + drop(tx); + task.await.unwrap(); +} |