summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Vander Stoep <jeffv@google.com>2023-03-31 11:14:29 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-03-31 11:14:29 +0000
commitf9adf8b06b5f71b47ae781aea31c03a06ba6ef35 (patch)
treea1abb0fc9b7e3d95c763e59b97e3a01b6101b5cf
parent39fcf15fa360b9edadecadf8ff28a5d4911f5ecd (diff)
parent7dcbe30e030e553bb44f1c0391d87048e62578bb (diff)
downloadtokio-stream-f9adf8b06b5f71b47ae781aea31c03a06ba6ef35.tar.gz
Upgrade tokio-stream to 0.1.12 am: a78006577e am: a9cba4aff2 am: 0e4b52ea4f am: 18447d24b8 am: 7dcbe30e03
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/2513497 Change-Id: I96064a21b88c684b7c235634cd4f54a8e46c3920 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp2
-rw-r--r--CHANGELOG.md8
-rw-r--r--Cargo.toml5
-rw-r--r--Cargo.toml.orig5
-rw-r--r--LICENSE2
-rw-r--r--METADATA10
-rw-r--r--src/stream_ext.rs2
-rw-r--r--src/stream_ext/collect.rs6
-rw-r--r--src/stream_ext/then.rs2
-rw-r--r--src/stream_ext/throttle.rs4
-rw-r--r--src/stream_ext/timeout.rs2
-rw-r--r--src/wrappers/broadcast.rs2
-rw-r--r--src/wrappers/watch.rs34
-rw-r--r--tests/stream_stream_map.rs57
-rw-r--r--tests/watch.rs30
16 files changed, 86 insertions, 87 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 94e3fd4..d866201 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,6 +1,6 @@
{
"git": {
- "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d"
+ "sha1": "46f974d8cfcb56c251d80cf1dc4a6bcf9fd1d7a0"
},
"path_in_vcs": "tokio-stream"
} \ No newline at end of file
diff --git a/Android.bp b/Android.bp
index 08e9901..d80fdf7 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,7 +23,7 @@ rust_library {
host_supported: true,
crate_name: "tokio_stream",
cargo_env_compat: true,
- cargo_pkg_version: "0.1.11",
+ cargo_pkg_version: "0.1.12",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05c2b18..c475c7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+# 0.1.12 (January 20, 2022)
+
+- time: remove `Unpin` bound on `Throttle` methods ([#5105])
+- time: document that `throttle` operates on ms granularity ([#5101])
+
+[#5105]: https://github.com/tokio-rs/tokio/pull/5105
+[#5101]: https://github.com/tokio-rs/tokio/pull/5101
+
# 0.1.11 (October 11, 2022)
- time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036])
diff --git a/Cargo.toml b/Cargo.toml
index df1dc53..5a3542e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
edition = "2018"
rust-version = "1.49"
name = "tokio-stream"
-version = "0.1.11"
+version = "0.1.12"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = """
Utilities to work with `Stream` and `tokio`.
@@ -76,6 +76,3 @@ sync = [
"tokio-util",
]
time = ["tokio/time"]
-
-[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest]
-version = "1"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 6dfa978..f87b59a 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -4,7 +4,7 @@ name = "tokio-stream"
# - Remove path dependencies
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.11"
+version = "0.1.12"
edition = "2018"
rust-version = "1.49"
authors = ["Tokio Contributors <team@tokio.rs>"]
@@ -38,9 +38,6 @@ parking_lot = "0.12.0"
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }
-[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
-proptest = "1"
-
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
diff --git a/LICENSE b/LICENSE
index 8af5baf..8bdf6bd 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2022 Tokio Contributors
+Copyright (c) 2023 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 1bf6cf9..390ea84 100644
--- a/METADATA
+++ b/METADATA
@@ -11,13 +11,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.12.crate"
}
- version: "0.1.11"
+ version: "0.1.12"
license_type: NOTICE
last_upgrade_date {
- year: 2022
- month: 12
- day: 12
+ year: 2023
+ month: 3
+ day: 30
}
}
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 6cea7b5..52d3202 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -982,6 +982,8 @@ pub trait StreamExt: Stream {
/// Slows down a stream by enforcing a delay between items.
///
+ /// The underlying timer behind this utility has a granularity of one millisecond.
+ ///
/// # Example
///
/// Create a throttled stream.
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 4b157a9..8548b74 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -195,11 +195,7 @@ where
} else {
let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
- if let Err(err) = res {
- Err(err)
- } else {
- unreachable!();
- }
+ Err(res.map(drop).unwrap_err())
}
}
}
diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs
index 7f6b5a2..cc7caa7 100644
--- a/src/stream_ext/then.rs
+++ b/src/stream_ext/then.rs
@@ -72,7 +72,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs
index f36c66a..5000139 100644
--- a/src/stream_ext/throttle.rs
+++ b/src/stream_ext/throttle.rs
@@ -4,7 +4,6 @@ use crate::Stream;
use tokio::time::{Duration, Instant, Sleep};
use std::future::Future;
-use std::marker::Unpin;
use std::pin::Pin;
use std::task::{self, Poll};
@@ -41,8 +40,7 @@ pin_project! {
}
}
-// XXX: are these safe if `T: !Unpin`?
-impl<T: Unpin> Throttle<T> {
+impl<T> Throttle<T> {
/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index 98d7cd5..a440d20 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -24,7 +24,7 @@ pin_project! {
}
/// Error returned by `Timeout`.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());
impl<S: Stream> Timeout<S> {
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index 10184bf..7110664 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -18,7 +18,7 @@ pub struct BroadcastStream<T> {
}
/// An error returned from the inner stream of a [`BroadcastStream`].
-#[derive(Debug, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStreamRecvError {
/// The receiver lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the channel.
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index c682c9c..ec8ead0 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
-/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardless of whether it was the initial value or sent afterwards.
+/// This stream will start by yielding the current value when the WatchStream is polled,
+/// regardless of whether it was the initial value or sent afterwards,
+/// unless you use [`WatchStream<T>::from_changes`].
///
/// # Examples
///
@@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError;
/// let (tx, rx) = watch::channel("hello");
/// let mut rx = WatchStream::new(rx);
///
+/// // existing rx output with "hello" is ignored here
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// Example with [`WatchStream<T>::from_changes`]:
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use futures::future::FutureExt;
+/// use tokio::sync::watch;
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::from_changes(rx);
+///
+/// // no output from rx is available at this point - let's check this:
+/// assert!(rx.next().now_or_never().is_none());
+///
/// tx.send("goodbye").unwrap();
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
@@ -66,6 +89,13 @@ impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
+
+ /// Create a new `WatchStream` that waits for the value to be changed.
+ pub fn from_changes(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
}
impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs
index ffc489b..b6b87e9 100644
--- a/tests/stream_stream_map.rs
+++ b/tests/stream_stream_map.rs
@@ -325,63 +325,6 @@ fn one_ready_many_none() {
}
}
-#[cfg(not(target_os = "wasi"))]
-proptest::proptest! {
- #[test]
- fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
- use std::task::{Context, Poll};
-
- struct DidPoll<T> {
- did_poll: bool,
- inner: T,
- }
-
- impl<T: Stream + Unpin> Stream for DidPoll<T> {
- type Item = T::Item;
-
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
- -> Poll<Option<T::Item>>
- {
- self.did_poll = true;
- Pin::new(&mut self.inner).poll_next(cx)
- }
- }
-
- for _ in 0..10 {
- let mut map = task::spawn(StreamMap::new());
- let mut expect = 0;
-
- for (i, &is_empty) in kinds.iter().enumerate() {
- let inner = if is_empty {
- pin_box(stream::empty::<()>())
- } else {
- expect += 1;
- pin_box(stream::pending::<()>())
- };
-
- let stream = DidPoll {
- did_poll: false,
- inner,
- };
-
- map.insert(i, stream);
- }
-
- if expect == 0 {
- assert_ready_none!(map.poll_next());
- } else {
- assert_pending!(map.poll_next());
-
- assert_eq!(expect, map.values().count());
-
- for stream in map.values() {
- assert!(stream.did_poll);
- }
- }
- }
- }
-}
-
fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
Box::pin(s)
}
diff --git a/tests/watch.rs b/tests/watch.rs
index a56254e..3a39aaf 100644
--- a/tests/watch.rs
+++ b/tests/watch.rs
@@ -3,9 +3,11 @@
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
+use tokio_test::assert_pending;
+use tokio_test::task::spawn;
#[tokio::test]
-async fn message_not_twice() {
+async fn watch_stream_message_not_twice() {
let (tx, rx) = watch::channel("hello");
let mut counter = 0;
@@ -27,3 +29,29 @@ async fn message_not_twice() {
drop(tx);
task.await.unwrap();
}
+
+#[tokio::test]
+async fn watch_stream_from_rx() {
+ let (tx, rx) = watch::channel("hello");
+
+ let mut stream = WatchStream::from(rx);
+
+ assert_eq!(stream.next().await.unwrap(), "hello");
+
+ tx.send("bye").unwrap();
+
+ assert_eq!(stream.next().await.unwrap(), "bye");
+}
+
+#[tokio::test]
+async fn watch_stream_from_changes() {
+ let (tx, rx) = watch::channel("hello");
+
+ let mut stream = WatchStream::from_changes(rx);
+
+ assert_pending!(spawn(&mut stream).poll_next());
+
+ tx.send("bye").unwrap();
+
+ assert_eq!(stream.next().await.unwrap(), "bye");
+}