diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2023-03-31 10:03:40 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2023-03-31 10:03:40 +0000 |
commit | 18447d24b892fc11813b28c959da7caf88585d42 (patch) | |
tree | a1abb0fc9b7e3d95c763e59b97e3a01b6101b5cf | |
parent | 1b82718a76e1804faf0a8776b4904edb6c582ec9 (diff) | |
parent | 0e4b52ea4fe2539d2d29d85ea5fb92eda51d9428 (diff) | |
download | tokio-stream-18447d24b892fc11813b28c959da7caf88585d42.tar.gz |
Upgrade tokio-stream to 0.1.12 am: a78006577e am: a9cba4aff2 am: 0e4b52ea4fandroid14-dev
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2513497
Change-Id: I01729956a2b77894a50671d4c497f39f4ba8355b
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 2 | ||||
-rw-r--r-- | CHANGELOG.md | 8 | ||||
-rw-r--r-- | Cargo.toml | 5 | ||||
-rw-r--r-- | Cargo.toml.orig | 5 | ||||
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | src/stream_ext.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 6 | ||||
-rw-r--r-- | src/stream_ext/then.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/throttle.rs | 4 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 2 | ||||
-rw-r--r-- | src/wrappers/broadcast.rs | 2 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 34 | ||||
-rw-r--r-- | tests/stream_stream_map.rs | 57 | ||||
-rw-r--r-- | tests/watch.rs | 30 |
16 files changed, 86 insertions, 87 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 94e3fd4..d866201 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d" + "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0" }, "path_in_vcs": "tokio-stream" }
\ No newline at end of file @@ -23,7 +23,7 @@ rust_library { host_supported: true, crate_name: "tokio_stream", cargo_env_compat: true, - cargo_pkg_version: "0.1.11", + cargo_pkg_version: "0.1.12", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index 05c2b18..c475c7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.1.12 (January 20, 2022) + +- time: remove `Unpin` bound on `Throttle` methods ([#5105]) +- time: document that `throttle` operates on ms granularity ([#5101]) + +[#5105]: https://github.com/tokio-rs/tokio/pull/5105 +[#5101]: https://github.com/tokio-rs/tokio/pull/5101 + # 0.1.11 (October 11, 2022) - time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036]) @@ -13,7 +13,7 @@ edition = "2018" rust-version = "1.49" name = "tokio-stream" -version = "0.1.11" +version = "0.1.12" authors = ["Tokio Contributors <team@tokio.rs>"] description = """ Utilities to work with `Stream` and `tokio`. @@ -76,6 +76,3 @@ sync = [ "tokio-util", ] time = ["tokio/time"] - -[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest] -version = "1" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 6dfa978..f87b59a 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,7 +4,7 @@ name = "tokio-stream" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.11" +version = "0.1.12" edition = "2018" rust-version = "1.49" authors = ["Tokio Contributors <team@tokio.rs>"] @@ -38,9 +38,6 @@ parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -proptest = "1" - [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] @@ -1,4 +1,4 @@ -Copyright (c) 2022 Tokio Contributors +Copyright (c) 2023 Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated @@ -11,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate" } - version: "0.1.11" + version: "0.1.12" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 12 + year: 2023 + month: 3 + day: 30 } } diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 6cea7b5..52d3202 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -982,6 +982,8 @@ pub trait StreamExt: Stream { /// Slows down a stream by enforcing a delay between items. /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// /// # Example /// /// Create a throttled stream. diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index 4b157a9..8548b74 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -195,11 +195,7 @@ where } else { let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); - if let Err(err) = res { - Err(err) - } else { - unreachable!(); - } + Err(res.map(drop).unwrap_err()) } } } diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs index 7f6b5a2..cc7caa7 100644 --- a/src/stream_ext/then.rs +++ b/src/stream_ext/then.rs @@ -72,7 +72,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs index f36c66a..5000139 100644 --- a/src/stream_ext/throttle.rs +++ b/src/stream_ext/throttle.rs @@ -4,7 +4,6 @@ use crate::Stream; use tokio::time::{Duration, Instant, Sleep}; use std::future::Future; -use std::marker::Unpin; use std::pin::Pin; use std::task::{self, Poll}; @@ -41,8 +40,7 @@ pin_project! { } } -// XXX: are these safe if `T: !Unpin`? -impl<T: Unpin> Throttle<T> { +impl<T> Throttle<T> { /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &T { diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index 98d7cd5..a440d20 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -24,7 +24,7 @@ pin_project! { } /// Error returned by `Timeout`. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); impl<S: Stream> Timeout<S> { diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs index 10184bf..7110664 100644 --- a/src/wrappers/broadcast.rs +++ b/src/wrappers/broadcast.rs @@ -18,7 +18,7 @@ pub struct BroadcastStream<T> { } /// An error returned from the inner stream of a [`BroadcastStream`]. -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStreamRecvError { /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index c682c9c..ec8ead0 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream<T>::from_changes`]. /// /// # Examples /// @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream<T>::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } @@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs index ffc489b..b6b87e9 100644 --- a/tests/stream_stream_map.rs +++ b/tests/stream_stream_map.rs @@ -325,63 +325,6 @@ fn one_ready_many_none() { } } -#[cfg(not(target_os = "wasi"))] -proptest::proptest! { - #[test] - fn fuzz_pending_complete_mix(kinds: Vec<bool>) { - use std::task::{Context, Poll}; - - struct DidPoll<T> { - did_poll: bool, - inner: T, - } - - impl<T: Stream + Unpin> Stream for DidPoll<T> { - type Item = T::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll<Option<T::Item>> - { - self.did_poll = true; - Pin::new(&mut self.inner).poll_next(cx) - } - } - - for _ in 0..10 { - let mut map = task::spawn(StreamMap::new()); - let mut expect = 0; - - for (i, &is_empty) in kinds.iter().enumerate() { - let inner = if is_empty { - pin_box(stream::empty::<()>()) - } else { - expect += 1; - pin_box(stream::pending::<()>()) - }; - - let stream = DidPoll { - did_poll: false, - inner, - }; - - map.insert(i, stream); - } - - if expect == 0 { - assert_ready_none!(map.poll_next()); - } else { - assert_pending!(map.poll_next()); - - assert_eq!(expect, map.values().count()); - - for stream in map.values() { - assert!(stream.did_poll); - } - } - } - } -} - fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> { Box::pin(s) } diff --git a/tests/watch.rs b/tests/watch.rs index a56254e..3a39aaf 100644 --- a/tests/watch.rs +++ b/tests/watch.rs @@ -3,9 +3,11 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; +use tokio_test::assert_pending; +use tokio_test::task::spawn; #[tokio::test] -async fn message_not_twice() { +async fn watch_stream_message_not_twice() { let (tx, rx) = watch::channel("hello"); let mut counter = 0; @@ -27,3 +29,29 @@ async fn message_not_twice() { drop(tx); task.await.unwrap(); } + +#[tokio::test] +async fn watch_stream_from_rx() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from(rx); + + assert_eq!(stream.next().await.unwrap(), "hello"); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} + +#[tokio::test] +async fn watch_stream_from_changes() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from_changes(rx); + + assert_pending!(spawn(&mut stream).poll_next()); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} |