aboutsummaryrefslogtreecommitdiff
path: root/tests/sink_fanout.rs
blob: 62f32f2872eb0f20571df36f73b3927fbfabfd84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Gated on the features that provide `channel::mpsc` and `executor::block_on`.
#[cfg(all(feature = "alloc", feature="std", feature="executor"))]
#[test]
fn it_works() {
    use futures::{
        channel::mpsc,
        executor::block_on,
        future::join3,
        sink::SinkExt,
        stream::{self, StreamExt},
    };

    // Two independent channels, created with different `channel` arguments (1 and 2).
    let (first_tx, first_rx) = mpsc::channel(1);
    let (second_tx, second_rx) = mpsc::channel(2);

    // Combine both senders into a single sink; the send error is mapped to `()`
    // so it lines up with the error type of the `Ok`-wrapped source stream.
    let fanned_out = first_tx.fanout(second_tx).sink_map_err(|_| ());
    let forward_all = stream::iter((0..10).map(Ok)).forward(fanned_out);

    // Drive the forwarding future and both receivers together: the receivers
    // must be polled concurrently so the bounded channels can drain.
    let (_, got_first, got_second) = block_on(join3(
        forward_all,
        first_rx.collect::<Vec<_>>(),
        second_rx.collect::<Vec<_>>(),
    ));

    // Each receiver is expected to observe the complete sequence, in order.
    let expected: Vec<_> = (0..10).collect();
    assert_eq!(got_first, expected);
    assert_eq!(got_second, expected);
}