summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorElliott Hughes <enh@google.com>2021-04-12 21:24:03 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-04-12 21:24:03 +0000
commitd57b10667d550dbcd272d25b4b108babad2753ec (patch)
tree2f96434d32839687c4a824c79a3d8d7b1c481fcc
parente9f08cc97420b61cbbb563bbfb97d3e8522d7996 (diff)
parent7cf1529946cd403283f49f930247f72723ccab30 (diff)
downloadtokio-stream-d57b10667d550dbcd272d25b4b108babad2753ec.tar.gz
Upgrade rust/crates/tokio-stream to 0.1.5 am: 7cf1529946
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1663499 Change-Id: I52cb8d6de98df980d5cab098deb7f148ae4bd059
-rw-r--r--.cargo_vcs_info.json2
-rw-r--r--Android.bp21
-rw-r--r--CHANGELOG.md23
-rw-r--r--Cargo.toml9
-rw-r--r--Cargo.toml.orig10
-rw-r--r--METADATA8
-rw-r--r--TEST_MAPPING272
-rw-r--r--src/lib.rs7
-rw-r--r--src/macros.rs10
-rw-r--r--src/stream_ext/throttle.rs3
-rw-r--r--src/wrappers.rs21
-rw-r--r--src/wrappers/signal_unix.rs46
-rw-r--r--src/wrappers/signal_windows.rs88
-rw-r--r--src/wrappers/watch.rs37
14 files changed, 524 insertions, 33 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 4cca143..77b1c80 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
{
"git": {
- "sha1": "23fdc2b3c4b33300c3e8b44dcb126057d90f4935"
+ "sha1": "b4918adbd8194e1ed7fd0c7fbe635b039b70c584"
}
}
diff --git a/Android.bp b/Android.bp
index 9061ce0..4948062 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,5 @@
// This file is generated by cargo2android.py --device --run --features=time,net,io-util,fs --dependencies.
+// Do not modify this file as changes will be overridden on upgrade.
package {
default_applicable_licenses: ["external_rust_crates_tokio-stream_license"],
@@ -40,24 +41,24 @@ rust_library {
// autocfg-1.0.1
// bytes-1.0.1 "default,std"
// cfg-if-1.0.0
-// futures-core-0.3.12 "alloc,default,std"
+// futures-core-0.3.13 "alloc,default,std"
// instant-0.1.9
-// libc-0.2.86 "align,default,std"
+// libc-0.2.92 "default,std"
// lock_api-0.4.2
// log-0.4.14
// memchr-2.3.4 "default,std"
-// mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
+// mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
// num_cpus-1.13.0
-// once_cell-1.5.2 "alloc,default,std"
+// once_cell-1.7.2 "alloc,default,race,std"
// parking_lot-0.11.1 "default"
-// parking_lot_core-0.8.2
-// pin-project-lite-0.2.4
-// proc-macro2-1.0.24 "default,proc-macro"
-// quote-1.0.8 "default,proc-macro"
+// parking_lot_core-0.8.3
+// pin-project-lite-0.2.6
+// proc-macro2-1.0.26 "default,proc-macro"
+// quote-1.0.9 "default,proc-macro"
// scopeguard-1.1.0
// signal-hook-registry-1.3.0
// smallvec-1.6.1
-// syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-// tokio-1.2.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
+// syn-1.0.68 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
+// tokio-1.4.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
// tokio-macros-1.1.0
// unicode-xid-0.2.1 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34de724..59bb0fa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,26 @@
+# 0.1.5 (March 20, 2021)
+
+### Fixed
+
+- stream: documentation note for throttle `Unpin` ([#3600])
+
+[#3600]: https://github.com/tokio-rs/tokio/pull/3600
+
+# 0.1.4 (March 9, 2021)
+
+Added
+
+- signal: add `Signal` wrapper ([#3510])
+
+Fixed
+
+- stream: remove duplicate `doc_cfg` declaration ([#3561])
+- sync: yield initial value in `WatchStream` ([#3576])
+
+[#3510]: https://github.com/tokio-rs/tokio/pull/3510
+[#3561]: https://github.com/tokio-rs/tokio/pull/3561
+[#3576]: https://github.com/tokio-rs/tokio/pull/3576
+
# 0.1.3 (February 5, 2021)
Added
diff --git a/Cargo.toml b/Cargo.toml
index 75ea961..c734987 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio-stream"
-version = "0.1.3"
+version = "0.1.5"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "Utilities to work with `Stream` and `tokio`.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
@@ -31,7 +31,7 @@ version = "0.3.0"
version = "0.2.0"
[dependencies.tokio]
-version = "1.0"
+version = "1.2.0"
features = ["sync"]
[dependencies.tokio-util]
@@ -48,7 +48,7 @@ default-features = false
version = "0.10.0"
[dev-dependencies.tokio]
-version = "1.0"
+version = "1.2.0"
features = ["full", "test-util"]
[features]
@@ -56,5 +56,6 @@ default = ["time"]
fs = ["tokio/fs"]
io-util = ["tokio/io-util"]
net = ["tokio/net"]
+signal = ["tokio/signal"]
sync = ["tokio/sync", "tokio-util"]
time = ["tokio/time"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 6fd9032..8588325 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -2,18 +2,17 @@
name = "tokio-stream"
# When releasing to crates.io:
# - Remove path dependencies
-# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.3"
+version = "0.1.5"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
@@ -26,15 +25,16 @@ net = ["tokio/net"]
io-util = ["tokio/io-util"]
fs = ["tokio/fs"]
sync = ["tokio/sync", "tokio-util"]
+signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
-tokio = { version = "1.0", features = ["sync"] }
+tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.6.3", optional = true }
[dev-dependencies]
-tokio = { version = "1.0", features = ["full", "test-util"] }
+tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
async-stream = "0.3"
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }
diff --git a/METADATA b/METADATA
index bb02dba..2dc18b1 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.3.crate"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.5.crate"
}
- version: "0.1.3"
+ version: "0.1.5"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 2
- day: 9
+ month: 4
+ day: 2
}
}
diff --git a/TEST_MAPPING b/TEST_MAPPING
new file mode 100644
index 0000000..9d2a34e
--- /dev/null
+++ b/TEST_MAPPING
@@ -0,0 +1,272 @@
+// Generated by update_crate_tests.py for tests that depend on this crate.
+{
+ "presubmit": [
+ {
+ "name": "tokio_device_test_tests_tcp_stream"
+ },
+ {
+ "name": "tokio_device_test_tests_io_take"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_semaphore"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_mutex_owned"
+ },
+ {
+ "name": "tokio_device_test_tests_no_rt"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_exact"
+ },
+ {
+ "name": "tokio_device_test_tests_async_send_sync"
+ },
+ {
+ "name": "tokio_device_test_tests_task_local"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_mpsc"
+ },
+ {
+ "name": "tokio_device_test_tests_net_lookup_host"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_notify_both"
+ },
+ {
+ "name": "tokio_device_test_tests_fs_link"
+ },
+ {
+ "name": "tokio-test_device_test_tests_block_on"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_usr1"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_notify"
+ },
+ {
+ "name": "tokio_device_test_tests_process_smoke"
+ },
+ {
+ "name": "tokio_device_test_tests_uds_stream"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_connect"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_test"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_peek"
+ },
+ {
+ "name": "tokio-test_device_test_tests_io"
+ },
+ {
+ "name": "tokio_device_test_tests_uds_datagram"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_to_string"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_split"
+ },
+ {
+ "name": "tokio_device_test_tests_fs_dir"
+ },
+ {
+ "name": "tokio_device_test_tests_io_split"
+ },
+ {
+ "name": "tokio_device_test_tests_io_driver"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_line"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_until"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_watch"
+ },
+ {
+ "name": "tokio_device_test_tests_uds_cred"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_pin"
+ },
+ {
+ "name": "tokio_device_test_tests_fs"
+ },
+ {
+ "name": "tokio_device_test_tests_task_local_set"
+ },
+ {
+ "name": "tokio_device_test_tests_fs_file_mocked"
+ },
+ {
+ "name": "tokio_device_test_tests_io_chain"
+ },
+ {
+ "name": "tokio_device_test_tests_io_lines"
+ },
+ {
+ "name": "tokio_device_test_tests_uds_split"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_broadcast"
+ },
+ {
+ "name": "tokio_device_test_tests_process_kill_on_drop"
+ },
+ {
+ "name": "tokio_device_test_tests_io_write_int"
+ },
+ {
+ "name": "tokio_device_test_tests_process_issue_42"
+ },
+ {
+ "name": "tokio_device_test_tests_rt_basic"
+ },
+ {
+ "name": "tokio_device_test_tests_net_bind_resource"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_join"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_no_rt"
+ },
+ {
+ "name": "tokio_device_test_tests_io_write_buf"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_mutex"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_ctrl_c"
+ },
+ {
+ "name": "tokio_device_test_tests_buffered"
+ },
+ {
+ "name": "tokio-test_device_test_src_lib"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_echo"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_into_split"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_select"
+ },
+ {
+ "name": "tokio_device_test_tests_io_async_read"
+ },
+ {
+ "name": "tokio_device_test_tests_io_driver_drop"
+ },
+ {
+ "name": "tokio-test_device_test_tests_macros"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_drop_recv"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_errors"
+ },
+ {
+ "name": "tokio_device_test_tests_fs_copy"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_socket"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_buf"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_drop_signal"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_into_std"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_barrier"
+ },
+ {
+ "name": "tokio_device_test_tests_rt_threaded"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_drop_rt"
+ },
+ {
+ "name": "tokio_device_test_tests_io_mem_stream"
+ },
+ {
+ "name": "tokio_device_test_tests_fs_file"
+ },
+ {
+ "name": "tokio_device_test_tests_process_issue_2174"
+ },
+ {
+ "name": "tokio_device_test_tests_task_abort"
+ },
+ {
+ "name": "tokio_device_test_tests_rt_common"
+ },
+ {
+ "name": "tokio_device_test_tests_test_clock"
+ },
+ {
+ "name": "tokio_device_test_tests_io_write_all"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_rwlock"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_accept"
+ },
+ {
+ "name": "tokio_device_test_tests_io_copy"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_oneshot"
+ },
+ {
+ "name": "tokio_device_test_tests_udp"
+ },
+ {
+ "name": "tokio_device_test_tests_task_blocking"
+ },
+ {
+ "name": "tokio_device_test_tests_tcp_shutdown"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_twice"
+ },
+ {
+ "name": "tokio_device_test_tests_macros_try_join"
+ },
+ {
+ "name": "tokio_device_test_tests_signal_multi_rt"
+ },
+ {
+ "name": "tokio_device_test_tests_io_async_fd"
+ },
+ {
+ "name": "tokio_device_test_tests_io_read_to_end"
+ },
+ {
+ "name": "tokio_device_test_tests_io_write"
+ },
+ {
+ "name": "tokio_device_test_tests_sync_semaphore_owned"
+ }
+ ]
+}
diff --git a/src/lib.rs b/src/lib.rs
index 731e0e5..af99488 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,3 @@
-#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.3")]
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
@@ -10,18 +9,12 @@
rust_2018_idioms,
unreachable_pub
)]
-#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
-#![doc(test(
- no_crate_inject,
- attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
-))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
-#![cfg_attr(docsrs, feature(doc_cfg))]
//! Stream utilities for Tokio.
//!
diff --git a/src/macros.rs b/src/macros.rs
index d4a72c8..1e3b61b 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -48,6 +48,16 @@ macro_rules! cfg_sync {
}
}
+macro_rules! cfg_signal {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "signal")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "signal")))]
+ $item
+ )*
+ }
+}
+
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs
index 99f3e0e..f36c66a 100644
--- a/src/stream_ext/throttle.rs
+++ b/src/stream_ext/throttle.rs
@@ -23,7 +23,8 @@ where
}
pin_project! {
- /// Stream for the [`throttle`](throttle) function.
+ /// Stream for the [`throttle`](throttle) function. This object is `!Unpin`. If you need it to
+ /// implement `Unpin` you can pin your throttle like this: `Box::pin(your_throttle)`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Throttle<T> {
diff --git a/src/wrappers.rs b/src/wrappers.rs
index 0e8ebdf..f2dc21f 100644
--- a/src/wrappers.rs
+++ b/src/wrappers.rs
@@ -1,4 +1,13 @@
//! Wrappers for Tokio types that implement `Stream`.
+//!
+#![cfg_attr(
+ unix,
+ doc = "You are viewing documentation built under unix. To view windows-specific wrappers, change to the `x86_64-pc-windows-msvc` platform."
+)]
+#![cfg_attr(
+ windows,
+ doc = "You are viewing documentation built under windows. To view unix-specific wrappers, change to the `x86_64-unknown-linux-gnu` platform."
+)]
/// Error types for the wrappers.
pub mod errors {
@@ -21,6 +30,18 @@ cfg_sync! {
pub use watch::WatchStream;
}
+cfg_signal! {
+ #[cfg(unix)]
+ mod signal_unix;
+ #[cfg(unix)]
+ pub use signal_unix::SignalStream;
+
+ #[cfg(windows)]
+ mod signal_windows;
+ #[cfg(windows)]
+ pub use signal_windows::{CtrlCStream, CtrlBreakStream};
+}
+
cfg_time! {
mod interval;
pub use interval::IntervalStream;
diff --git a/src/wrappers/signal_unix.rs b/src/wrappers/signal_unix.rs
new file mode 100644
index 0000000..2f74e7d
--- /dev/null
+++ b/src/wrappers/signal_unix.rs
@@ -0,0 +1,46 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::signal::unix::Signal;
+
+/// A wrapper around [`Signal`] that implements [`Stream`].
+///
+/// [`Signal`]: struct@tokio::signal::unix::Signal
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "signal"))))]
+pub struct SignalStream {
+ inner: Signal,
+}
+
+impl SignalStream {
+ /// Create a new `SignalStream`.
+ pub fn new(interval: Signal) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `Signal`.
+ pub fn into_inner(self) -> Signal {
+ self.inner
+ }
+}
+
+impl Stream for SignalStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<Signal> for SignalStream {
+ fn as_ref(&self) -> &Signal {
+ &self.inner
+ }
+}
+
+impl AsMut<Signal> for SignalStream {
+ fn as_mut(&mut self) -> &mut Signal {
+ &mut self.inner
+ }
+}
diff --git a/src/wrappers/signal_windows.rs b/src/wrappers/signal_windows.rs
new file mode 100644
index 0000000..4631fba
--- /dev/null
+++ b/src/wrappers/signal_windows.rs
@@ -0,0 +1,88 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::signal::windows::{CtrlBreak, CtrlC};
+
+/// A wrapper around [`CtrlC`] that implements [`Stream`].
+///
+/// [`CtrlC`]: struct@tokio::signal::windows::CtrlC
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
+pub struct CtrlCStream {
+ inner: CtrlC,
+}
+
+impl CtrlCStream {
+ /// Create a new `CtrlCStream`.
+ pub fn new(interval: CtrlC) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `CtrlC`.
+ pub fn into_inner(self) -> CtrlC {
+ self.inner
+ }
+}
+
+impl Stream for CtrlCStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<CtrlC> for CtrlCStream {
+ fn as_ref(&self) -> &CtrlC {
+ &self.inner
+ }
+}
+
+impl AsMut<CtrlC> for CtrlCStream {
+ fn as_mut(&mut self) -> &mut CtrlC {
+ &mut self.inner
+ }
+}
+
+/// A wrapper around [`CtrlBreak`] that implements [`Stream`].
+///
+/// [`CtrlBreak`]: struct@tokio::signal::windows::CtrlBreak
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "signal"))))]
+pub struct CtrlBreakStream {
+ inner: CtrlBreak,
+}
+
+impl CtrlBreakStream {
+ /// Create a new `CtrlBreakStream`.
+ pub fn new(interval: CtrlBreak) -> Self {
+ Self { inner: interval }
+ }
+
+ /// Get back the inner `CtrlBreak`.
+ pub fn into_inner(self) -> CtrlBreak {
+ self.inner
+ }
+}
+
+impl Stream for CtrlBreakStream {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.inner.poll_recv(cx)
+ }
+}
+
+impl AsRef<CtrlBreak> for CtrlBreakStream {
+ fn as_ref(&self) -> &CtrlBreak {
+ &self.inner
+ }
+}
+
+impl AsMut<CtrlBreak> for CtrlBreakStream {
+ fn as_mut(&mut self) -> &mut CtrlBreak {
+ &mut self.inner
+ }
+}
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index a98a72c..0ffd1b8 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -10,6 +10,41 @@ use tokio::sync::watch::error::RecvError;
/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
+/// This stream will always start by yielding the current value when the WatchStream is polled,
+/// regardles of whether it was the initial value or sent afterwards.
+///
+/// # Examples
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// assert_eq!(rx.next().await, Some("hello"));
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() {
+/// use tokio_stream::{StreamExt, wrappers::WatchStream};
+/// use tokio::sync::watch;
+///
+/// let (tx, rx) = watch::channel("hello");
+/// let mut rx = WatchStream::new(rx);
+///
+/// tx.send("goodbye").unwrap();
+/// assert_eq!(rx.next().await, Some("goodbye"));
+/// # }
+/// ```
+///
/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
@@ -28,7 +63,7 @@ impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
- inner: ReusableBoxFuture::new(make_future(rx)),
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
}