aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2021-04-15 20:13:26 +0300
committerGitHub <noreply@github.com>2021-04-15 20:13:26 +0300
commit7872f8fcdf972b4146a9d9b51816e9acc90ce7ea (patch)
treee92842214382978216301a1f4c46d5613bef0260
parent3c83c0cfea619f68f1eb323773d1f057f294023f (diff)
downloadkotlinx.coroutines-7872f8fcdf972b4146a9d9b51816e9acc90ce7ea.tar.gz
Mark BroadcastChannel, ConflatedBroadcastChannel and all related oper… (#2647)
* Mark BroadcastChannel, ConflatedBroadcastChannel and all related operators as obsolete API replaced with SharedFlow and StateFlow * Remove operator fusion with deprecated broadcastIn in order to simplify further Flow maintenance
-rw-r--r--kotlinx-coroutines-core/api/kotlinx-coroutines-core.api1
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Broadcast.kt13
-rw-r--r--kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt12
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt6
-rw-r--r--kotlinx-coroutines-core/common/src/flow/Builders.kt4
-rw-r--r--kotlinx-coroutines-core/common/src/flow/Channels.kt44
-rw-r--r--kotlinx-coroutines-core/common/src/flow/SharedFlow.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/StateFlow.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt18
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Context.kt6
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Merge.kt8
-rw-r--r--kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt139
12 files changed, 56 insertions, 199 deletions
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 23f29b02..0d684b44 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -1102,7 +1102,6 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
public final field onBufferOverflow Lkotlinx/coroutines/channels/BufferOverflow;
public fun <init> (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)V
protected fun additionalToStringProps ()Ljava/lang/String;
- public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
index 3ed4bc7f..b1c24b45 100644
--- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -35,11 +35,13 @@ import kotlin.coroutines.intrinsics.*
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways.
*
- * **Note: This API is obsolete.** It will be deprecated and replaced with the
- * [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn]
+ * operator.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
+@ObsoleteCoroutinesApi
public fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
@@ -95,10 +97,12 @@ public fun <E> ReceiveChannel<E>.broadcast(
*
* ### Future replacement
*
+ * This API is obsolete since 1.5.0.
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
- * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
- * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
+ * the broadcasting coroutine in hard-to-specify ways. It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn]
+ * operator.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
@@ -106,6 +110,7 @@ public fun <E> ReceiveChannel<E>.broadcast(
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
+@ObsoleteCoroutinesApi
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
diff --git a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
index 6cd79373..c82b8dbd 100644
--- a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
@@ -20,10 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
- * **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
- * when it becomes stable.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
*/
-@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
+@ObsoleteCoroutinesApi
public interface BroadcastChannel<E> : SendChannel<E> {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
@@ -60,9 +60,11 @@ public interface BroadcastChannel<E> : SendChannel<E> {
* * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
* * otherwise -- throws [IllegalArgumentException].
*
- * **Note: This is an experimental api.** It may be changed in the future updates.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow]
+ * and [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
*/
-@ExperimentalCoroutinesApi
+@ObsoleteCoroutinesApi
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
index 8283bcb1..b768d7c3 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
@@ -26,10 +26,10 @@ import kotlin.jvm.*
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
* number of subscribers.
*
- * **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow]
- * when it becomes stable.
+ * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
+ * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow].
*/
-@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
+@ObsoleteCoroutinesApi
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Creates an instance of this class that already holds a value.
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 83ae9136..66b55a90 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -234,7 +234,7 @@ public fun <T> flowViaChannel(
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior.
*
- * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
+ * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
* always fused so that only one properly configured channel is used for execution.
*
* Examples of usage:
@@ -289,7 +289,7 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior.
*
- * Adjacent applications of [callbackFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
+ * Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are
* always fused so that only one properly configured channel is used for execution.
*
* Example of usage that converts a multi-shot callback API to a flow.
diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt
index 5b126f0e..4b50ad31 100644
--- a/kotlinx-coroutines-core/common/src/flow/Channels.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt
@@ -133,17 +133,12 @@ private class ChannelAsFlow<T>(
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow)
- override fun dropChannelOperators(): Flow<T>? =
+ override fun dropChannelOperators(): Flow<T> =
ChannelAsFlow(channel, consume)
override suspend fun collectTo(scope: ProducerScope<T>) =
SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll
- override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
- markConsumed() // fail fast on repeated attempt to collect it
- return super.broadcastImpl(scope, start)
- }
-
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
markConsumed() // fail fast on repeated attempt to collect it
return if (capacity == Channel.OPTIONAL_CHANNEL) {
@@ -173,22 +168,16 @@ private class ChannelAsFlow<T>(
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, subscription is cancelled.
*/
-@FlowPreview
+@Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "'BroadcastChannel' is obsolete and all coreresponding operators are deprecated " +
+ "in the favour of StateFlow and SharedFlow"
+) // Since 1.5.0, was @FlowPreview, safe to remove in 1.7.0
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
emitAll(openSubscription())
}
/**
- * Creates a [broadcast] coroutine that collects the given flow.
- *
- * This transformation is **stateful**, it launches a [broadcast] coroutine
- * that collects the given flow and thus resulting channel should be properly closed or cancelled.
- *
- * A channel with [default][Channel.Factory.BUFFERED] buffer size is created.
- * Use [buffer] operator on the flow before calling `broadcastIn` to specify a value other than
- * default and to control what happens when data is produced faster than it is consumed,
- * that is to control backpressure behavior.
- *
* ### Deprecated
*
* **This API is deprecated.** The [BroadcastChannel] provides a complex channel-like API for hot flows.
@@ -202,13 +191,26 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
@Deprecated(
message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel",
replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"),
- level = DeprecationLevel.WARNING
-)
+ level = DeprecationLevel.ERROR
+) // WARNING in 1.4.0, error in 1.5.0, removed in 1.6.0 (was @FlowPreview)
public fun <T> Flow<T>.broadcastIn(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.LAZY
-): BroadcastChannel<T> =
- asChannelFlow().broadcastImpl(scope, start)
+): BroadcastChannel<T> {
+ // Backwards compatibility with operator fusing
+ val channelFlow = asChannelFlow()
+ val capacity = when (channelFlow.onBufferOverflow) {
+ BufferOverflow.SUSPEND -> channelFlow.produceCapacity
+ BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
+ BufferOverflow.DROP_LATEST ->
+ throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
+ }
+ return scope.broadcast(channelFlow.context, capacity = capacity, start = start) {
+ collect { value ->
+ send(value)
+ }
+ }
+}
/**
* Creates a [produce] coroutine that collects the given flow.
diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
index 490b221b..25118d75 100644
--- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
@@ -80,7 +80,7 @@ import kotlin.native.concurrent.*
* ### SharedFlow vs BroadcastChannel
*
* Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel]
- * and is designed to completely replace `BroadcastChannel` in the future.
+ * and is designed to completely replace it.
* It has the following important differences:
*
* * `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
index 74d33140..49407e27 100644
--- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt
@@ -88,7 +88,7 @@ import kotlin.native.concurrent.*
* ### StateFlow vs ConflatedBroadcastChannel
*
* Conceptually, state flow is similar to [ConflatedBroadcastChannel]
- * and is designed to completely replace `ConflatedBroadcastChannel` in the future.
+ * and is designed to completely replace it.
* It has the following important differences:
*
* * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index bf82cf9a..0efe5f86 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -37,7 +37,7 @@ public interface FusibleFlow<T> : Flow<T> {
/**
* Operators that use channels as their "output" extend this `ChannelFlow` and are always fused with each other.
* This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting
- * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it
+ * methods like ability to [produceIn] the corresponding flow, thus making it
* possible to directly use the backing channel if it exists (hence the `ChannelFlow` name).
*
* @suppress **This an internal API and should not be used from general code.**
@@ -59,7 +59,7 @@ public abstract class ChannelFlow<T>(
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
- private val produceCapacity: Int
+ internal val produceCapacity: Int
get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity
/**
@@ -107,18 +107,6 @@ public abstract class ChannelFlow<T>(
protected abstract suspend fun collectTo(scope: ProducerScope<T>)
- // broadcastImpl is used in broadcastIn operator which is obsolete and replaced by SharedFlow.
- // BroadcastChannel does not support onBufferOverflow beyond simple conflation
- public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
- val broadcastCapacity = when (onBufferOverflow) {
- BufferOverflow.SUSPEND -> produceCapacity
- BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
- BufferOverflow.DROP_LATEST ->
- throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
- }
- return scope.broadcast(context, broadcastCapacity, start, block = collectToFun)
- }
-
/**
* Here we use ATOMIC start for a reason (#1825).
* NB: [produceImpl] is used for [flowOn].
@@ -201,7 +189,7 @@ internal class ChannelFlowOperatorImpl<T>(
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
- override fun dropChannelOperators(): Flow<T>? = flow
+ override fun dropChannelOperators(): Flow<T> = flow
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index cbbb4196..66868967 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -79,7 +79,7 @@ import kotlin.jvm.*
*
* ### Operator fusion
*
- * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
+ * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
* always fused so that only one properly configured channel is used for execution.
*
* Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
@@ -176,7 +176,7 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capaci
*
* ### Operator fusion
*
- * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are
+ * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are
* always fused so that only one properly configured channel is used for execution.
* **Conflation takes precedence over `buffer()` calls with any other capacity.**
*
@@ -219,7 +219,7 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
*
* ### Operator fusion
*
- * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
+ * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
* always fused so that only one properly configured channel is used for execution.
*
* Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index d1cbe72b..432160f3 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -57,7 +57,7 @@ public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>
*
* ### Operator fusion
*
- * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
+ * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*
* @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
@@ -87,7 +87,7 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
*
* ### Operator fusion
*
- * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
+ * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
@@ -111,7 +111,7 @@ public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
*
* ### Operator fusion
*
- * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
+ * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
@@ -126,7 +126,7 @@ public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge(
*
* ### Operator fusion
*
- * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
+ * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*
* When [concurrency] is greater than 1, this operator is [buffered][buffer] by default
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
index 4763d13b..410955ce 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -128,145 +128,6 @@ class ChannelBuildersFlowTest : TestBase() {
}
@Test
- fun testBroadcastChannelAsFlow() = runTest {
- val channel = broadcast {
- repeat(10) {
- send(it + 1)
- }
- }
-
- val sum = channel.asFlow().sum()
- assertEquals(55, sum)
- }
-
- @Test
- fun testExceptionInBroadcast() = runTest {
- expect(1)
- val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
- repeat(10) {
- send(it + 1)
- }
- throw TestException()
- }
- assertEquals(15, channel.asFlow().take(5).sum())
-
- // Workaround for JS bug
- try {
- channel.asFlow().collect { /* Do nothing */ }
- expectUnreached()
- } catch (e: TestException) {
- finish(2)
- }
- }
-
- @Test
- fun testBroadcastChannelAsFlowLimits() = runTest {
- val channel = BroadcastChannel<Int>(1)
- val flow = channel.asFlow().map { it * it }.drop(1).take(2)
-
- var expected = 0
- launch {
- assertTrue(channel.trySend(1).isSuccess) // Handed to the coroutine
- assertTrue(channel.trySend(2).isSuccess) // Buffered
- assertFalse(channel.trySend(3).isSuccess) // Failed to offer
- channel.send(3)
- yield()
- assertEquals(1, expected)
- assertTrue(channel.trySend(4).isSuccess) // Handed to the coroutine
- assertTrue(channel.trySend(5).isSuccess) // Buffered
- assertFalse(channel.trySend(6).isSuccess) // Failed to offer
- channel.send(6)
- assertEquals(2, expected)
- }
-
- val sum = flow.sum()
- assertEquals(13, sum)
- ++expected
- val sum2 = flow.sum()
- assertEquals(61, sum2)
- ++expected
- }
-
- @Test
- fun flowAsBroadcast() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
- }
-
- val channel = flow.broadcastIn(this)
- assertEquals((0..9).toList(), channel.openSubscription().toList())
- }
-
- @Test
- fun flowAsBroadcastMultipleSubscription() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
- }
-
- val broadcast = flow.broadcastIn(this)
- val channel = broadcast.openSubscription()
- val channel2 = broadcast.openSubscription()
-
- assertEquals(0, channel.receive())
- assertEquals(0, channel2.receive())
- yield()
- assertEquals(1, channel.receive())
- assertEquals(1, channel2.receive())
-
- channel.cancel()
- channel2.cancel()
- yield()
- ensureActive()
- }
-
- @Test
- fun flowAsBroadcastException() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
-
- throw TestException()
- }
-
- val channel = flow.broadcastIn(this + NonCancellable)
- assertFailsWith<TestException> { channel.openSubscription().toList() }
- assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
- }
-
- // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
- @Test
- fun testFlowAsBroadcastAsFlow() = runTest {
- val flow = flow {
- emit(1)
- emit(2)
- emit(3)
- }.broadcastIn(this).asFlow()
-
- assertEquals(6, flow.sum())
- assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
- }
-
- @Test
- fun testBroadcastAsFlowAsBroadcast() = runTest {
- val channel = broadcast {
- send(1)
- }.asFlow().broadcastIn(this)
-
- channel.openSubscription().consumeEach {
- assertEquals(1, it)
- }
-
- channel.openSubscription().consumeEach {
- fail()
- }
- }
-
- @Test
fun testProduceInAtomicity() = runTest {
val flow = flowOf(1).onCompletion { expect(2) }
val scope = CoroutineScope(wrapperDispatcher())