diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:05:06 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2022-05-10 07:05:06 +0000 |
commit | 607411b93b67ad645b9a23b011fa500edd0e2efd (patch) | |
tree | c7137dd1d515ee77f69864a1aa7986dcf3015730 | |
parent | 297433102af660120efe69e24a5f256e8785ed95 (diff) | |
parent | b582a58c918f3cc89abbbb8c20744351a47d96de (diff) | |
download | tokio-stream-607411b93b67ad645b9a23b011fa500edd0e2efd.tar.gz |
Snap for 8564071 from b582a58c918f3cc89abbbb8c20744351a47d96de to mainline-sdkext-releaseaml_sdk_331812000aml_sdk_331811100aml_sdk_331811000aml_sdk_331412000aml_sdk_331410000aml_sdk_331310010aml_sdk_331111000aml_sdk_330810050aml_sdk_330810010android13-mainline-sdkext-release
Change-Id: I4969b015c971713ff3e529edfb1a8eebd45756bb
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 35 | ||||
-rw-r--r-- | CHANGELOG.md | 33 | ||||
-rw-r--r-- | Cargo.toml | 18 | ||||
-rw-r--r-- | Cargo.toml.orig | 15 | ||||
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | TEST_MAPPING | 108 | ||||
-rw-r--r-- | cargo2android.json | 10 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/stream_ext.rs | 2 | ||||
-rw-r--r-- | src/stream_ext/all.rs | 21 | ||||
-rw-r--r-- | src/stream_ext/any.rs | 21 | ||||
-rw-r--r-- | src/stream_ext/collect.rs | 4 | ||||
-rw-r--r-- | src/stream_ext/timeout.rs | 13 | ||||
-rw-r--r-- | src/stream_map.rs | 29 | ||||
-rw-r--r-- | src/wrappers.rs | 13 | ||||
-rw-r--r-- | src/wrappers/broadcast.rs | 16 | ||||
-rw-r--r-- | src/wrappers/mpsc_bounded.rs | 6 | ||||
-rw-r--r-- | src/wrappers/mpsc_unbounded.rs | 6 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 14 | ||||
-rw-r--r-- | tests/async_send_sync.rs | 2 | ||||
-rw-r--r-- | tests/stream_chain.rs | 4 | ||||
-rw-r--r-- | tests/stream_merge.rs | 4 | ||||
-rw-r--r-- | tests/watch.rs | 29 |
25 files changed, 218 insertions, 201 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index 77b1c80..f2822a6 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,5 +1,5 @@ { "git": { - "sha1": "b4918adbd8194e1ed7fd0c7fbe635b039b70c584" + "sha1": "d1a400912e82505c18c6c0c1f05cda06f334e201" } } @@ -1,4 +1,4 @@ -// This file is generated by cargo2android.py --device --run --features=time,net,io-util,fs --dependencies. +// This file is generated by cargo2android.py --config cargo2android.json. // Do not modify this file as changes will be overridden on upgrade. package { @@ -22,6 +22,8 @@ rust_library { name: "libtokio_stream", host_supported: true, crate_name: "tokio_stream", + cargo_env_compat: true, + cargo_pkg_version: "0.1.8", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -35,30 +37,9 @@ rust_library { "libpin_project_lite", "libtokio", ], + apex_available: [ + "//apex_available:platform", + "com.android.bluetooth", + ], + min_sdk_version: "29", } - -// dependent_library ["feature_list"] -// autocfg-1.0.1 -// bytes-1.0.1 "default,std" -// cfg-if-1.0.0 -// futures-core-0.3.13 "alloc,default,std" -// instant-0.1.9 -// libc-0.2.92 "default,std" -// lock_api-0.4.2 -// log-0.4.14 -// memchr-2.3.4 "default,std" -// mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds" -// num_cpus-1.13.0 -// once_cell-1.7.2 "alloc,default,race,std" -// parking_lot-0.11.1 "default" -// parking_lot_core-0.8.3 -// pin-project-lite-0.2.6 -// proc-macro2-1.0.26 "default,proc-macro" -// quote-1.0.9 "default,proc-macro" -// scopeguard-1.1.0 -// signal-hook-registry-1.3.0 -// smallvec-1.6.1 -// syn-1.0.68 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut" -// tokio-1.4.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" -// tokio-macros-1.1.0 -// unicode-xid-0.2.1 "default" diff --git a/CHANGELOG.md b/CHANGELOG.md index 59bb0fa..4ef469e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,36 @@ +# 0.1.8 (October 29, 2021) + +- stream: add `From<Receiver<T>>` impl for receiver streams ([#4080]) +- stream: impl `FromIterator` for `StreamMap` ([#4052]) +- signal: make windows docs for signal module show up on unix builds ([#3770]) + +[#3770]: https://github.com/tokio-rs/tokio/pull/3770 +[#4052]: https://github.com/tokio-rs/tokio/pull/4052 +[#4080]: https://github.com/tokio-rs/tokio/pull/4080 + +# 0.1.7 (July 7, 2021) + +### Fixed + +- sync: fix watch wrapper ([#3914]) +- time: fix `Timeout::size_hint` ([#3902]) + +[#3902]: https://github.com/tokio-rs/tokio/pull/3902 +[#3914]: https://github.com/tokio-rs/tokio/pull/3914 + +# 0.1.6 (May 14, 2021) + +### Added + +- stream: implement `Error` and `Display` for `BroadcastStreamRecvError` ([#3745]) + +### Fixed + +- stream: avoid yielding in `AllFuture` and `AnyFuture` ([#3625]) + +[#3745]: https://github.com/tokio-rs/tokio/pull/3745 +[#3625]: https://github.com/tokio-rs/tokio/pull/3625 + # 0.1.5 (March 20, 2021) ### Fixed @@ -3,26 +3,26 @@ # When uploading crates to the registry Cargo will automatically # "normalize" Cargo.toml files for maximal compatibility # with all versions of Cargo and also rewrite `path` dependencies -# to registry (e.g., crates.io) dependencies +# to registry (e.g., crates.io) dependencies. # -# If you believe there's an error in this file please file an -# issue against the rust-lang/cargo repository. If you're -# editing this file be aware that the upstream Cargo.toml -# will likely look very different (and much more reasonable) +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. [package] edition = "2018" name = "tokio-stream" -version = "0.1.5" +version = "0.1.8" authors = ["Tokio Contributors <team@tokio.rs>"] description = "Utilities to work with `Stream` and `tokio`.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" [package.metadata.docs.rs] all-features = true +rustc-args = ["--cfg", "docsrs"] rustdoc-args = ["--cfg", "docsrs"] [dependencies.futures-core] version = "0.3.0" @@ -31,7 +31,7 @@ version = "0.3.0" version = "0.2.0" [dependencies.tokio] -version = "1.2.0" +version = "1.8.0" features = ["sync"] [dependencies.tokio-util] @@ -45,7 +45,7 @@ version = "0.3" default-features = false [dev-dependencies.proptest] -version = "0.10.0" +version = "1" [dev-dependencies.tokio] version = "1.2.0" diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 8588325..83f8551 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -6,13 +6,13 @@ name = "tokio-stream" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.5" +version = "0.1.8" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ @@ -30,8 +30,8 @@ signal = ["tokio/signal"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" -tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] } -tokio-util = { version = "0.6.3", optional = true } +tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] } +tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true } [dev-dependencies] tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] } @@ -39,8 +39,13 @@ async-stream = "0.3" tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } -proptest = "0.10.0" +proptest = "1" [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] +# Issue #3770 +# +# This should allow `docsrs` to be read across projects, so that `tokio-stream` +# can pick up stubbed types exported by `tokio`. +rustc-args = ["--cfg", "docsrs"] @@ -1,4 +1,4 @@ -Copyright (c) 2020 Tokio Contributors +Copyright (c) 2021 Tokio Contributors Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.5.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.8.crate" } - version: "0.1.5" + version: "0.1.8" license_type: NOTICE last_upgrade_date { - year: 2021 - month: 4 - day: 2 + year: 2022 + month: 3 + day: 1 } } diff --git a/TEST_MAPPING b/TEST_MAPPING index 73ad094..dfc3524 100644 --- a/TEST_MAPPING +++ b/TEST_MAPPING @@ -1,113 +1,11 @@ // Generated by update_crate_tests.py for tests that depend on this crate. { - "presubmit": [ + "imports": [ { - "name": "tokio-test_device_test_src_lib" + "path": "external/rust/crates/tokio" }, { - "name": "tokio-test_device_test_tests_block_on" - }, - { - "name": "tokio-test_device_test_tests_io" - }, - { - "name": "tokio-test_device_test_tests_macros" - }, - { - "name": "tokio_device_test_tests_buffered" - }, - { - "name": "tokio_device_test_tests_io_async_read" - }, - { - "name": "tokio_device_test_tests_io_copy_bidirectional" - }, - { - "name": "tokio_device_test_tests_io_lines" - }, - { - "name": "tokio_device_test_tests_io_mem_stream" - }, - { - "name": "tokio_device_test_tests_io_read" - }, - { - "name": "tokio_device_test_tests_io_read_buf" - }, - { - "name": "tokio_device_test_tests_io_read_to_end" - }, - { - "name": "tokio_device_test_tests_io_take" - }, - { - "name": "tokio_device_test_tests_io_write" - }, - { - "name": "tokio_device_test_tests_io_write_all" - }, - { - "name": "tokio_device_test_tests_io_write_buf" - }, - { - "name": "tokio_device_test_tests_io_write_int" - }, - { - "name": "tokio_device_test_tests_macros_join" - }, - { - "name": "tokio_device_test_tests_no_rt" - }, - { - "name": "tokio_device_test_tests_rt_basic" - }, - { - "name": "tokio_device_test_tests_rt_threaded" - }, - { - "name": "tokio_device_test_tests_sync_barrier" - }, - { - "name": "tokio_device_test_tests_sync_broadcast" - }, - { - "name": "tokio_device_test_tests_sync_errors" - }, - { - "name": "tokio_device_test_tests_sync_mpsc" - }, - { - "name": "tokio_device_test_tests_sync_mutex_owned" - }, - { - "name": "tokio_device_test_tests_sync_rwlock" - }, - { - "name": "tokio_device_test_tests_sync_watch" - }, - { - "name": "tokio_device_test_tests_task_local" - }, - { - "name": "tokio_device_test_tests_task_local_set" - }, - { - "name": "tokio_device_test_tests_tcp_accept" - }, - { - "name": "tokio_device_test_tests_tcp_echo" - }, - { - "name": "tokio_device_test_tests_tcp_into_std" - }, - { - "name": "tokio_device_test_tests_tcp_shutdown" - }, - { - "name": "tokio_device_test_tests_time_rt" - }, - { - "name": "tokio_device_test_tests_uds_split" + "path": "external/rust/crates/tokio-test" } ] } diff --git a/cargo2android.json b/cargo2android.json new file mode 100644 index 0000000..e4cd046 --- /dev/null +++ b/cargo2android.json @@ -0,0 +1,10 @@ +{ + "apex-available": [ + "//apex_available:platform", + "com.android.bluetooth" + ], + "device": true, + "features": "time,net,io-util,fs", + "min-sdk-version": "29", + "run": true +} @@ -10,7 +10,7 @@ unreachable_pub )] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, deny(broken_intra_doc_links))] +#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/src/stream_ext.rs b/src/stream_ext.rs index 51532ee..1157c9e 100644 --- a/src/stream_ext.rs +++ b/src/stream_ext.rs @@ -515,7 +515,7 @@ pub trait StreamExt: Stream { /// Skip elements from the underlying stream while the provided predicate /// resolves to `true`. /// - /// This function, like [`Iterator::skip_while`], will ignore elemets from the + /// This function, like [`Iterator::skip_while`], will ignore elements from the /// stream until the predicate `f` resolves to `false`. Once one element /// returns false, the rest of the elements will be yielded. /// diff --git a/src/stream_ext/all.rs b/src/stream_ext/all.rs index 11573f9..b4dbc1e 100644 --- a/src/stream_ext/all.rs +++ b/src/stream_ext/all.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if !(me.f)(v) { - Poll::Ready(false) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if !(me.f)(v) { + return Poll::Ready(false); + } } + None => return Poll::Ready(true), } - None => Poll::Ready(true), } + + cx.waker().wake_by_ref(); + Poll::Pending } } diff --git a/src/stream_ext/any.rs b/src/stream_ext/any.rs index 4c4c593..31394f2 100644 --- a/src/stream_ext/any.rs +++ b/src/stream_ext/any.rs @@ -38,18 +38,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); + let mut stream = Pin::new(me.stream); - match next { - Some(v) => { - if (me.f)(v) { - Poll::Ready(true) - } else { - cx.waker().wake_by_ref(); - Poll::Pending + // Take a maximum of 32 items from the stream before yielding. + for _ in 0..32 { + match futures_core::ready!(stream.as_mut().poll_next(cx)) { + Some(v) => { + if (me.f)(v) { + return Poll::Ready(true); + } } + None => return Poll::Ready(false), } - None => Poll::Ready(false), } + + cx.waker().wake_by_ref(); + Poll::Pending } } diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs index 23f48b0..a33a6d6 100644 --- a/src/stream_ext/collect.rs +++ b/src/stream_ext/collect.rs @@ -113,7 +113,7 @@ impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String { } fn finalize(_: sealed::Internal, collection: &mut String) -> String { - mem::replace(collection, String::new()) + mem::take(collection) } } @@ -132,7 +132,7 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> { } fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> { - mem::replace(collection, vec![]) + mem::take(collection) } } diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs index de17dc0..98d7cd5 100644 --- a/src/stream_ext/timeout.rs +++ b/src/stream_ext/timeout.rs @@ -69,7 +69,18 @@ impl<S: Stream> Stream for Timeout<S> { } fn size_hint(&self) -> (usize, Option<usize>) { - self.stream.size_hint() + let (lower, upper) = self.stream.size_hint(); + + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. + + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option<usize>) -> Option<usize> { + value?.checked_mul(2)?.checked_add(1) + } + + (lower, twice_plus_one(upper)) } } diff --git a/src/stream_map.rs b/src/stream_map.rs index 85b60cf..80a521e 100644 --- a/src/stream_map.rs +++ b/src/stream_map.rs @@ -364,11 +364,11 @@ impl<K, V> StreamMap<K, V> { /// # Examples /// /// ``` - /// use std::collections::HashMap; + /// use tokio_stream::{StreamMap, pending}; /// - /// let mut a = HashMap::new(); + /// let mut a = StreamMap::new(); /// assert!(a.is_empty()); - /// a.insert(1, "a"); + /// a.insert(1, pending::<i32>()); /// assert!(!a.is_empty()); /// ``` pub fn is_empty(&self) -> bool { @@ -568,6 +568,23 @@ where } } +impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V> +where + K: Hash + Eq, +{ + fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { + let iterator = iter.into_iter(); + let (lower_bound, _) = iterator.size_hint(); + let mut stream_map = Self::with_capacity(lower_bound); + + for (key, value) in iterator { + stream_map.insert(key, value); + } + + stream_map + } +} + mod rand { use std::cell::Cell; @@ -605,10 +622,10 @@ mod rand { /// Fast random number generate /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. - /// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's - /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf + /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's + /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> /// This generator passes the SmallCrush suite, part of TestU01 framework: - /// http://simul.iro.umontreal.ca/testu01/tu01.html + /// <http://simul.iro.umontreal.ca/testu01/tu01.html> #[derive(Debug)] pub(crate) struct FastRand { one: Cell<u32>, diff --git a/src/wrappers.rs b/src/wrappers.rs index f2dc21f..62cabe4 100644 --- a/src/wrappers.rs +++ b/src/wrappers.rs @@ -1,13 +1,4 @@ //! Wrappers for Tokio types that implement `Stream`. -//! -#![cfg_attr( - unix, - doc = "You are viewing documentation built under unix. To view windows-specific wrappers, change to the `x86_64-pc-windows-msvc` platform." -)] -#![cfg_attr( - windows, - doc = "You are viewing documentation built under windows. To view unix-specific wrappers, change to the `x86_64-unknown-linux-gnu` platform." -)] /// Error types for the wrappers. pub mod errors { @@ -36,9 +27,9 @@ cfg_signal! { #[cfg(unix)] pub use signal_unix::SignalStream; - #[cfg(windows)] + #[cfg(any(windows, docsrs))] mod signal_windows; - #[cfg(windows)] + #[cfg(any(windows, docsrs))] pub use signal_windows::{CtrlCStream, CtrlBreakStream}; } diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs index 06a982d..c8346a6 100644 --- a/src/wrappers/broadcast.rs +++ b/src/wrappers/broadcast.rs @@ -27,6 +27,16 @@ pub enum BroadcastStreamRecvError { Lagged(u64), } +impl fmt::Display for BroadcastStreamRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt), + } + } +} + +impl std::error::Error for BroadcastStreamRecvError {} + async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) { let result = rx.recv().await; (result, rx) @@ -61,3 +71,9 @@ impl<T> fmt::Debug for BroadcastStream<T> { f.debug_struct("BroadcastStream").finish() } } + +impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/src/wrappers/mpsc_bounded.rs b/src/wrappers/mpsc_bounded.rs index e4f9000..b536268 100644 --- a/src/wrappers/mpsc_bounded.rs +++ b/src/wrappers/mpsc_bounded.rs @@ -57,3 +57,9 @@ impl<T> AsMut<Receiver<T>> for ReceiverStream<T> { &mut self.inner } } + +impl<T> From<Receiver<T>> for ReceiverStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/src/wrappers/mpsc_unbounded.rs b/src/wrappers/mpsc_unbounded.rs index bc5f40c..54597b7 100644 --- a/src/wrappers/mpsc_unbounded.rs +++ b/src/wrappers/mpsc_unbounded.rs @@ -51,3 +51,9 @@ impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { &mut self.inner } } + +impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { + fn from(recv: UnboundedReceiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs index 0ffd1b8..bd3a18a 100644 --- a/src/wrappers/watch.rs +++ b/src/wrappers/watch.rs @@ -11,7 +11,7 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// /// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardles of whether it was the initial value or sent afterwards. +/// regardless of whether it was the initial value or sent afterwards. /// /// # Examples /// @@ -59,7 +59,7 @@ async fn make_future<T: Clone + Send + Sync>( (result, rx) } -impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> { +impl<T: 'static + Clone + Send + Sync> WatchStream<T> { /// Create a new `WatchStream`. pub fn new(rx: Receiver<T>) -> Self { Self { @@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - let (result, rx) = ready!(self.inner.poll(cx)); + let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { - let received = (*rx.borrow()).clone(); + let received = (*rx.borrow_and_update()).clone(); self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } @@ -94,3 +94,9 @@ impl<T> fmt::Debug for WatchStream<T> { f.debug_struct("WatchStream").finish() } } + +impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> { + fn from(recv: Receiver<T>) -> Self { + Self::new(recv) + } +} diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs index c06bebd..f1c8b4e 100644 --- a/tests/async_send_sync.rs +++ b/tests/async_send_sync.rs @@ -1,3 +1,5 @@ +#![allow(clippy::diverging_sub_expression)] + use std::rc::Rc; #[allow(dead_code)] diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs index 759de30..f3b7edb 100644 --- a/tests/stream_chain.rs +++ b/tests/stream_chain.rs @@ -89,12 +89,12 @@ fn size_overflow() { } fn size_hint(&self) -> (usize, Option<usize>) { - (usize::max_value(), Some(usize::max_value())) + (usize::MAX, Some(usize::MAX)) } } let m1 = Monster; let m2 = Monster; let m = m1.chain(m2); - assert_eq!(m.size_hint(), (usize::max_value(), None)); + assert_eq!(m.size_hint(), (usize::MAX, None)); } diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs index 69cd568..f603bcc 100644 --- a/tests/stream_merge.rs +++ b/tests/stream_merge.rs @@ -72,12 +72,12 @@ fn size_overflow() { } fn size_hint(&self) -> (usize, Option<usize>) { - (usize::max_value(), Some(usize::max_value())) + (usize::MAX, Some(usize::MAX)) } } let m1 = Monster; let m2 = Monster; let m = m1.merge(m2); - assert_eq!(m.size_hint(), (usize::max_value(), None)); + assert_eq!(m.size_hint(), (usize::MAX, None)); } diff --git a/tests/watch.rs b/tests/watch.rs new file mode 100644 index 0000000..a56254e --- /dev/null +++ b/tests/watch.rs @@ -0,0 +1,29 @@ +#![cfg(feature = "sync")] + +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::StreamExt; + +#[tokio::test] +async fn message_not_twice() { + let (tx, rx) = watch::channel("hello"); + + let mut counter = 0; + let mut stream = WatchStream::new(rx).map(move |payload| { + println!("{}", payload); + if payload == "goodbye" { + counter += 1; + } + if counter >= 2 { + panic!("too many goodbyes"); + } + }); + + let task = tokio::spawn(async move { while stream.next().await.is_some() {} }); + + // Send goodbye just once + tx.send("goodbye").unwrap(); + + drop(tx); + task.await.unwrap(); +} |