summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/stream_map.rs17
-rw-r--r--src/wrappers.rs13
-rw-r--r--src/wrappers/broadcast.rs6
-rw-r--r--src/wrappers/mpsc_bounded.rs6
-rw-r--r--src/wrappers/mpsc_unbounded.rs6
-rw-r--r--src/wrappers/watch.rs8
6 files changed, 44 insertions, 12 deletions
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 9dc529a..80a521e 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -568,6 +568,23 @@ where
}
}
+impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
+where
+ K: Hash + Eq,
+{
+ fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+ let iterator = iter.into_iter();
+ let (lower_bound, _) = iterator.size_hint();
+ let mut stream_map = Self::with_capacity(lower_bound);
+
+ for (key, value) in iterator {
+ stream_map.insert(key, value);
+ }
+
+ stream_map
+ }
+}
+
mod rand {
use std::cell::Cell;
diff --git a/src/wrappers.rs b/src/wrappers.rs
index f2dc21f..62cabe4 100644
--- a/src/wrappers.rs
+++ b/src/wrappers.rs
@@ -1,13 +1,4 @@
//! Wrappers for Tokio types that implement `Stream`.
-//!
-#![cfg_attr(
- unix,
- doc = "You are viewing documentation built under unix. To view windows-specific wrappers, change to the `x86_64-pc-windows-msvc` platform."
-)]
-#![cfg_attr(
- windows,
- doc = "You are viewing documentation built under windows. To view unix-specific wrappers, change to the `x86_64-unknown-linux-gnu` platform."
-)]
/// Error types for the wrappers.
pub mod errors {
@@ -36,9 +27,9 @@ cfg_signal! {
#[cfg(unix)]
pub use signal_unix::SignalStream;
- #[cfg(windows)]
+ #[cfg(any(windows, docsrs))]
mod signal_windows;
- #[cfg(windows)]
+ #[cfg(any(windows, docsrs))]
pub use signal_windows::{CtrlCStream, CtrlBreakStream};
}
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index 3bddbb7..c8346a6 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -71,3 +71,9 @@ impl<T> fmt::Debug for BroadcastStream<T> {
f.debug_struct("BroadcastStream").finish()
}
}
+
+impl<T: 'static + Clone + Send> From<Receiver<T>> for BroadcastStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/src/wrappers/mpsc_bounded.rs b/src/wrappers/mpsc_bounded.rs
index e4f9000..b536268 100644
--- a/src/wrappers/mpsc_bounded.rs
+++ b/src/wrappers/mpsc_bounded.rs
@@ -57,3 +57,9 @@ impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
&mut self.inner
}
}
+
+impl<T> From<Receiver<T>> for ReceiverStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/src/wrappers/mpsc_unbounded.rs b/src/wrappers/mpsc_unbounded.rs
index bc5f40c..54597b7 100644
--- a/src/wrappers/mpsc_unbounded.rs
+++ b/src/wrappers/mpsc_unbounded.rs
@@ -51,3 +51,9 @@ impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
&mut self.inner
}
}
+
+impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+ fn from(recv: UnboundedReceiver<T>) -> Self {
+ Self::new(recv)
+ }
+}
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index 1daca10..bd3a18a 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -59,7 +59,7 @@ async fn make_future<T: Clone + Send + Sync>(
(result, rx)
}
-impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
+impl<T: 'static + Clone + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
Self {
@@ -94,3 +94,9 @@ impl<T> fmt::Debug for WatchStream<T> {
f.debug_struct("WatchStream").finish()
}
}
+
+impl<T: 'static + Clone + Send + Sync> From<Receiver<T>> for WatchStream<T> {
+ fn from(recv: Receiver<T>) -> Self {
+ Self::new(recv)
+ }
+}