aboutsummaryrefslogtreecommitdiff
path: root/benches/flatten_unordered.rs
diff options
context:
space:
mode:
Diffstat (limited to 'benches/flatten_unordered.rs')
-rw-r--r--benches/flatten_unordered.rs66
1 files changed, 66 insertions, 0 deletions
diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs
new file mode 100644
index 0000000..64d5f9a
--- /dev/null
+++ b/benches/flatten_unordered.rs
@@ -0,0 +1,66 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshot_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+ const STREAM_ITEM_COUNT: usize = 1;
+
+ b.iter(|| {
+ let mut txs = VecDeque::with_capacity(STREAM_COUNT);
+ let mut rxs = Vec::new();
+
+ for _ in 0..STREAM_COUNT {
+ let (tx, rx) = oneshot::channel();
+ txs.push_back(tx);
+ rxs.push(rx);
+ }
+
+ thread::spawn(move || {
+ let mut last = 1;
+ while let Some(tx) = txs.pop_front() {
+ let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
+ last += STREAM_ITEM_COUNT;
+ }
+ });
+
+ let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
+ async {
+ if let Some(next) = vals.next() {
+ let val = next.await.unwrap();
+ Some((val, vals))
+ } else {
+ None
+ }
+ }
+ .boxed()
+ })
+ .flatten_unordered(None);
+
+ block_on(future::poll_fn(move |cx| {
+ let mut count = 0;
+ loop {
+ match flatten.poll_next_unpin(cx) {
+ Poll::Ready(None) => break,
+ Poll::Ready(Some(_)) => {
+ count += 1;
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
+
+ Poll::Ready(())
+ }))
+ });
+}