aboutsummaryrefslogtreecommitdiff
path: root/tests/stream_select_all.rs
blob: 6178412f4dcec08abf686e60b2dc4ce1b51cbebe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/// Checks how `SelectAll` reports `FusedStream::is_terminated` across polls and pushes.
///
/// The set must report "not terminated" until a poll has actually returned
/// `Poll::Ready(None)`, and pushing a new stream afterwards must make it report
/// "not terminated" again.
#[test]
fn is_terminated() {
    use futures::future::{self, FutureExt};
    use futures::stream::{FusedStream, SelectAll, StreamExt};
    use futures::task::Poll;
    use futures_test::task::noop_context;

    let mut cx = noop_context();
    let mut tasks = SelectAll::new();

    // A freshly created, never-polled set is not terminated yet; polling the
    // empty set completes immediately and flips it to terminated.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());

    // Test that the sentinel value doesn't leak: after terminating, the set
    // must still report itself as empty with a length of zero.
    assert!(tasks.is_empty());
    assert_eq!(tasks.len(), 0);

    tasks.push(future::ready(1).into_stream());

    assert!(!tasks.is_empty());
    assert_eq!(tasks.len(), 1);

    // Pushing a stream clears the terminated state. It is only set again once
    // a poll observes the end of the set, not when the last item is yielded.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());
}

/// Regression test (named after issue 1626) for the order in which `select_all`
/// yields items from two ready streams of different lengths.
///
/// The expected output alternates between the two inputs while both still have
/// items, then yields the remainder of the longer one, then ends.
#[test]
fn issue_1626() {
    use futures::executor::block_on_stream;
    use futures::stream;

    let short = stream::iter(0..=2);
    let long = stream::iter(10..=14);
    let merged = stream::select_all(vec![short, long]);

    let mut items = block_on_stream(merged);

    // Expected results of each successive `next()` call, in order; the final
    // `None` checks that the merged stream terminates once both inputs are drained.
    let expected_sequence = [
        Some(0),
        Some(10),
        Some(1),
        Some(11),
        Some(2),
        Some(12),
        Some(13),
        Some(14),
        None,
    ];

    for &expected in &expected_sequence {
        assert_eq!(items.next(), expected);
    }
}

/// Exercises `select_all` over three unbounded mpsc receivers, checking the
/// order in which queued items come out and that the merged stream ends once
/// every sender has been dropped.
#[test]
fn works_1() {
    use futures::channel::mpsc;
    use futures::executor::block_on_stream;
    use futures::stream::select_all;

    let (tx_a, rx_a) = mpsc::unbounded::<u32>();
    let (tx_b, rx_b) = mpsc::unbounded::<u32>();
    let (tx_c, rx_c) = mpsc::unbounded::<u32>();

    let mut merged = block_on_stream(select_all(vec![rx_a, rx_b, rx_c]));

    // Two identical rounds: `b` is sent to before `a`, yet the item from `a`
    // is expected first, followed by the one from `b`.
    for _ in 0..2 {
        tx_b.unbounded_send(99).unwrap();
        tx_a.unbounded_send(33).unwrap();
        assert_eq!(merged.next(), Some(33));
        assert_eq!(merged.next(), Some(99));
    }

    // One item at a time: each is delivered as soon as it is the only one queued.
    tx_c.unbounded_send(42).unwrap();
    assert_eq!(merged.next(), Some(42));
    tx_a.unbounded_send(43).unwrap();
    assert_eq!(merged.next(), Some(43));

    // Dropping every sender closes all receivers, which ends the merged stream.
    drop(tx_a);
    drop(tx_b);
    drop(tx_c);
    assert_eq!(merged.next(), None);
}