summaryrefslogtreecommitdiff
path: root/src/stream_map.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream_map.rs')
-rw-r--r--src/stream_map.rs664
1 files changed, 664 insertions, 0 deletions
diff --git a/src/stream_map.rs b/src/stream_map.rs
new file mode 100644
index 0000000..85b60cf
--- /dev/null
+++ b/src/stream_map.rs
@@ -0,0 +1,664 @@
+use crate::Stream;
+
+use std::borrow::Borrow;
+use std::hash::Hash;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Combine many streams into one, indexing each source stream with a unique
+/// key.
+///
+/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
+/// streams into a single merged stream that yields values in the order that
+/// they arrive from the source streams. However, `StreamMap` has a lot more
+/// flexibility in usage patterns.
+///
+/// `StreamMap` can:
+///
+/// * Merge an arbitrary number of streams.
+/// * Track which source stream the value was received from.
+/// * Handle inserting and removing streams from the set of managed streams at
+/// any point during iteration.
+///
+/// All source streams held by `StreamMap` are indexed using a key. This key is
+/// included with the value when a source stream yields a value. The key is also
+/// used to remove the stream from the `StreamMap` before the stream has
+/// completed streaming.
+///
+/// # `Unpin`
+///
+/// Because the `StreamMap` API moves streams during runtime, both streams and
+/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
+/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
+/// pin the stream in the heap.
+///
+/// # Implementation
+///
+/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
+/// internal implementation detail will persist in future versions, but it is
+/// important to know the runtime implications. In general, `StreamMap` works
+/// best with a "smallish" number of streams as all entries are scanned on
+/// insert, remove, and polling. In cases where a large number of streams need
+/// to be merged, it may be advisable to use tasks sending values on a shared
+/// [`mpsc`] channel.
+///
+/// [`StreamExt::merge`]: crate::StreamExt::merge
+/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
+/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
+/// [`Box::pin`]: std::boxed::Box::pin
+///
+/// # Examples
+///
+/// Merging two streams, then remove them after receiving the first value
+///
+/// ```
+/// use tokio_stream::{StreamExt, StreamMap, Stream};
+/// use tokio::sync::mpsc;
+/// use std::pin::Pin;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
+/// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
+///
+/// // Convert the channels to a `Stream`.
+/// let rx1 = Box::pin(async_stream::stream! {
+/// while let Some(item) = rx1.recv().await {
+/// yield item;
+/// }
+/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+///
+/// let rx2 = Box::pin(async_stream::stream! {
+/// while let Some(item) = rx2.recv().await {
+/// yield item;
+/// }
+/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+///
+/// tokio::spawn(async move {
+/// tx1.send(1).await.unwrap();
+///
+/// // This value will never be received. The send may or may not return
+/// // `Err` depending on if the remote end closed first or not.
+/// let _ = tx1.send(2).await;
+/// });
+///
+/// tokio::spawn(async move {
+/// tx2.send(3).await.unwrap();
+/// let _ = tx2.send(4).await;
+/// });
+///
+/// let mut map = StreamMap::new();
+///
+/// // Insert both streams
+/// map.insert("one", rx1);
+/// map.insert("two", rx2);
+///
+/// // Read twice
+/// for _ in 0..2 {
+/// let (key, val) = map.next().await.unwrap();
+///
+/// if key == "one" {
+/// assert_eq!(val, 1);
+/// } else {
+/// assert_eq!(val, 3);
+/// }
+///
+/// // Remove the stream to prevent reading the next value
+/// map.remove(key);
+/// }
+/// }
+/// ```
+///
+/// This example models a read-only client to a chat system with channels. The
+/// client sends commands to join and leave channels. `StreamMap` is used to
+/// manage active channel subscriptions.
+///
+/// For simplicity, messages are displayed with `println!`, but they could be
+/// sent to the client over a socket.
+///
+/// ```no_run
+/// use tokio_stream::{Stream, StreamExt, StreamMap};
+///
+/// enum Command {
+/// Join(String),
+/// Leave(String),
+/// }
+///
+/// fn commands() -> impl Stream<Item = Command> {
+/// // Streams in user commands by parsing `stdin`.
+/// # tokio_stream::pending()
+/// }
+///
+/// // Join a channel, returns a stream of messages received on the channel.
+/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
+/// // left as an exercise to the reader
+/// # tokio_stream::pending()
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let mut channels = StreamMap::new();
+///
+/// // Input commands (join / leave channels).
+/// let cmds = commands();
+/// tokio::pin!(cmds);
+///
+/// loop {
+/// tokio::select! {
+/// Some(cmd) = cmds.next() => {
+/// match cmd {
+/// Command::Join(chan) => {
+/// // Join the channel and add it to the `channels`
+/// // stream map
+/// let msgs = join(&chan);
+/// channels.insert(chan, msgs);
+/// }
+/// Command::Leave(chan) => {
+/// channels.remove(&chan);
+/// }
+/// }
+/// }
+/// Some((chan, msg)) = channels.next() => {
+/// // Received a message, display it on stdout with the channel
+/// // it originated from.
+/// println!("{}: {}", chan, msg);
+/// }
+/// // Both the `commands` stream and the `channels` stream are
+/// // complete. There is no more work to do, so leave the loop.
+/// else => break,
+/// }
+/// }
+/// }
+/// ```
+#[derive(Debug)]
+pub struct StreamMap<K, V> {
+ /// Streams stored in the map
+ entries: Vec<(K, V)>,
+}
+
+impl<K, V> StreamMap<K, V> {
+ /// An iterator visiting all key-value pairs in arbitrary order.
+ ///
+ /// The iterator element type is &'a (K, V).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for (key, stream) in map.iter() {
+ /// println!("({}, {:?})", key, stream);
+ /// }
+ /// ```
+ pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
+ self.entries.iter()
+ }
+
+ /// An iterator visiting all key-value pairs mutably in arbitrary order.
+ ///
+ /// The iterator element type is &'a mut (K, V).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for (key, stream) in map.iter_mut() {
+ /// println!("({}, {:?})", key, stream);
+ /// }
+ /// ```
+ pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
+ self.entries.iter_mut()
+ }
+
+ /// Creates an empty `StreamMap`.
+ ///
+ /// The stream map is initially created with a capacity of `0`, so it will
+ /// not allocate until it is first inserted into.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
+ /// ```
+ pub fn new() -> StreamMap<K, V> {
+ StreamMap { entries: vec![] }
+ }
+
+ /// Creates an empty `StreamMap` with the specified capacity.
+ ///
+ /// The stream map will be able to hold at least `capacity` elements without
+ /// reallocating. If `capacity` is 0, the stream map will not allocate.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
+ /// ```
+ pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
+ StreamMap {
+ entries: Vec::with_capacity(capacity),
+ }
+ }
+
+ /// Returns an iterator visiting all keys in arbitrary order.
+ ///
+ /// The iterator element type is &'a K.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for key in map.keys() {
+ /// println!("{}", key);
+ /// }
+ /// ```
+ pub fn keys(&self) -> impl Iterator<Item = &K> {
+ self.iter().map(|(k, _)| k)
+ }
+
+ /// An iterator visiting all values in arbitrary order.
+ ///
+ /// The iterator element type is &'a V.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for stream in map.values() {
+ /// println!("{:?}", stream);
+ /// }
+ /// ```
+ pub fn values(&self) -> impl Iterator<Item = &V> {
+ self.iter().map(|(_, v)| v)
+ }
+
+ /// An iterator visiting all values mutably in arbitrary order.
+ ///
+ /// The iterator element type is &'a mut V.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// map.insert("a", pending::<i32>());
+ /// map.insert("b", pending());
+ /// map.insert("c", pending());
+ ///
+ /// for stream in map.values_mut() {
+ /// println!("{:?}", stream);
+ /// }
+ /// ```
+ pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
+ self.iter_mut().map(|(_, v)| v)
+ }
+
+ /// Returns the number of streams the map can hold without reallocating.
+ ///
+ /// This number is a lower bound; the `StreamMap` might be able to hold
+ /// more, but is guaranteed to be able to hold at least this many.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, Pending};
+ ///
+ /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
+ /// assert!(map.capacity() >= 100);
+ /// ```
+ pub fn capacity(&self) -> usize {
+ self.entries.capacity()
+ }
+
+ /// Returns the number of streams in the map.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// assert_eq!(a.len(), 0);
+ /// a.insert(1, pending::<i32>());
+ /// assert_eq!(a.len(), 1);
+ /// ```
+ pub fn len(&self) -> usize {
+ self.entries.len()
+ }
+
+ /// Returns `true` if the map contains no elements.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::collections::HashMap;
+ ///
+ /// let mut a = HashMap::new();
+ /// assert!(a.is_empty());
+ /// a.insert(1, "a");
+ /// assert!(!a.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ self.entries.is_empty()
+ }
+
+ /// Clears the map, removing all key-stream pairs. Keeps the allocated
+ /// memory for reuse.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut a = StreamMap::new();
+ /// a.insert(1, pending::<i32>());
+ /// a.clear();
+ /// assert!(a.is_empty());
+ /// ```
+ pub fn clear(&mut self) {
+ self.entries.clear();
+ }
+
+ /// Insert a key-stream pair into the map.
+ ///
+ /// If the map did not have this key present, `None` is returned.
+ ///
+ /// If the map did have this key present, the new `stream` replaces the old
+ /// one and the old stream is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ ///
+ /// assert!(map.insert(37, pending::<i32>()).is_none());
+ /// assert!(!map.is_empty());
+ ///
+ /// map.insert(37, pending());
+ /// assert!(map.insert(37, pending()).is_some());
+ /// ```
+ pub fn insert(&mut self, k: K, stream: V) -> Option<V>
+ where
+ K: Hash + Eq,
+ {
+ let ret = self.remove(&k);
+ self.entries.push((k, stream));
+
+ ret
+ }
+
+ /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
+ ///
+ /// The key may be any borrowed form of the map's key type, but `Hash` and
+ /// `Eq` on the borrowed form must match those for the key type.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ /// map.insert(1, pending::<i32>());
+ /// assert!(map.remove(&1).is_some());
+ /// assert!(map.remove(&1).is_none());
+ /// ```
+ pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
+ where
+ K: Borrow<Q>,
+ Q: Hash + Eq,
+ {
+ for i in 0..self.entries.len() {
+ if self.entries[i].0.borrow() == k {
+ return Some(self.entries.swap_remove(i).1);
+ }
+ }
+
+ None
+ }
+
+ /// Returns `true` if the map contains a stream for the specified key.
+ ///
+ /// The key may be any borrowed form of the map's key type, but `Hash` and
+ /// `Eq` on the borrowed form must match those for the key type.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_stream::{StreamMap, pending};
+ ///
+ /// let mut map = StreamMap::new();
+ /// map.insert(1, pending::<i32>());
+ /// assert_eq!(map.contains_key(&1), true);
+ /// assert_eq!(map.contains_key(&2), false);
+ /// ```
+ pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
+ where
+ K: Borrow<Q>,
+ Q: Hash + Eq,
+ {
+ for i in 0..self.entries.len() {
+ if self.entries[i].0.borrow() == k {
+ return true;
+ }
+ }
+
+ false
+ }
+}
+
+impl<K, V> StreamMap<K, V>
+where
+ K: Unpin,
+ V: Stream + Unpin,
+{
+ /// Polls the next value, includes the vec entry index
+ fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
+ use Poll::*;
+
+ let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
+ let mut idx = start;
+
+ for _ in 0..self.entries.len() {
+ let (_, stream) = &mut self.entries[idx];
+
+ match Pin::new(stream).poll_next(cx) {
+ Ready(Some(val)) => return Ready(Some((idx, val))),
+ Ready(None) => {
+ // Remove the entry
+ self.entries.swap_remove(idx);
+
+ // Check if this was the last entry, if so the cursor needs
+ // to wrap
+ if idx == self.entries.len() {
+ idx = 0;
+ } else if idx < start && start <= self.entries.len() {
+ // The stream being swapped into the current index has
+ // already been polled, so skip it.
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ Pending => {
+ idx = idx.wrapping_add(1) % self.entries.len();
+ }
+ }
+ }
+
+ // If the map is empty, then the stream is complete.
+ if self.entries.is_empty() {
+ Ready(None)
+ } else {
+ Pending
+ }
+ }
+}
+
+impl<K, V> Default for StreamMap<K, V> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl<K, V> Stream for StreamMap<K, V>
+where
+ K: Clone + Unpin,
+ V: Stream + Unpin,
+{
+ type Item = (K, V::Item);
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
+ let key = self.entries[idx].0.clone();
+ Poll::Ready(Some((key, val)))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ let mut ret = (0, Some(0));
+
+ for (_, stream) in &self.entries {
+ let hint = stream.size_hint();
+
+ ret.0 += hint.0;
+
+ match (ret.1, hint.1) {
+ (Some(a), Some(b)) => ret.1 = Some(a + b),
+ (Some(_), None) => ret.1 = None,
+ _ => {}
+ }
+ }
+
+ ret
+ }
+}
+
+mod rand {
+ use std::cell::Cell;
+
+ mod loom {
+ #[cfg(not(loom))]
+ pub(crate) mod rand {
+ use std::collections::hash_map::RandomState;
+ use std::hash::{BuildHasher, Hash, Hasher};
+ use std::sync::atomic::AtomicU32;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ static COUNTER: AtomicU32 = AtomicU32::new(1);
+
+ pub(crate) fn seed() -> u64 {
+ let rand_state = RandomState::new();
+
+ let mut hasher = rand_state.build_hasher();
+
+ // Hash some unique-ish data to generate some new state
+ COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
+
+ // Get the seed
+ hasher.finish()
+ }
+ }
+
+ #[cfg(loom)]
+ pub(crate) mod rand {
+ pub(crate) fn seed() -> u64 {
+ 1
+ }
+ }
+ }
+
+ /// Fast random number generate
+ ///
+ /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
+ /// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's
+ /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
+ /// This generator passes the SmallCrush suite, part of TestU01 framework:
+ /// http://simul.iro.umontreal.ca/testu01/tu01.html
+ #[derive(Debug)]
+ pub(crate) struct FastRand {
+ one: Cell<u32>,
+ two: Cell<u32>,
+ }
+
+ impl FastRand {
+ /// Initialize a new, thread-local, fast random number generator.
+ pub(crate) fn new(seed: u64) -> FastRand {
+ let one = (seed >> 32) as u32;
+ let mut two = seed as u32;
+
+ if two == 0 {
+ // This value cannot be zero
+ two = 1;
+ }
+
+ FastRand {
+ one: Cell::new(one),
+ two: Cell::new(two),
+ }
+ }
+
+ pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
+ // This is similar to fastrand() % n, but faster.
+ // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
+ let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
+ (mul >> 32) as u32
+ }
+
+ fn fastrand(&self) -> u32 {
+ let mut s1 = self.one.get();
+ let s0 = self.two.get();
+
+ s1 ^= s1 << 17;
+ s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
+
+ self.one.set(s0);
+ self.two.set(s1);
+
+ s0.wrapping_add(s1)
+ }
+ }
+
+ // Used by `StreamMap`
+ pub(crate) fn thread_rng_n(n: u32) -> u32 {
+ thread_local! {
+ static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
+ }
+
+ THREAD_RNG.with(|rng| rng.fastrand_n(n))
+ }
+}