diff options
author | Vsevolod Tolstopyatov <qwwdfsad@gmail.com> | 2021-04-15 20:13:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-15 20:13:26 +0300 |
commit | 7872f8fcdf972b4146a9d9b51816e9acc90ce7ea (patch) | |
tree | e92842214382978216301a1f4c46d5613bef0260 | |
parent | 3c83c0cfea619f68f1eb323773d1f057f294023f (diff) | |
download | kotlinx.coroutines-7872f8fcdf972b4146a9d9b51816e9acc90ce7ea.tar.gz |
Mark BroadcastChannel, ConflatedBroadcastChannel and all related oper… (#2647)
* Mark BroadcastChannel, ConflatedBroadcastChannel and all related operators as obsolete API replaced with SharedFlow and StateFlow
* Remove operator fusion with deprecated broadcastIn in order to simplify further Flow maintenance
12 files changed, 56 insertions, 199 deletions
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 23f29b02..0d684b44 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1102,7 +1102,6 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor public final field onBufferOverflow Lkotlinx/coroutines/channels/BufferOverflow; public fun <init> (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)V protected fun additionalToStringProps ()Ljava/lang/String; - public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel; public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun create (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/internal/ChannelFlow; diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index 3ed4bc7f..b1c24b45 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -35,11 +35,13 @@ import kotlin.coroutines.intrinsics.* * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with * the broadcasting coroutine in hard-to-specify ways. * - * **Note: This API is obsolete.** It will be deprecated and replaced with the - * [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] + * operator. * * @param start coroutine start option. The default value is [CoroutineStart.LAZY]. */ +@ObsoleteCoroutinesApi public fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY @@ -95,10 +97,12 @@ public fun <E> ReceiveChannel<E>.broadcast( * * ### Future replacement * + * This API is obsolete since 1.5.0. * This function has an inappropriate result type of [BroadcastChannel] which provides * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with - * the broadcasting coroutine in hard-to-specify ways. It will be replaced with - * sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future. + * the broadcasting coroutine in hard-to-specify ways. It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] + * operator. * * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. * @param capacity capacity of the channel's buffer (1 by default). @@ -106,6 +110,7 @@ public fun <E> ReceiveChannel<E>.broadcast( * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]). * @param block the coroutine code. */ +@ObsoleteCoroutinesApi public fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, diff --git a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt index 6cd79373..c82b8dbd 100644 --- a/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt @@ -20,10 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED * See `BroadcastChannel()` factory function for the description of available * broadcast channel implementations. * - * **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow] - * when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. */ -@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it +@ObsoleteCoroutinesApi public interface BroadcastChannel<E> : SendChannel<E> { /** * Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it. @@ -60,9 +60,11 @@ public interface BroadcastChannel<E> : SendChannel<E> { * * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity. * * otherwise -- throws [IllegalArgumentException]. * - * **Note: This is an experimental api.** It may be changed in the future updates. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow] + * and [SharedFlow][kotlinx.coroutines.flow.SharedFlow]. */ -@ExperimentalCoroutinesApi +@ObsoleteCoroutinesApi public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> = when (capacity) { 0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel") diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt index 8283bcb1..b768d7c3 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt @@ -26,10 +26,10 @@ import kotlin.jvm.* * [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the * number of subscribers. * - * **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow] - * when it becomes stable. + * **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0 + * and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow]. */ -@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it +@ObsoleteCoroutinesApi public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { /** * Creates an instance of this class that already holds a value. diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 83ae9136..66b55a90 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -234,7 +234,7 @@ public fun <T> flowViaChannel( * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. * - * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Examples of usage: @@ -289,7 +289,7 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. * - * Adjacent applications of [callbackFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Example of usage that converts a multi-shot callback API to a flow. diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index 5b126f0e..4b50ad31 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -133,17 +133,12 @@ private class ChannelAsFlow<T>( override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow) - override fun dropChannelOperators(): Flow<T>? = + override fun dropChannelOperators(): Flow<T> = ChannelAsFlow(channel, consume) override suspend fun collectTo(scope: ProducerScope<T>) = SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll - override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> { - markConsumed() // fail fast on repeated attempt to collect it - return super.broadcastImpl(scope, start) - } - override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> { markConsumed() // fail fast on repeated attempt to collect it return if (capacity == Channel.OPTIONAL_CHANNEL) { @@ -173,22 +168,16 @@ private class ChannelAsFlow<T>( * 2) Flow consumer completes normally when the original channel completes (~is closed) normally. * 3) If the flow consumer fails with an exception, subscription is cancelled. */ -@FlowPreview +@Deprecated( + level = DeprecationLevel.WARNING, + message = "'BroadcastChannel' is obsolete and all coreresponding operators are deprecated " + + "in the favour of StateFlow and SharedFlow" +) // Since 1.5.0, was @FlowPreview, safe to remove in 1.7.0 public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow { emitAll(openSubscription()) } /** - * Creates a [broadcast] coroutine that collects the given flow. - * - * This transformation is **stateful**, it launches a [broadcast] coroutine - * that collects the given flow and thus resulting channel should be properly closed or cancelled. - * - * A channel with [default][Channel.Factory.BUFFERED] buffer size is created. - * Use [buffer] operator on the flow before calling `broadcastIn` to specify a value other than - * default and to control what happens when data is produced faster than it is consumed, - * that is to control backpressure behavior. - * * ### Deprecated * * **This API is deprecated.** The [BroadcastChannel] provides a complex channel-like API for hot flows. @@ -202,13 +191,26 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow { @Deprecated( message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel", replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"), - level = DeprecationLevel.WARNING -) + level = DeprecationLevel.ERROR +) // WARNING in 1.4.0, error in 1.5.0, removed in 1.6.0 (was @FlowPreview) public fun <T> Flow<T>.broadcastIn( scope: CoroutineScope, start: CoroutineStart = CoroutineStart.LAZY -): BroadcastChannel<T> = - asChannelFlow().broadcastImpl(scope, start) +): BroadcastChannel<T> { + // Backwards compatibility with operator fusing + val channelFlow = asChannelFlow() + val capacity = when (channelFlow.onBufferOverflow) { + BufferOverflow.SUSPEND -> channelFlow.produceCapacity + BufferOverflow.DROP_OLDEST -> Channel.CONFLATED + BufferOverflow.DROP_LATEST -> + throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST") + } + return scope.broadcast(channelFlow.context, capacity = capacity, start = start) { + collect { value -> + send(value) + } + } +} /** * Creates a [produce] coroutine that collects the given flow. diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 490b221b..25118d75 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -80,7 +80,7 @@ import kotlin.native.concurrent.* * ### SharedFlow vs BroadcastChannel * * Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel] - * and is designed to completely replace `BroadcastChannel` in the future. + * and is designed to completely replace it. * It has the following important differences: * * * `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index 74d33140..49407e27 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -88,7 +88,7 @@ import kotlin.native.concurrent.* * ### StateFlow vs ConflatedBroadcastChannel * * Conceptually, state flow is similar to [ConflatedBroadcastChannel] - * and is designed to completely replace `ConflatedBroadcastChannel` in the future. + * and is designed to completely replace it. * It has the following important differences: * * * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index bf82cf9a..0efe5f86 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -37,7 +37,7 @@ public interface FusibleFlow<T> : Flow<T> { /** * Operators that use channels as their "output" extend this `ChannelFlow` and are always fused with each other. * This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting - * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it + * methods like ability to [produceIn] the corresponding flow, thus making it * possible to directly use the backing channel if it exists (hence the `ChannelFlow` name). * * @suppress **This an internal API and should not be used from general code.** @@ -59,7 +59,7 @@ public abstract class ChannelFlow<T>( internal val collectToFun: suspend (ProducerScope<T>) -> Unit get() = { collectTo(it) } - private val produceCapacity: Int + internal val produceCapacity: Int get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity /** @@ -107,18 +107,6 @@ public abstract class ChannelFlow<T>( protected abstract suspend fun collectTo(scope: ProducerScope<T>) - // broadcastImpl is used in broadcastIn operator which is obsolete and replaced by SharedFlow. - // BroadcastChannel does not support onBufferOverflow beyond simple conflation - public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> { - val broadcastCapacity = when (onBufferOverflow) { - BufferOverflow.SUSPEND -> produceCapacity - BufferOverflow.DROP_OLDEST -> Channel.CONFLATED - BufferOverflow.DROP_LATEST -> - throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST") - } - return scope.broadcast(context, broadcastCapacity, start, block = collectToFun) - } - /** * Here we use ATOMIC start for a reason (#1825). * NB: [produceImpl] is used for [flowOn]. @@ -201,7 +189,7 @@ internal class ChannelFlowOperatorImpl<T>( override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) - override fun dropChannelOperators(): Flow<T>? = flow + override fun dropChannelOperators(): Flow<T> = flow override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index cbbb4196..66868967 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -79,7 +79,7 @@ import kotlin.jvm.* * * ### Operator fusion * - * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls, @@ -176,7 +176,7 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capaci * * ### Operator fusion * - * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are + * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are * always fused so that only one properly configured channel is used for execution. * **Conflation takes precedence over `buffer()` calls with any other capacity.** * @@ -219,7 +219,7 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED) * * ### Operator fusion * - * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are + * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are * always fused so that only one properly configured channel is used for execution. * * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index d1cbe72b..432160f3 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -57,7 +57,7 @@ public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R> * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. * * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected @@ -87,7 +87,7 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow { * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. */ @ExperimentalCoroutinesApi @@ -111,7 +111,7 @@ public fun <T> Iterable<Flow<T>>.merge(): Flow<T> { * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. */ @ExperimentalCoroutinesApi @@ -126,7 +126,7 @@ public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge( * * ### Operator fusion * - * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with + * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with * its concurrent merging so that only one properly configured channel is used for execution of merging logic. * * When [concurrency] is greater than 1, this operator is [buffered][buffer] by default diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt index 4763d13b..410955ce 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -128,145 +128,6 @@ class ChannelBuildersFlowTest : TestBase() { } @Test - fun testBroadcastChannelAsFlow() = runTest { - val channel = broadcast { - repeat(10) { - send(it + 1) - } - } - - val sum = channel.asFlow().sum() - assertEquals(55, sum) - } - - @Test - fun testExceptionInBroadcast() = runTest { - expect(1) - val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well - repeat(10) { - send(it + 1) - } - throw TestException() - } - assertEquals(15, channel.asFlow().take(5).sum()) - - // Workaround for JS bug - try { - channel.asFlow().collect { /* Do nothing */ } - expectUnreached() - } catch (e: TestException) { - finish(2) - } - } - - @Test - fun testBroadcastChannelAsFlowLimits() = runTest { - val channel = BroadcastChannel<Int>(1) - val flow = channel.asFlow().map { it * it }.drop(1).take(2) - - var expected = 0 - launch { - assertTrue(channel.trySend(1).isSuccess) // Handed to the coroutine - assertTrue(channel.trySend(2).isSuccess) // Buffered - assertFalse(channel.trySend(3).isSuccess) // Failed to offer - channel.send(3) - yield() - assertEquals(1, expected) - assertTrue(channel.trySend(4).isSuccess) // Handed to the coroutine - assertTrue(channel.trySend(5).isSuccess) // Buffered - assertFalse(channel.trySend(6).isSuccess) // Failed to offer - channel.send(6) - assertEquals(2, expected) - } - - val sum = flow.sum() - assertEquals(13, sum) - ++expected - val sum2 = flow.sum() - assertEquals(61, sum2) - ++expected - } - - @Test - fun flowAsBroadcast() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - } - - val channel = flow.broadcastIn(this) - assertEquals((0..9).toList(), channel.openSubscription().toList()) - } - - @Test - fun flowAsBroadcastMultipleSubscription() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - } - - val broadcast = flow.broadcastIn(this) - val channel = broadcast.openSubscription() - val channel2 = broadcast.openSubscription() - - assertEquals(0, channel.receive()) - assertEquals(0, channel2.receive()) - yield() - assertEquals(1, channel.receive()) - assertEquals(1, channel2.receive()) - - channel.cancel() - channel2.cancel() - yield() - ensureActive() - } - - @Test - fun flowAsBroadcastException() = runTest { - val flow = flow { - repeat(10) { - emit(it) - } - - throw TestException() - } - - val channel = flow.broadcastIn(this + NonCancellable) - assertFailsWith<TestException> { channel.openSubscription().toList() } - assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel - } - - // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains - @Test - fun testFlowAsBroadcastAsFlow() = runTest { - val flow = flow { - emit(1) - emit(2) - emit(3) - }.broadcastIn(this).asFlow() - - assertEquals(6, flow.sum()) - assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold - } - - @Test - fun testBroadcastAsFlowAsBroadcast() = runTest { - val channel = broadcast { - send(1) - }.asFlow().broadcastIn(this) - - channel.openSubscription().consumeEach { - assertEquals(1, it) - } - - channel.openSubscription().consumeEach { - fail() - } - } - - @Test fun testProduceInAtomicity() = runTest { val flow = flowOf(1).onCompletion { expect(2) } val scope = CoroutineScope(wrapperDispatcher()) |