aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt
blob: 8083cf0a67ed7c78c2627cd48348e476a09fdbcc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@file:Suppress("DEPRECATION")

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import kotlin.test.*

class BroadcastTest : TestBase() {
    @Test
    fun testBroadcastBasic() = runTest {
        expect(1)
        val b = broadcast {
            expect(4)
            send(1) // goes to receiver
            expect(5)
            select<Unit> { onSend(2) {} } // goes to buffer
            expect(6)
            send(3) // suspends, will not be consumes, but will not be cancelled either
            expect(10)
        }
        yield() // has no effect, because default is lazy
        expect(2)

        val subscription = b.openSubscription()
        expect(3)
        assertEquals(1, subscription.receive()) // suspends
        expect(7)
        assertEquals(2, subscription.receive()) // suspends
        expect(8)
        subscription.cancel()
        expect(9)
        yield() // to broadcast
        finish(11)
    }

    /**
     * See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
     */
    @Test
    fun testChannelBroadcastLazyCancel() = runTest {
        expect(1)
        val a = produce {
            expect(3)
            assertFailsWith<CancellationException> { send("MSG") }
            expect(5)
        }
        expect(2)
        yield() // to produce
        val b = a.broadcast()
        b.cancel()
        expect(4)
        yield() // to abort produce
        assertTrue(a.isClosedForReceive) // the source channel was consumed
        finish(6)
    }

    @Test
    fun testChannelBroadcastLazyClose() = runTest {
        expect(1)
        val a = produce {
            expect(3)
            send("MSG")
            expectUnreached() // is not executed, because send is cancelled
        }
        expect(2)
        yield() // to produce
        val b = a.broadcast()
        b.close()
        expect(4)
        yield() // to abort produce
        assertTrue(a.isClosedForReceive) // the source channel was consumed
        finish(5)
    }

    @Test
    fun testChannelBroadcastEagerCancel() = runTest {
        expect(1)
        val a = produce<Unit> {
            expect(3)
            yield() // back to main
            expectUnreached() // will be cancelled
        }
        expect(2)
        val b = a.broadcast(start = CoroutineStart.DEFAULT)
        yield() // to produce
        expect(4)
        b.cancel()
        yield() // to produce (cancelled)
        assertTrue(a.isClosedForReceive) // the source channel was consumed
        finish(5)
    }

    @Test
    fun testChannelBroadcastEagerClose() = runTest {
        expect(1)
        val a = produce<Unit> {
            expect(3)
            yield() // back to main
            // shall eventually get cancelled
            assertFailsWith<CancellationException> {
                while (true) { send(Unit) }
            }
        }
        expect(2)
        val b = a.broadcast(start = CoroutineStart.DEFAULT)
        yield() // to produce
        expect(4)
        b.close()
        yield() // to produce (closed)
        assertTrue(a.isClosedForReceive) // the source channel was consumed
        finish(5)
    }

    @Test
    fun testBroadcastCloseWithException() = runTest {
        expect(1)
        val b = broadcast(NonCancellable, capacity = 1) {
            expect(2)
            send(1)
            expect(3)
            send(2) // suspends
            expect(5)
            // additional attempts to send fail
            assertFailsWith<TestException> { send(3) }
        }
        val sub = b.openSubscription()
        yield() // into broadcast
        expect(4)
        b.close(TestException()) // close broadcast channel with exception
        assertTrue(b.isClosedForSend) // sub was also closed
        assertEquals(1, sub.receive()) // 1st element received
        assertEquals(2, sub.receive()) // 2nd element received
        assertFailsWith<TestException> { sub.receive() } // then closed with exception
        yield() // to cancel broadcast
        finish(6)
    }
}