diff options
author | Haibo Huang <hhb@google.com> | 2021-02-10 06:21:28 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2021-02-10 06:21:28 +0000 |
commit | 599275bb560c9bcb27cc01829259fb4f71a0d10f (patch) | |
tree | cc335798af905732977d573d848005cdc4edc692 | |
parent | 7eaacb2d38acebce19507064062a198893874e93 (diff) | |
parent | 88a779aee6dc5aee6e4036d2f6cd0122e750823c (diff) | |
download | tokio-stream-599275bb560c9bcb27cc01829259fb4f71a0d10f.tar.gz |
Upgrade rust/crates/tokio-stream to 0.1.3 am: cd448d6d4d am: 6fdc81f9a9 am: f58a498ebe am: 88a779aee6
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1582319
MUST ONLY BE SUBMITTED BY AUTOMERGER
Change-Id: I8dec3257e17f667150b5cabf133db70db63b7f6b
-rw-r--r-- | .cargo_vcs_info.json | 5 | ||||
-rw-r--r-- | Android.bp | 9 | ||||
-rw-r--r-- | CHANGELOG.md | 13 | ||||
-rw-r--r-- | Cargo.toml | 9 | ||||
-rw-r--r-- | Cargo.toml.orig | 8 | ||||
-rw-r--r-- | METADATA | 8 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/macros.rs | 10 | ||||
-rw-r--r-- | src/wrappers.rs | 15 | ||||
-rw-r--r-- | src/wrappers/broadcast.rs | 63 | ||||
-rw-r--r-- | src/wrappers/watch.rs | 61 |
11 files changed, 187 insertions, 16 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..4cca143 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,5 @@ +{ + "git": { + "sha1": "23fdc2b3c4b33300c3e8b44dcb126057d90f4935" + } +} @@ -22,13 +22,12 @@ rust_library { // dependent_library ["feature_list"] // autocfg-1.0.1 // bytes-1.0.1 "default,std" -// cfg-if-0.1.10 // cfg-if-1.0.0 // futures-core-0.3.12 "alloc,default,std" // instant-0.1.9 -// libc-0.2.82 "align,default,std" +// libc-0.2.86 "align,default,std" // lock_api-0.4.2 -// log-0.4.13 +// log-0.4.14 // memchr-2.3.4 "default,std" // mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds" // num_cpus-1.13.0 @@ -42,6 +41,6 @@ rust_library { // signal-hook-registry-1.3.0 // smallvec-1.6.1 // syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut" -// tokio-1.1.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" -// tokio-macros-1.0.0 +// tokio-1.2.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" +// tokio-macros-1.1.0 // unicode-xid-0.2.1 "default" diff --git a/CHANGELOG.md b/CHANGELOG.md index bbb7d8c..34de724 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,19 @@ +# 0.1.3 (February 5, 2021) + +Added + + - sync: add wrapper for broadcast and watch ([#3384], [#3504]) + +[#3384]: https://github.com/tokio-rs/tokio/pull/3384 +[#3504]: https://github.com/tokio-rs/tokio/pull/3504 + # 0.1.2 (January 12, 2021) Fixed - - docs: fix some wrappers missing in documentation (#3378) + - docs: fix some wrappers missing in documentation ([#3378]) + +[#3378]: https://github.com/tokio-rs/tokio/pull/3378 # 0.1.1 (January 4, 2021) @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio-stream" -version = "0.1.2" +version = "0.1.3" authors = ["Tokio Contributors <team@tokio.rs>"] description = "Utilities to work with `Stream` and `tokio`.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" @@ -33,6 +33,10 @@ version = "0.2.0" [dependencies.tokio] version = "1.0" features = ["sync"] + +[dependencies.tokio-util] +version = "0.6.3" +optional = true [dev-dependencies.async-stream] version = "0.3" @@ -52,4 +56,5 @@ default = ["time"] fs = ["tokio/fs"] io-util = ["tokio/io-util"] net = ["tokio/net"] +sync = ["tokio/sync", "tokio-util"] time = ["tokio/time"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index d662c38..6fd9032 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -7,13 +7,13 @@ name = "tokio-stream" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.2" +version = "0.1.3" edition = "2018" authors = ["Tokio Contributors <team@tokio.rs>"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ @@ -25,16 +25,18 @@ time = ["tokio/time"] net = ["tokio/net"] io-util = ["tokio/io-util"] fs = ["tokio/fs"] +sync = ["tokio/sync", "tokio-util"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" tokio = { version = "1.0", features = ["sync"] } +tokio-util = { version = "0.6.3", optional = true } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } -tokio-test = { path = "../tokio-test" } async-stream = "0.3" +tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } proptest = "0.10.0" @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.2.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.3.crate" } - version: "0.1.2" + version: "0.1.3" license_type: NOTICE last_upgrade_date { year: 2021 - month: 1 - day: 21 + month: 2 + day: 9 } } @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.2")] +#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.3")] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, diff --git a/src/macros.rs b/src/macros.rs index 39ad86c..d4a72c8 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -38,6 +38,16 @@ macro_rules! cfg_time { } } +macro_rules! cfg_sync { + ($($item:item)*) => { + $( + #[cfg(feature = "sync")] + #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] + $item + )* + } +} + macro_rules! ready { ($e:expr $(,)?) => { match $e { diff --git a/src/wrappers.rs b/src/wrappers.rs index c0ffb23..0e8ebdf 100644 --- a/src/wrappers.rs +++ b/src/wrappers.rs @@ -1,11 +1,26 @@ //! Wrappers for Tokio types that implement `Stream`. +/// Error types for the wrappers. +pub mod errors { + cfg_sync! { + pub use crate::wrappers::broadcast::BroadcastStreamRecvError; + } +} + mod mpsc_bounded; pub use mpsc_bounded::ReceiverStream; mod mpsc_unbounded; pub use mpsc_unbounded::UnboundedReceiverStream; +cfg_sync! { + mod broadcast; + pub use broadcast::BroadcastStream; + + mod watch; + pub use watch::WatchStream; +} + cfg_time! { mod interval; pub use interval::IntervalStream; diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs new file mode 100644 index 0000000..06a982d --- /dev/null +++ b/src/wrappers/broadcast.rs @@ -0,0 +1,63 @@ +use std::pin::Pin; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; + +/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct BroadcastStream<T> { + inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>, +} + +/// An error returned from the inner stream of a [`BroadcastStream`]. +#[derive(Debug, PartialEq)] +pub enum BroadcastStreamRecvError { + /// The receiver lagged too far behind. Attempting to receive again will + /// return the oldest message still retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), +} + +async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) { + let result = rx.recv().await; + (result, rx) +} + +impl<T: 'static + Clone + Send> BroadcastStream<T> { + /// Create a new `BroadcastStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> { + type Item = Result<T, BroadcastStreamRecvError>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Ok(item) => Poll::Ready(Some(Ok(item))), + Err(RecvError::Closed) => Poll::Ready(None), + Err(RecvError::Lagged(n)) => { + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) + } + } + } +} + +impl<T> fmt::Debug for BroadcastStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BroadcastStream").finish() + } +} diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs new file mode 100644 index 0000000..a98a72c --- /dev/null +++ b/src/wrappers/watch.rs @@ -0,0 +1,61 @@ +use std::pin::Pin; +use tokio::sync::watch::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; +use tokio::sync::watch::error::RecvError; + +/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct WatchStream<T> { + inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>, +} + +async fn make_future<T: Clone + Send + Sync>( + mut rx: Receiver<T>, +) -> (Result<(), RecvError>, Receiver<T>) { + let result = rx.changed().await; + (result, rx) +} + +impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> { + /// Create a new `WatchStream`. + pub fn new(rx: Receiver<T>) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow()).clone(); + self.inner.set(make_future(rx)); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(make_future(rx)); + Poll::Ready(None) + } + } + } +} + +impl<T> Unpin for WatchStream<T> {} + +impl<T> fmt::Debug for WatchStream<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WatchStream").finish() + } +} |