From a78006577ebfa95eaa523a657f2763bcd7aeac3b Mon Sep 17 00:00:00 2001 From: Jeff Vander Stoep Date: Thu, 30 Mar 2023 10:29:21 +0200 Subject: Upgrade tokio-stream to 0.1.12 This project was upgraded with external_updater. Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md Test: TreeHugger Change-Id: I7a8393fe09a15ef4e6490ad01d772494a1a6e1b9 --- .cargo_vcs_info.json | 2 +- Android.bp | 2 +- CHANGELOG.md | 8 +++++++ Cargo.toml | 5 +--- Cargo.toml.orig | 5 +--- LICENSE | 2 +- METADATA | 10 ++++---- src/stream_ext.rs | 2 ++ src/stream_ext/collect.rs | 6 +---- src/stream_ext/then.rs | 2 +- src/stream_ext/throttle.rs | 4 +--- src/stream_ext/timeout.rs | 2 +- src/wrappers/broadcast.rs | 2 +- src/wrappers/watch.rs | 34 +++++++++++++++++++++++++-- tests/stream_stream_map.rs | 57 ---------------------------------------------- tests/watch.rs | 30 +++++++++++++++++++++++- 16 files changed, 86 insertions(+), 87 deletions(-) diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 94e3fd4..d866201 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d" + "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0" }, "path_in_vcs": "tokio-stream" } \ No newline at end of file diff --git a/Android.bp b/Android.bp index 08e9901..d80fdf7 100644 --- a/Android.bp +++ b/Android.bp @@ -23,7 +23,7 @@ rust_library { host_supported: true, crate_name: "tokio_stream", cargo_env_compat: true, - cargo_pkg_version: "0.1.11", + cargo_pkg_version: "0.1.12", srcs: ["src/lib.rs"], edition: "2018", features: [ diff --git a/CHANGELOG.md b/CHANGELOG.md index 05c2b18..c475c7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.1.12 (January 20, 2022) + +- time: remove `Unpin` bound on `Throttle` methods ([#5105]) +- time: document that `throttle` operates on ms granularity ([#5101]) + +[#5105]: https://github.com/tokio-rs/tokio/pull/5105 +[#5101]: https://github.com/tokio-rs/tokio/pull/5101 + # 0.1.11 (October 11, 2022) - time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036]) diff --git a/Cargo.toml b/Cargo.toml index df1dc53..5a3542e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" rust-version = "1.49" name = "tokio-stream" -version = "0.1.11" +version = "0.1.12" authors = ["Tokio Contributors "] description = """ Utilities to work with `Stream` and `tokio`. @@ -76,6 +76,3 @@ sync = [ "tokio-util", ] time = ["tokio/time"] - -[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest] -version = "1" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 6dfa978..f87b59a 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -4,7 +4,7 @@ name = "tokio-stream" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.11" +version = "0.1.12" edition = "2018" rust-version = "1.49" authors = ["Tokio Contributors "] @@ -38,9 +38,6 @@ parking_lot = "0.12.0" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -proptest = "1" - [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/LICENSE b/LICENSE index 8af5baf..8bdf6bd 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2022 Tokio Contributors +Copyright (c) 2023 Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated diff --git a/METADATA b/METADATA index 1bf6cf9..390ea84 100644 --- a/METADATA +++ b/METADATA @@ -11,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate" } - version: "0.1.11" + version: "0.1.12" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 12 + year: 2023 + month: 3 + day: 30 } } diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 6cea7b5..52d3202 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -982,6 +982,8 @@ pub trait StreamExt: Stream { /// Slows down a stream by enforcing a delay between items. /// + /// The underlying timer behind this utility has a granularity of one millisecond. + /// /// # Example /// /// Create a throttled stream. diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index 4b157a9..8548b74 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -195,11 +195,7 @@ where } else { let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0)))); - if let Err(err) = res { - Err(err) - } else { - unreachable!(); - } + Err(res.map(drop).unwrap_err()) } } } diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs index 7f6b5a2..cc7caa7 100644 --- a/src/stream_ext/then.rs +++ b/src/stream_ext/then.rs @@ -72,7 +72,7 @@ where } fn size_hint(&self) -> (usize, Option) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs index f36c66a..5000139 100644 --- a/src/stream_ext/throttle.rs +++ b/src/stream_ext/throttle.rs @@ -4,7 +4,6 @@ use crate::Stream; use tokio::time::{Duration, Instant, Sleep}; use std::future::Future; -use std::marker::Unpin; use std::pin::Pin; use std::task::{self, Poll}; @@ -41,8 +40,7 @@ pin_project! { } } -// XXX: are these safe if `T: !Unpin`? -impl Throttle { +impl Throttle { /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &T { diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index 98d7cd5..a440d20 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -24,7 +24,7 @@ pin_project! { } /// Error returned by `Timeout`. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); impl Timeout { diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs index 10184bf..7110664 100644 --- a/src/wrappers/broadcast.rs +++ b/src/wrappers/broadcast.rs @@ -18,7 +18,7 @@ pub struct BroadcastStream { } /// An error returned from the inner stream of a [`BroadcastStream`]. -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStreamRecvError { /// The receiver lagged too far behind. Attempting to receive again will /// return the oldest message still retained by the channel. diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index c682c9c..ec8ead0 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream::from_changes`]. /// /// # Examples /// @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } @@ -66,6 +89,13 @@ impl WatchStream { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl Stream for WatchStream { diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs index ffc489b..b6b87e9 100644 --- a/tests/stream_stream_map.rs +++ b/tests/stream_stream_map.rs @@ -325,63 +325,6 @@ fn one_ready_many_none() { } } -#[cfg(not(target_os = "wasi"))] -proptest::proptest! { - #[test] - fn fuzz_pending_complete_mix(kinds: Vec) { - use std::task::{Context, Poll}; - - struct DidPoll { - did_poll: bool, - inner: T, - } - - impl Stream for DidPoll { - type Item = T::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { - self.did_poll = true; - Pin::new(&mut self.inner).poll_next(cx) - } - } - - for _ in 0..10 { - let mut map = task::spawn(StreamMap::new()); - let mut expect = 0; - - for (i, &is_empty) in kinds.iter().enumerate() { - let inner = if is_empty { - pin_box(stream::empty::<()>()) - } else { - expect += 1; - pin_box(stream::pending::<()>()) - }; - - let stream = DidPoll { - did_poll: false, - inner, - }; - - map.insert(i, stream); - } - - if expect == 0 { - assert_ready_none!(map.poll_next()); - } else { - assert_pending!(map.poll_next()); - - assert_eq!(expect, map.values().count()); - - for stream in map.values() { - assert!(stream.did_poll); - } - } - } - } -} - fn pin_box + 'static, U>(s: T) -> Pin>> { Box::pin(s) } diff --git a/tests/watch.rs b/tests/watch.rs index a56254e..3a39aaf 100644 --- a/tests/watch.rs +++ b/tests/watch.rs @@ -3,9 +3,11 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; +use tokio_test::assert_pending; +use tokio_test::task::spawn; #[tokio::test] -async fn message_not_twice() { +async fn watch_stream_message_not_twice() { let (tx, rx) = watch::channel("hello"); let mut counter = 0; @@ -27,3 +29,29 @@ async fn message_not_twice() { drop(tx); task.await.unwrap(); } + +#[tokio::test] +async fn watch_stream_from_rx() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from(rx); + + assert_eq!(stream.next().await.unwrap(), "hello"); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} + +#[tokio::test] +async fn watch_stream_from_changes() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from_changes(rx); + + assert_pending!(spawn(&mut stream).poll_next()); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} -- cgit v1.2.3