summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-11 05:12:58 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-05-11 05:12:58 +0000
commit4b63517ec1207e482cbfe34003bdfe85a23b9078 (patch)
treec7137dd1d515ee77f69864a1aa7986dcf3015730
parent297433102af660120efe69e24a5f256e8785ed95 (diff)
parentb582a58c918f3cc89abbbb8c20744351a47d96de (diff)
downloadtokio-stream-android13-mainline-networking-release.tar.gz
Change-Id: I9137b3157763dbdd27a6b0632b605df3fb493414
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp35
-rw-r--r--CHANGELOG.md33
-rw-r--r--Cargo.toml18
-rw-r--r--Cargo.toml.orig15
-rw-r--r--LICENSE2
-rw-r--r--METADATA10
-rw-r--r--TEST_MAPPING108
-rw-r--r--cargo2android.json10
-rw-r--r--src/lib.rs2
-rw-r--r--src/stream_ext.rs2
-rw-r--r--src/stream_ext/all.rs21
-rw-r--r--src/stream_ext/any.rs21
-rw-r--r--src/stream_ext/collect.rs4
-rw-r--r--src/stream_ext/timeout.rs13
-rw-r--r--src/stream_map.rs29
-rw-r--r--src/wrappers.rs13
-rw-r--r--src/wrappers/broadcast.rs16
-rw-r--r--src/wrappers/mpsc_bounded.rs6
-rw-r--r--src/wrappers/mpsc_unbounded.rs6
-rw-r--r--src/wrappers/watch.rs14
-rw-r--r--tests/async_send_sync.rs2
-rw-r--r--tests/stream_chain.rs4
-rw-r--r--tests/stream_merge.rs4
-rw-r--r--tests/watch.rs29
25 files changed, 218 insertions, 201 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 77b1c80..f2822a6 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "b4918adbd8194e1ed7fd0c7fbe635b039b70c584"
+ "sha1": "d1a400912e82505c18c6c0c1f05cda06f334e201"
}
}
diff --git a/Android.bp b/Android.bp
index 4948062..11ef3d4 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,4 @@
-// This file is generated by cargo2android.py --device --run --features=time,net,io-util,fs --dependencies.
+// This file is generated by cargo2android.py --config cargo2android.json.
// Do not modify this file as changes will be overridden on upgrade.
package {
@@ -22,6 +22,8 @@ rust_library {
name: "libtokio_stream",
host_supported: true,
crate_name: "tokio_stream",
+ cargo_env_compat: true,
+ cargo_pkg_version: "0.1.8",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -35,30 +37,9 @@ rust_library {
"libpin_project_lite",
"libtokio",
],
+ apex_available: [
+ "//apex_available:platform",
+ "com.android.bluetooth",
+ ],
+ min_sdk_version: "29",
}
-
-// dependent_library ["feature_list"]
-// autocfg-1.0.1
-// bytes-1.0.1 "default,std"
-// cfg-if-1.0.0
-// futures-core-0.3.13 "alloc,default,std"
-// instant-0.1.9
-// libc-0.2.92 "default,std"
-// lock_api-0.4.2
-// log-0.4.14
-// memchr-2.3.4 "default,std"
-// mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
-// num_cpus-1.13.0
-// once_cell-1.7.2 "alloc,default,race,std"
-// parking_lot-0.11.1 "default"
-// parking_lot_core-0.8.3
-// pin-project-lite-0.2.6
-// proc-macro2-1.0.26 "default,proc-macro"
-// quote-1.0.9 "default,proc-macro"
-// scopeguard-1.1.0
-// signal-hook-registry-1.3.0
-// smallvec-1.6.1
-// syn-1.0.68 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-// tokio-1.4.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
-// tokio-macros-1.1.0
-// unicode-xid-0.2.1 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59bb0fa..4ef469e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,36 @@
+# 0.1.8 (October 29, 2021)
+
+- stream: add `From<Receiver<T>>` impl for receiver streams ([#4080])
+- stream: impl `FromIterator` for `StreamMap` ([#4052])
+- signal: make windows docs for signal module show up on unix builds ([#3770])
+
+[#3770]: https://github.com/tokio-rs/tokio/pull/3770
+[#4052]: https://github.com/tokio-rs/tokio/pull/4052
+[#4080]: https://github.com/tokio-rs/tokio/pull/4080
+
+# 0.1.7 (July 7, 2021)
+
+### Fixed
+
+- sync: fix watch wrapper ([#3914])
+- time: fix `Timeout::size_hint` ([#3902])
+
+[#3902]: https://github.com/tokio-rs/tokio/pull/3902
+[#3914]: https://github.com/tokio-rs/tokio/pull/3914
+
+# 0.1.6 (May 14, 2021)
+
+### Added
+
+- stream: implement `Error` and `Display` for `BroadcastStreamRecvError` ([#3745])
+
+### Fixed
+
+- stream: avoid yielding in `AllFuture` and `AnyFuture` ([#3625])
+
+[#3745]: https://github.com/tokio-rs/tokio/pull/3745
+[#3625]: https://github.com/tokio-rs/tokio/pull/3625
+
# 0.1.5 (March 20, 2021)
### Fixed
diff --git a/Cargo.toml b/Cargo.toml
index c734987..699d94a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,26 +3,26 @@
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
#
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
[package]
edition = "2018"
name = "tokio-stream"
-version = "0.1.5"
+version = "0.1.8"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "Utilities to work with `Stream` and `tokio`.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
[package.metadata.docs.rs]
all-features = true
+rustc-args = ["--cfg", "docsrs"]
rustdoc-args = ["--cfg", "docsrs"]
[dependencies.futures-core]
version = "0.3.0"
@@ -31,7 +31,7 @@ version = "0.3.0"
version = "0.2.0"
[dependencies.tokio]
-version = "1.2.0"
+version = "1.8.0"
features = ["sync"]
[dependencies.tokio-util]
@@ -45,7 +45,7 @@ version = "0.3"
default-features = false
[dev-dependencies.proptest]
-version = "0.10.0"
+version = "1"
[dev-dependencies.tokio]
version = "1.2.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 8588325..83f8551 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -6,13 +6,13 @@ name = "tokio-stream"
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.5"
+version = "0.1.8"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
@@ -30,8 +30,8 @@ signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
-tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
-tokio-util = { version = "0.6.3", optional = true }
+tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
+tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
[dev-dependencies]
tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
@@ -39,8 +39,13 @@ async-stream = "0.3"
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }
-proptest = "0.10.0"
+proptest = "1"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
+# Issue #3770
+#
+# This should allow `docsrs` to be read across projects, so that `tokio-stream`
+# can pick up stubbed types exported by `tokio`.
+rustc-args = ["--cfg", "docsrs"]
diff --git a/LICENSE b/LICENSE
index 243fcd6..ffa38bb 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2020 Tokio Contributors
+Copyright (c) 2021 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 2dc18b1..68e56e9 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.5.crate"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.8.crate"
}
- version: "0.1.5"
+ version: "0.1.8"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 4
- day: 2
+ year: 2022
+ month: 3
+ day: 1
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 73ad094..dfc3524 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,113 +1,11 @@
// Generated by update_crate_tests.py for tests that depend on this crate.
{
- "presubmit": [
+ "imports": [
{
- "name": "tokio-test_device_test_src_lib"
+ "path": "external/rust/crates/tokio"
},
{
- "name": "tokio-test_device_test_tests_block_on"
- },
- {
- "name": "tokio-test_device_test_tests_io"
- },
- {
- "name": "tokio-test_device_test_tests_macros"
- },
- {
- "name": "tokio_device_test_tests_buffered"
- },
- {
- "name": "tokio_device_test_tests_io_async_read"
- },
- {
- "name": "tokio_device_test_tests_io_copy_bidirectional"
- },
- {
- "name": "tokio_device_test_tests_io_lines"
- },
- {
- "name": "tokio_device_test_tests_io_mem_stream"
- },
- {
- "name": "tokio_device_test_tests_io_read"
- },
- {
- "name": "tokio_device_test_tests_io_read_buf"
- },
- {
- "name": "tokio_device_test_tests_io_read_to_end"
- },
- {
- "name": "tokio_device_test_tests_io_take"
- },
- {
- "name": "tokio_device_test_tests_io_write"
- },
- {
- "name": "tokio_device_test_tests_io_write_all"
- },
- {
- "name": "tokio_device_test_tests_io_write_buf"
- },
- {
- "name": "tokio_device_test_tests_io_write_int"
- },
- {
- "name": "tokio_device_test_tests_macros_join"
- },
- {
- "name": "tokio_device_test_tests_no_rt"
- },
- {
- "name": "tokio_device_test_tests_rt_basic"
- },
- {
- "name": "tokio_device_test_tests_rt_threaded"
- },
- {
- "name": "tokio_device_test_tests_sync_barrier"
- },
- {
- "name": "tokio_device_test_tests_sync_broadcast"
- },
- {
- "name": "tokio_device_test_tests_sync_errors"
- },
- {
- "name": "tokio_device_test_tests_sync_mpsc"
- },
- {
- "name": "tokio_device_test_tests_sync_mutex_owned"
- },
- {
- "name": "tokio_device_test_tests_sync_rwlock"
- },
- {
- "name": "tokio_device_test_tests_sync_watch"
- },
- {
- "name": "tokio_device_test_tests_task_local"
- },
- {
- "name": "tokio_device_test_tests_task_local_set"
- },
- {
- "name": "tokio_device_test_tests_tcp_accept"
- },
- {
- "name": "tokio_device_test_tests_tcp_echo"
- },
- {
- "name": "tokio_device_test_tests_tcp_into_std"
- },
- {
- "name": "tokio_device_test_tests_tcp_shutdown"
- },
- {
- "name": "tokio_device_test_tests_time_rt"
- },
- {
- "name": "tokio_device_test_tests_uds_split"
+ "path": "external/rust/crates/tokio-test"
}
]
}
diff --git a/cargo2android.json b/cargo2android.json
new file mode 100644
index 0000000..e4cd046
--- /dev/null
+++ b/cargo2android.json
@@ -0,0 +1,10 @@
+{
+ "apex-available": [
+ "//apex_available:platform",
+ "com.android.bluetooth"
+ ],
+ "device": true,
+ "features": "time,net,io-util,fs",
+ "min-sdk-version": "29",
+ "run": true
+}
diff --git a/src/lib.rs b/src/lib.rs
index af99488..b7f232f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,7 @@
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
-#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 51532ee..1157c9e 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -515,7 +515,7 @@ pub trait StreamExt: Stream {
/// Skip elements from the underlying stream while the provided predicate
/// resolves to `true`.
///
- /// This function, like [`Iterator::skip_while`], will ignore elemets from the
+ /// This function, like [`Iterator::skip_while`], will ignore elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false, the rest of the elements will be yielded.
///
diff --git a/src/stream_ext/all.rs b/src/stream_ext/all.rs
index 11573f9..b4dbc1e 100644
--- a/src/stream_ext/all.rs
+++ b/src/stream_ext/all.rs
@@ -38,18 +38,21 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+ let mut stream = Pin::new(me.stream);
- match next {
- Some(v) => {
- if !(me.f)(v) {
- Poll::Ready(false)
- } else {
- cx.waker().wake_by_ref();
- Poll::Pending
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if !(me.f)(v) {
+ return Poll::Ready(false);
+ }
}
+ None => return Poll::Ready(true),
}
- None => Poll::Ready(true),
}
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
}
}
diff --git a/src/stream_ext/any.rs b/src/stream_ext/any.rs
index 4c4c593..31394f2 100644
--- a/src/stream_ext/any.rs
+++ b/src/stream_ext/any.rs
@@ -38,18 +38,21 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+ let mut stream = Pin::new(me.stream);
- match next {
- Some(v) => {
- if (me.f)(v) {
- Poll::Ready(true)
- } else {
- cx.waker().wake_by_ref();
- Poll::Pending
+ // Take a maximum of 32 items from the stream before yielding.
+ for _ in 0..32 {
+ match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+ Some(v) => {
+ if (me.f)(v) {
+ return Poll::Ready(true);
+ }
}
+ None => return Poll::Ready(false),
}
- None => Poll::Ready(false),
}
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
}
}
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 23f48b0..a33a6d6 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -113,7 +113,7 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
}
fn finalize(_: sealed::Internal, collection: &mut String) -> String {
- mem::replace(collection, String::new())
+ mem::take(collection)
}
}
@@ -132,7 +132,7 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> {
}
fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
- mem::replace(collection, vec![])
+ mem::take(collection)
}
}
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index de17dc0..98d7cd5 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -69,7 +69,18 @@ impl<S: Stream> Stream for Timeout<S> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
+ let (lower, upper) = self.stream.size_hint();
+
+ // The timeout stream may insert an error before and after each message
+ // from the underlying stream, but no more than one error between each
+ // message. Hence the upper bound is computed as 2x+1.
+
+ // Using a helper function to enable use of question mark operator.
+ fn twice_plus_one(value: Option<usize>) -> Option<usize> {
+ value?.checked_mul(2)?.checked_add(1)
+ }
+
+ (lower, twice_plus_one(upper))
}
}
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 85b60cf..80a521e 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -364,11 +364,11 @@ impl<K, V> StreamMap<K, V> {
/// # Examples
///
/// ```
- /// use std::collections::HashMap;
+ /// use tokio_stream::{StreamMap, pending};
///
- /// let mut a = HashMap::new();
+ /// let mut a = StreamMap::new();
/// assert!(a.is_empty());
- /// a.insert(1, "a");
+ /// a.insert(1, pending::<i32>());
/// assert!(!a.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
@@ -568,6 +568,23 @@ where
}
}
+impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
+where
+ K: Hash + Eq,
+{
+ fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+ let iterator = iter.into_iter();
+ let (lower_bound, _) = iterator.size_hint();
+ let mut stream_map = Self::with_capacity(lower_bound);
+
+ for (key, value) in iterator {
+ stream_map.insert(key, value);
+ }
+
+ stream_map
+ }
+}
+
mod rand {
use std::cell::Cell;
@@ -605,10 +622,10 @@ mod rand {
/// Fast random number generate
///
/// Implement xorshift64+: 2 32-bit xorshift sequences added together.
- /// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's
- /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
+ /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
+ /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
/// This generator passes the SmallCrush suite, part of TestU01 framework:
- /// http://simul.iro.umontreal.ca/testu01/tu01.html
+ /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
#[derive(Debug)]
pub(crate) struct FastRand {
one: Cell<u32>,
diff --git a/src/wrappers.rs b/src/wrappers.rs
index f2dc21f..62cabe4 100644
--- a/src/wrappers.rs
+++ b/src/wrappers.rs
@@ -1,13 +1,4 @@
//! Wrappers for Tokio types that implement `Stream`.
-//!
-#![cfg_attr(
- unix,
- doc = "You are viewing documentation built under unix. To view windows-specific wrappers, change to the `x86_64-pc-windows-msvc` platform."
-)]
-#![cfg_attr(
- windows,
- doc = "You are viewing documentation built under windows. To view unix-specific wrappers, change to the `x86_64-unknown-linux-gnu` platform."
-)]
/// Error types for the wrappers.
pub mod errors {
@@ -36,9 +27,9 @@ cfg_signal! {
#[cfg(unix)]
pub use signal_unix::SignalStream;
- #[cfg(windows)]
+ #[cfg(any(windows, docsrs))]
mod signal_windows;
- #[cfg(windows)]
+ #[cfg(any(windows, docsrs))]
pub use signal_windows::{CtrlCStream, CtrlBreakStream};
}
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index 06a982d..c8346a6 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -27,6 +27,16 @@ pub enum BroadcastStreamRecvError {
Lagged(u64),
}
+impl fmt::Display for BroadcastStreamRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
+ }
+ }
+}
+
+impl std::error::Error for BroadcastStreamRecvError {}
+
async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
let result = rx.recv().await;
(result, rx)
@@ -61,3 +71,9 @@ impl<T> fmt::Debug for BroadcastStream<T> {
f.debug_struct("BroadcastStream").finish()
}
}
+
+impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/src/wrappers/mpsc_bounded.rs b/src/wrappers/mpsc_bounded.rs
index e4f9000..b536268 100644
--- a/src/wrappers/mpsc_bounded.rs
+++ b/src/wrappers/mpsc_bounded.rs
@@ -57,3 +57,9 @@ impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
&mut self.inner
}
}
+
+impl<T> From<Receiver<T>> for ReceiverStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/src/wrappers/mpsc_unbounded.rs b/src/wrappers/mpsc_unbounded.rs
index bc5f40c..54597b7 100644
--- a/src/wrappers/mpsc_unbounded.rs
+++ b/src/wrappers/mpsc_unbounded.rs
@@ -51,3 +51,9 @@ impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
&mut self.inner
}
}
+
+impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn from(recv: UnboundedReceiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index 0ffd1b8..bd3a18a 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardles of whether it was the initial value or sent afterwards.
+/// regardless of whether it was the initial value or sent afterwards.
///
/// # Examples
///
@@ -59,7 +59,7 @@ async fn make_future<T: Clone + Send + Sync>(
(result, rx)
}
-impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
+impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
@@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- let (result, rx) = ready!(self.inner.poll(cx));
+ let (result, mut rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
- let received = (*rx.borrow()).clone();
+ let received = (*rx.borrow_and_update()).clone();
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}
@@ -94,3 +94,9 @@ impl<T> fmt::Debug for WatchStream<T> {
f.debug_struct("WatchStream").finish()
}
}
+
+impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index c06bebd..f1c8b4e 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -1,3 +1,5 @@
+#![allow(clippy::diverging_sub_expression)]
+
use std::rc::Rc;
#[allow(dead_code)]
diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs
index 759de30..f3b7edb 100644
--- a/tests/stream_chain.rs
+++ b/tests/stream_chain.rs
@@ -89,12 +89,12 @@ fn size_overflow() {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- (usize::max_value(), Some(usize::max_value()))
+ (usize::MAX, Some(usize::MAX))
}
}
let m1 = Monster;
let m2 = Monster;
let m = m1.chain(m2);
- assert_eq!(m.size_hint(), (usize::max_value(), None));
+ assert_eq!(m.size_hint(), (usize::MAX, None));
}
diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs
index 69cd568..f603bcc 100644
--- a/tests/stream_merge.rs
+++ b/tests/stream_merge.rs
@@ -72,12 +72,12 @@ fn size_overflow() {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- (usize::max_value(), Some(usize::max_value()))
+ (usize::MAX, Some(usize::MAX))
}
}
let m1 = Monster;
let m2 = Monster;
let m = m1.merge(m2);
- assert_eq!(m.size_hint(), (usize::max_value(), None));
+ assert_eq!(m.size_hint(), (usize::MAX, None));
}
diff --git a/tests/watch.rs b/tests/watch.rs
new file mode 100644
index 0000000..a56254e
--- /dev/null
+++ b/tests/watch.rs
@@ -0,0 +1,29 @@
+#![cfg(feature = "sync")]
+
+use tokio::sync::watch;
+use tokio_stream::wrappers::WatchStream;
+use tokio_stream::StreamExt;
+
+#[tokio::test]
+async fn message_not_twice() {
+ let (tx, rx) = watch::channel("hello");
+
+ let mut counter = 0;
+ let mut stream = WatchStream::new(rx).map(move |payload| {
+ println!("{}", payload);
+ if payload == "goodbye" {
+ counter += 1;
+ }
+ if counter >= 2 {
+ panic!("too many goodbyes");
+ }
+ });
+
+ let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
+
+ // Send goodbye just once
+ tx.send("goodbye").unwrap();
+
+ drop(tx);
+ task.await.unwrap();
+}