summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoel Galenson <jgalenson@google.com>2021-08-19 08:51:36 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-08-19 08:51:36 +0000
commit1e276cc72a1971cc9b0880dc5de808095d264a71 (patch)
treeb059d9c53f27d43cffdf23ece636b1e62411292a
parent25c304ab80d631db4e6c163edbf1eb3ae0fe1bf8 (diff)
parentcf97149b2a2faf32b817a8cd76f6a7b52396d115 (diff)
downloadtokio-stream-1e276cc72a1971cc9b0880dc5de808095d264a71.tar.gz
Upgrade rust/crates/tokio-stream to 0.1.7 am: bd64f25cb3 am: 22cb8a79d5 am: cf97149b2a
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1790954 Change-Id: Iae825e0674465fb1ba35df3082a03a98edff1430
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp22
-rw-r--r--CHANGELOG.md10
-rw-r--r--Cargo.toml6
-rw-r--r--Cargo.toml.orig6
-rw-r--r--METADATA8
-rw-r--r--src/lib.rs2
-rw-r--r--src/stream_ext.rs2
-rw-r--r--src/stream_ext/collect.rs4
-rw-r--r--src/stream_ext/timeout.rs13
-rw-r--r--src/stream_map.rs6
-rw-r--r--src/wrappers/watch.rs6
-rw-r--r--tests/async_send_sync.rs2
-rw-r--r--tests/stream_chain.rs4
-rw-r--r--tests/stream_merge.rs4
-rw-r--r--tests/watch.rs27
16 files changed, 87 insertions, 37 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 44a2cd9..749a519 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "aaa150d211a4f5c621369746da0804a3b786b861"
+ "sha1": "5d61c997e9281e00f35c7ea426786996945f47f5"
}
}
diff --git a/Android.bp b/Android.bp
index 5c46ebc..b9bf77c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,24 +41,24 @@ rust_library {
// autocfg-1.0.1
// bytes-1.0.1 "default,std"
// cfg-if-1.0.0
-// futures-core-0.3.15 "alloc,default,std"
-// instant-0.1.9
-// libc-0.2.94 "default,std"
+// futures-core-0.3.16 "alloc,default,std"
+// instant-0.1.10
+// libc-0.2.98 "default,std"
// lock_api-0.4.4
// log-0.4.14
// memchr-2.4.0 "default,std"
-// mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
+// mio-0.7.13 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
// num_cpus-1.13.0
-// once_cell-1.7.2 "alloc,default,race,std"
+// once_cell-1.8.0 "alloc,default,race,std"
// parking_lot-0.11.1 "default"
// parking_lot_core-0.8.3
-// pin-project-lite-0.2.6
-// proc-macro2-1.0.26 "default,proc-macro"
+// pin-project-lite-0.2.7
+// proc-macro2-1.0.28 "default,proc-macro"
// quote-1.0.9 "default,proc-macro"
// scopeguard-1.1.0
-// signal-hook-registry-1.3.0
+// signal-hook-registry-1.4.0
// smallvec-1.6.1
-// syn-1.0.72 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-// tokio-1.6.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
-// tokio-macros-1.2.0
+// syn-1.0.74 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
+// tokio-1.9.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
+// tokio-macros-1.3.0
// unicode-xid-0.2.2 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0c59804..a0cdef0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,13 @@
+# 0.1.7 (July 7, 2021)
+
+### Fixed
+
+- sync: fix watch wrapper ([#3914])
+- time: fix `Timeout::size_hint` ([#3902])
+
+[#3902]: https://github.com/tokio-rs/tokio/pull/3902
+[#3914]: https://github.com/tokio-rs/tokio/pull/3914
+
# 0.1.6 (May 14, 2021)
### Added
diff --git a/Cargo.toml b/Cargo.toml
index 10e6e4a..5a8627a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio-stream"
-version = "0.1.6"
+version = "0.1.7"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "Utilities to work with `Stream` and `tokio`.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
@@ -31,7 +31,7 @@ version = "0.3.0"
version = "0.2.0"
[dependencies.tokio]
-version = "1.2.0"
+version = "1.8.0"
features = ["sync"]
[dependencies.tokio-util]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 5c95917..911657c 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -6,13 +6,13 @@ name = "tokio-stream"
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.6"
+version = "0.1.7"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
@@ -30,7 +30,7 @@ signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
-tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
+tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
[dev-dependencies]
diff --git a/METADATA b/METADATA
index 17893f4..8693d16 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.6.crate"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.7.crate"
}
- version: "0.1.6"
+ version: "0.1.7"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 5
- day: 19
+ month: 8
+ day: 9
}
}
diff --git a/src/lib.rs b/src/lib.rs
index af99488..b7f232f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,7 @@
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
-#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 51532ee..1157c9e 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -515,7 +515,7 @@ pub trait StreamExt: Stream {
/// Skip elements from the underlying stream while the provided predicate
/// resolves to `true`.
///
- /// This function, like [`Iterator::skip_while`], will ignore elemets from the
+ /// This function, like [`Iterator::skip_while`], will ignore elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false, the rest of the elements will be yielded.
///
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 23f48b0..a33a6d6 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -113,7 +113,7 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
}
fn finalize(_: sealed::Internal, collection: &mut String) -> String {
- mem::replace(collection, String::new())
+ mem::take(collection)
}
}
@@ -132,7 +132,7 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> {
}
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
- mem::replace(collection, vec![])
+ mem::take(collection)
}
}
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index de17dc0..98d7cd5 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -69,7 +69,18 @@ impl<S: Stream> Stream for Timeout<S> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
+ let (lower, upper) = self.stream.size_hint();
+
+ // The timeout stream may insert an error before and after each message
+ // from the underlying stream, but no more than one error between each
+ // message. Hence the upper bound is computed as 2x+1.
+
+ // Using a helper function to enable use of question mark operator.
+ fn twice_plus_one(value: Option<usize>) -> Option<usize> {
+ value?.checked_mul(2)?.checked_add(1)
+ }
+
+ (lower, twice_plus_one(upper))
}
}
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 7fc136f..9dc529a 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -364,11 +364,11 @@ impl<K, V> StreamMap<K, V> {
/// # Examples
///
/// ```
- /// use std::collections::HashMap;
+ /// use tokio_stream::{StreamMap, pending};
///
- /// let mut a = HashMap::new();
+ /// let mut a = StreamMap::new();
/// assert!(a.is_empty());
- /// a.insert(1, "a");
+ /// a.insert(1, pending::<i32>());
/// assert!(!a.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index 0ffd1b8..1daca10 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardles of whether it was the initial value or sent afterwards.
+/// regardless of whether it was the initial value or sent afterwards.
///
/// # Examples
///
@@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (result, rx) = ready!(self.inner.poll(cx));
+ let (result, mut rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
- let received = (*rx.borrow()).clone();
+ let received = (*rx.borrow_and_update()).clone();
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index c06bebd..f1c8b4e 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -1,3 +1,5 @@
+#![allow(clippy::diverging_sub_expression)]
+
use std::rc::Rc;
#[allow(dead_code)]
diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs
index 759de30..f3b7edb 100644
--- a/tests/stream_chain.rs
+++ b/tests/stream_chain.rs
@@ -89,12 +89,12 @@ fn size_overflow() {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- (usize::max_value(), Some(usize::max_value()))
+ (usize::MAX, Some(usize::MAX))
}
}
let m1 = Monster;
let m2 = Monster;
let m = m1.chain(m2);
- assert_eq!(m.size_hint(), (usize::max_value(), None));
+ assert_eq!(m.size_hint(), (usize::MAX, None));
}
diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs
index 69cd568..f603bcc 100644
--- a/tests/stream_merge.rs
+++ b/tests/stream_merge.rs
@@ -72,12 +72,12 @@ fn size_overflow() {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- (usize::max_value(), Some(usize::max_value()))
+ (usize::MAX, Some(usize::MAX))
}
}
let m1 = Monster;
let m2 = Monster;
let m = m1.merge(m2);
- assert_eq!(m.size_hint(), (usize::max_value(), None));
+ assert_eq!(m.size_hint(), (usize::MAX, None));
}
diff --git a/tests/watch.rs b/tests/watch.rs
new file mode 100644
index 0000000..92bcdf4
--- /dev/null
+++ b/tests/watch.rs
@@ -0,0 +1,27 @@
+use tokio::sync::watch;
+use tokio_stream::wrappers::WatchStream;
+use tokio_stream::StreamExt;
+
+#[tokio::test]
+async fn message_not_twice() {
+ let (tx, rx) = watch::channel("hello");
+
+ let mut counter = 0;
+ let mut stream = WatchStream::new(rx).map(move |payload| {
+ println!("{}", payload);
+ if payload == "goodbye" {
+ counter += 1;
+ }
+ if counter >= 2 {
+ panic!("too many goodbyes");
+ }
+ });
+
+ let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
+
+ // Send goodbye just once
+ tx.send("goodbye").unwrap();
+
+ drop(tx);
+ task.await.unwrap();
+}