summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaibo Huang <hhb@google.com>2021-02-10 05:16:06 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2021-02-10 05:16:06 +0000
commitf58a498ebe56f5488e20145f4038f07b8ff78c60 (patch)
treecc335798af905732977d573d848005cdc4edc692
parentb76f6fedb05588eab803f458773d6adfbc3b50a5 (diff)
parent6fdc81f9a9ff95575d3ffd78772a2eaa9c146703 (diff)
downloadtokio-stream-f58a498ebe56f5488e20145f4038f07b8ff78c60.tar.gz
Upgrade rust/crates/tokio-stream to 0.1.3 am: cd448d6d4d am: 6fdc81f9a9
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1582319 MUST ONLY BE SUBMITTED BY AUTOMERGER Change-Id: Ic2d4e6f6660b5086450aa4b976bc31e00fd26c27
-rw-r--r--.cargo_vcs_info.json5
-rw-r--r--Android.bp9
-rw-r--r--CHANGELOG.md13
-rw-r--r--Cargo.toml9
-rw-r--r--Cargo.toml.orig8
-rw-r--r--METADATA8
-rw-r--r--src/lib.rs2
-rw-r--r--src/macros.rs10
-rw-r--r--src/wrappers.rs15
-rw-r--r--src/wrappers/broadcast.rs63
-rw-r--r--src/wrappers/watch.rs61
11 files changed, 187 insertions, 16 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..4cca143
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+ "git": {
+ "sha1": "23fdc2b3c4b33300c3e8b44dcb126057d90f4935"
+ }
+}
diff --git a/Android.bp b/Android.bp
index 221394c..46c372a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -22,13 +22,12 @@ rust_library {
// dependent_library ["feature_list"]
// autocfg-1.0.1
// bytes-1.0.1 "default,std"
-// cfg-if-0.1.10
// cfg-if-1.0.0
// futures-core-0.3.12 "alloc,default,std"
// instant-0.1.9
-// libc-0.2.82 "align,default,std"
+// libc-0.2.86 "align,default,std"
// lock_api-0.4.2
-// log-0.4.13
+// log-0.4.14
// memchr-2.3.4 "default,std"
// mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
// num_cpus-1.13.0
@@ -42,6 +41,6 @@ rust_library {
// signal-hook-registry-1.3.0
// smallvec-1.6.1
// syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-// tokio-1.1.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
-// tokio-macros-1.0.0
+// tokio-1.2.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
+// tokio-macros-1.1.0
// unicode-xid-0.2.1 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbb7d8c..34de724 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,19 @@
+# 0.1.3 (February 5, 2021)
+
+Added
+
+ - sync: add wrapper for broadcast and watch ([#3384], [#3504])
+
+[#3384]: https://github.com/tokio-rs/tokio/pull/3384
+[#3504]: https://github.com/tokio-rs/tokio/pull/3504
+
# 0.1.2 (January 12, 2021)
Fixed
- - docs: fix some wrappers missing in documentation (#3378)
+ - docs: fix some wrappers missing in documentation ([#3378])
+
+[#3378]: https://github.com/tokio-rs/tokio/pull/3378
# 0.1.1 (January 4, 2021)
diff --git a/Cargo.toml b/Cargo.toml
index 443228a..75ea961 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
[package]
edition = "2018"
name = "tokio-stream"
-version = "0.1.2"
+version = "0.1.3"
authors = ["Tokio Contributors <team@tokio.rs>"]
description = "Utilities to work with `Stream` and `tokio`.\n"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream"
categories = ["asynchronous"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
@@ -33,6 +33,10 @@ version = "0.2.0"
[dependencies.tokio]
version = "1.0"
features = ["sync"]
+
+[dependencies.tokio-util]
+version = "0.6.3"
+optional = true
[dev-dependencies.async-stream]
version = "0.3"
@@ -52,4 +56,5 @@ default = ["time"]
fs = ["tokio/fs"]
io-util = ["tokio/io-util"]
net = ["tokio/net"]
+sync = ["tokio/sync", "tokio-util"]
time = ["tokio/time"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index d662c38..6fd9032 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -7,13 +7,13 @@ name = "tokio-stream"
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.2"
+version = "0.1.3"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
@@ -25,16 +25,18 @@ time = ["tokio/time"]
net = ["tokio/net"]
io-util = ["tokio/io-util"]
fs = ["tokio/fs"]
+sync = ["tokio/sync", "tokio-util"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.0", features = ["sync"] }
+tokio-util = { version = "0.6.3", optional = true }
[dev-dependencies]
tokio = { version = "1.0", features = ["full", "test-util"] }
-tokio-test = { path = "../tokio-test" }
async-stream = "0.3"
+tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }
proptest = "0.10.0"
diff --git a/METADATA b/METADATA
index f0d384f..bb02dba 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@ third_party {
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.2.crate"
+ value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.3.crate"
}
- version: "0.1.2"
+ version: "0.1.3"
license_type: NOTICE
last_upgrade_date {
year: 2021
- month: 1
- day: 21
+ month: 2
+ day: 9
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 307a839..731e0e5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.2")]
+#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.3")]
#![allow(
clippy::cognitive_complexity,
clippy::large_enum_variant,
diff --git a/src/macros.rs b/src/macros.rs
index 39ad86c..d4a72c8 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -38,6 +38,16 @@ macro_rules! cfg_time {
}
}
+macro_rules! cfg_sync {
+ ($($item:item)*) => {
+ $(
+ #[cfg(feature = "sync")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+ $item
+ )*
+ }
+}
+
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
diff --git a/src/wrappers.rs b/src/wrappers.rs
index c0ffb23..0e8ebdf 100644
--- a/src/wrappers.rs
+++ b/src/wrappers.rs
@@ -1,11 +1,26 @@
//! Wrappers for Tokio types that implement `Stream`.
+/// Error types for the wrappers.
+pub mod errors {
+ cfg_sync! {
+ pub use crate::wrappers::broadcast::BroadcastStreamRecvError;
+ }
+}
+
mod mpsc_bounded;
pub use mpsc_bounded::ReceiverStream;
mod mpsc_unbounded;
pub use mpsc_unbounded::UnboundedReceiverStream;
+cfg_sync! {
+ mod broadcast;
+ pub use broadcast::BroadcastStream;
+
+ mod watch;
+ pub use watch::WatchStream;
+}
+
cfg_time! {
mod interval;
pub use interval::IntervalStream;
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
new file mode 100644
index 0000000..06a982d
--- /dev/null
+++ b/src/wrappers/broadcast.rs
@@ -0,0 +1,63 @@
+use std::pin::Pin;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::broadcast::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct BroadcastStream<T> {
+ inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
+}
+
+/// An error returned from the inner stream of a [`BroadcastStream`].
+#[derive(Debug, PartialEq)]
+pub enum BroadcastStreamRecvError {
+ /// The receiver lagged too far behind. Attempting to receive again will
+ /// return the oldest message still retained by the channel.
+ ///
+ /// Includes the number of skipped messages.
+ Lagged(u64),
+}
+
+async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
+ let result = rx.recv().await;
+ (result, rx)
+}
+
+impl<T: 'static + Clone + Send> BroadcastStream<T> {
+ /// Create a new `BroadcastStream`.
+ pub fn new(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
+}
+
+impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
+ type Item = Result<T, BroadcastStreamRecvError>;
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, rx) = ready!(self.inner.poll(cx));
+ self.inner.set(make_future(rx));
+ match result {
+ Ok(item) => Poll::Ready(Some(Ok(item))),
+ Err(RecvError::Closed) => Poll::Ready(None),
+ Err(RecvError::Lagged(n)) => {
+ Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for BroadcastStream<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BroadcastStream").finish()
+ }
+}
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
new file mode 100644
index 0000000..a98a72c
--- /dev/null
+++ b/src/wrappers/watch.rs
@@ -0,0 +1,61 @@
+use std::pin::Pin;
+use tokio::sync::watch::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+use tokio::sync::watch::error::RecvError;
+
+/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct WatchStream<T> {
+ inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
+}
+
+async fn make_future<T: Clone + Send + Sync>(
+ mut rx: Receiver<T>,
+) -> (Result<(), RecvError>, Receiver<T>) {
+ let result = rx.changed().await;
+ (result, rx)
+}
+
+impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
+ /// Create a new `WatchStream`.
+ pub fn new(rx: Receiver<T>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(make_future(rx)),
+ }
+ }
+}
+
+impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow()).clone();
+ self.inner.set(make_future(rx));
+ Poll::Ready(Some(received))
+ }
+ Err(_) => {
+ self.inner.set(make_future(rx));
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl<T> Unpin for WatchStream<T> {}
+
+impl<T> fmt::Debug for WatchStream<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("WatchStream").finish()
+ }
+}