From cd448d6d4df5f5ba06ceab1d45c135bbeabe88f1 Mon Sep 17 00:00:00 2001 From: Haibo Huang Date: Tue, 9 Feb 2021 18:23:29 -0800 Subject: Upgrade rust/crates/tokio-stream to 0.1.3 Test: make Change-Id: Ife5296a08ee583c835d161304b28edf296a9eaea --- .cargo_vcs_info.json | 5 ++++ Android.bp | 9 +++---- CHANGELOG.md | 13 +++++++++- Cargo.toml | 9 +++++-- Cargo.toml.orig | 8 +++--- METADATA | 8 +++--- src/lib.rs | 2 +- src/macros.rs | 10 ++++++++ src/wrappers.rs | 15 +++++++++++ src/wrappers/broadcast.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++ src/wrappers/watch.rs | 61 +++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 187 insertions(+), 16 deletions(-) create mode 100644 .cargo_vcs_info.json create mode 100644 src/wrappers/broadcast.rs create mode 100644 src/wrappers/watch.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..4cca143 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,5 @@ +{ + "git": { + "sha1": "23fdc2b3c4b33300c3e8b44dcb126057d90f4935" + } +} diff --git a/Android.bp b/Android.bp index 221394c..46c372a 100644 --- a/Android.bp +++ b/Android.bp @@ -22,13 +22,12 @@ rust_library { // dependent_library ["feature_list"] // autocfg-1.0.1 // bytes-1.0.1 "default,std" -// cfg-if-0.1.10 // cfg-if-1.0.0 // futures-core-0.3.12 "alloc,default,std" // instant-0.1.9 -// libc-0.2.82 "align,default,std" +// libc-0.2.86 "align,default,std" // lock_api-0.4.2 -// log-0.4.13 +// log-0.4.14 // memchr-2.3.4 "default,std" // mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds" // num_cpus-1.13.0 @@ -42,6 +41,6 @@ rust_library { // signal-hook-registry-1.3.0 // smallvec-1.6.1 // syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut" -// tokio-1.1.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" -// tokio-macros-1.0.0 +// tokio-1.2.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi" +// tokio-macros-1.1.0 // unicode-xid-0.2.1 "default" diff --git a/CHANGELOG.md b/CHANGELOG.md index bbb7d8c..34de724 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,19 @@ +# 0.1.3 (February 5, 2021) + +Added + + - sync: add wrapper for broadcast and watch ([#3384], [#3504]) + +[#3384]: https://github.com/tokio-rs/tokio/pull/3384 +[#3504]: https://github.com/tokio-rs/tokio/pull/3504 + # 0.1.2 (January 12, 2021) Fixed - - docs: fix some wrappers missing in documentation (#3378) + - docs: fix some wrappers missing in documentation ([#3378]) + +[#3378]: https://github.com/tokio-rs/tokio/pull/3378 # 0.1.1 (January 4, 2021) diff --git a/Cargo.toml b/Cargo.toml index 443228a..75ea961 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ [package] edition = "2018" name = "tokio-stream" -version = "0.1.2" +version = "0.1.3" authors = ["Tokio Contributors "] description = "Utilities to work with `Stream` and `tokio`.\n" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream" categories = ["asynchronous"] license = "MIT" repository = "https://github.com/tokio-rs/tokio" @@ -33,6 +33,10 @@ version = "0.2.0" [dependencies.tokio] version = "1.0" features = ["sync"] + +[dependencies.tokio-util] +version = "0.6.3" +optional = true [dev-dependencies.async-stream] version = "0.3" @@ -52,4 +56,5 @@ default = ["time"] fs = ["tokio/fs"] io-util = ["tokio/io-util"] net = ["tokio/net"] +sync = ["tokio/sync", "tokio-util"] time = ["tokio/time"] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index d662c38..6fd9032 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -7,13 +7,13 @@ name = "tokio-stream" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.2" +version = "0.1.3" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ @@ -25,16 +25,18 @@ time = ["tokio/time"] net = ["tokio/net"] io-util = ["tokio/io-util"] fs = ["tokio/fs"] +sync = ["tokio/sync", "tokio-util"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" tokio = { version = "1.0", features = ["sync"] } +tokio-util = { version = "0.6.3", optional = true } [dev-dependencies] tokio = { version = "1.0", features = ["full", "test-util"] } -tokio-test = { path = "../tokio-test" } async-stream = "0.3" +tokio-test = { path = "../tokio-test" } futures = { version = "0.3", default-features = false } proptest = "0.10.0" diff --git a/METADATA b/METADATA index f0d384f..bb02dba 100644 --- a/METADATA +++ b/METADATA @@ -7,13 +7,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.2.crate" + value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.3.crate" } - version: "0.1.2" + version: "0.1.3" license_type: NOTICE last_upgrade_date { year: 2021 - month: 1 - day: 21 + month: 2 + day: 9 } } diff --git a/src/lib.rs b/src/lib.rs index 307a839..731e0e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.2")] +#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.3")] #![allow( clippy::cognitive_complexity, clippy::large_enum_variant, diff --git a/src/macros.rs b/src/macros.rs index 39ad86c..d4a72c8 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -38,6 +38,16 @@ macro_rules! cfg_time { } } +macro_rules! cfg_sync { + ($($item:item)*) => { + $( + #[cfg(feature = "sync")] + #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] + $item + )* + } +} + macro_rules! ready { ($e:expr $(,)?) => { match $e { diff --git a/src/wrappers.rs b/src/wrappers.rs index c0ffb23..0e8ebdf 100644 --- a/src/wrappers.rs +++ b/src/wrappers.rs @@ -1,11 +1,26 @@ //! Wrappers for Tokio types that implement `Stream`. +/// Error types for the wrappers. +pub mod errors { + cfg_sync! { + pub use crate::wrappers::broadcast::BroadcastStreamRecvError; + } +} + mod mpsc_bounded; pub use mpsc_bounded::ReceiverStream; mod mpsc_unbounded; pub use mpsc_unbounded::UnboundedReceiverStream; +cfg_sync! { + mod broadcast; + pub use broadcast::BroadcastStream; + + mod watch; + pub use watch::WatchStream; +} + cfg_time! { mod interval; pub use interval::IntervalStream; diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs new file mode 100644 index 0000000..06a982d --- /dev/null +++ b/src/wrappers/broadcast.rs @@ -0,0 +1,63 @@ +use std::pin::Pin; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; + +/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct BroadcastStream { + inner: ReusableBoxFuture<(Result, Receiver)>, +} + +/// An error returned from the inner stream of a [`BroadcastStream`]. +#[derive(Debug, PartialEq)] +pub enum BroadcastStreamRecvError { + /// The receiver lagged too far behind. Attempting to receive again will + /// return the oldest message still retained by the channel. + /// + /// Includes the number of skipped messages. + Lagged(u64), +} + +async fn make_future(mut rx: Receiver) -> (Result, Receiver) { + let result = rx.recv().await; + (result, rx) +} + +impl BroadcastStream { + /// Create a new `BroadcastStream`. + pub fn new(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl Stream for BroadcastStream { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, rx) = ready!(self.inner.poll(cx)); + self.inner.set(make_future(rx)); + match result { + Ok(item) => Poll::Ready(Some(Ok(item))), + Err(RecvError::Closed) => Poll::Ready(None), + Err(RecvError::Lagged(n)) => { + Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))) + } + } + } +} + +impl fmt::Debug for BroadcastStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BroadcastStream").finish() + } +} diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs new file mode 100644 index 0000000..a98a72c --- /dev/null +++ b/src/wrappers/watch.rs @@ -0,0 +1,61 @@ +use std::pin::Pin; +use tokio::sync::watch::Receiver; + +use futures_core::Stream; +use tokio_util::sync::ReusableBoxFuture; + +use std::fmt; +use std::task::{Context, Poll}; +use tokio::sync::watch::error::RecvError; + +/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. +/// +/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver +/// [`Stream`]: trait@crate::Stream +#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +pub struct WatchStream { + inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver)>, +} + +async fn make_future( + mut rx: Receiver, +) -> (Result<(), RecvError>, Receiver) { + let result = rx.changed().await; + (result, rx) +} + +impl WatchStream { + /// Create a new `WatchStream`. + pub fn new(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } +} + +impl Stream for WatchStream { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow()).clone(); + self.inner.set(make_future(rx)); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(make_future(rx)); + Poll::Ready(None) + } + } + } +} + +impl Unpin for WatchStream {} + +impl fmt::Debug for WatchStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WatchStream").finish() + } +} -- cgit v1.2.3