aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core')
-rw-r--r--kotlinx-coroutines-core/api/kotlinx-coroutines-core.api41
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt26
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Channel.kt137
-rw-r--r--kotlinx-coroutines-core/common/src/flow/Channels.kt8
-rw-r--r--kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt20
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt146
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt135
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt8
-rw-r--r--kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt8
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt28
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt32
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt6
13 files changed, 299 insertions, 300 deletions
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 8a35d186..7d2fa1f4 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -625,6 +625,26 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}
+public final class kotlinx/coroutines/channels/ChannelResult {
+ public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion;
+ public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult;
+ public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object;
+ public fun equals (Ljava/lang/Object;)Z
+ public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
+ public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
+ public static final fun exceptionOrNull-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
+ public static final fun getOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
+ public static final fun getOrThrow-impl (Ljava/lang/Object;)Ljava/lang/Object;
+ public fun hashCode ()I
+ public static fun hashCode-impl (Ljava/lang/Object;)I
+ public static final fun isClosed-impl (Ljava/lang/Object;)Z
+ public static final fun isFailure-impl (Ljava/lang/Object;)Z
+ public static final fun isSuccess-impl (Ljava/lang/Object;)Z
+ public fun toString ()Ljava/lang/String;
+ public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
+ public final synthetic fun unbox-impl ()Ljava/lang/Object;
+}
+
public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -789,14 +809,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
- public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
+ public abstract fun getOnReceiveCatching ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public abstract fun receiveOrClosed-WVj179g (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
@@ -832,23 +852,6 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum {
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}
-public final class kotlinx/coroutines/channels/ValueOrClosed {
- public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
- public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
- public fun equals (Ljava/lang/Object;)Z
- public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
- public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
- public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
- public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
- public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
- public fun hashCode ()I
- public static fun hashCode-impl (Ljava/lang/Object;)I
- public static final fun isClosed-impl (Ljava/lang/Object;)Z
- public fun toString ()Ljava/lang/String;
- public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
- public final synthetic fun unbox-impl ()Ljava/lang/Object;
-}
-
public final class kotlinx/coroutines/debug/internal/DebugCoroutineInfo {
public fun <init> (Lkotlinx/coroutines/debug/internal/DebugCoroutineInfoImpl;Lkotlin/coroutines/CoroutineContext;)V
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 9721583e..180598c9 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -623,7 +623,7 @@ internal abstract class AbstractChannel<E>(
}
@Suppress("UNCHECKED_CAST")
- public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
+ public final override suspend fun receiveCatching(): ChannelResult<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
@@ -742,10 +742,10 @@ internal abstract class AbstractChannel<E>(
}
}
- final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
- get() = object : SelectClause1<ValueOrClosed<E>> {
+ final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
+ get() = object : SelectClause1<ChannelResult<E>> {
@Suppress("UNCHECKED_CAST")
- override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
+ override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
}
}
@@ -776,7 +776,7 @@ internal abstract class AbstractChannel<E>(
}
RECEIVE_RESULT -> {
if (!select.trySelect()) return
- startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion)
+ startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
}
RECEIVE_NULL_ON_CLOSE -> {
if (value.closeCause == null) {
@@ -905,7 +905,7 @@ internal abstract class AbstractChannel<E>(
@JvmField val receiveMode: Int
) : Receive<E>() {
fun resumeValue(value: E): Any? = when (receiveMode) {
- RECEIVE_RESULT -> ValueOrClosed.value(value)
+ RECEIVE_RESULT -> ChannelResult.value(value)
else -> value
}
@@ -990,7 +990,7 @@ internal abstract class AbstractChannel<E>(
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
- if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value,
+ if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
@@ -1000,7 +1000,7 @@ internal abstract class AbstractChannel<E>(
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
- RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
+ RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutineCancellable(null, select.completion)
} else {
@@ -1128,9 +1128,9 @@ internal class Closed<in E>(
override val offerResult get() = this
override val pollResult get() = this
- override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+ override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeSend() {}
- override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
+ override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeReceive(value: E) {}
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
@@ -1143,8 +1143,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
}
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
-private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
- if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E)
+private inline fun <E> Any?.toResult(): ChannelResult<E> =
+ if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E)
@Suppress("NOTHING_TO_INLINE")
-private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause)
+private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index b8b81aac..824a57bf 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -244,8 +244,9 @@ public interface ReceiveChannel<out E> {
/**
* Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
- * This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel
- * or the close cause if the channel was closed.
+ * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
+ * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally.
+ * The result cannot be [failed][ChannelResult.isFailure] without being [closed][ChannelResult.isClosed].
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
@@ -257,25 +258,17 @@ public interface ReceiveChannel<out E> {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
- * This function can be used in [select] invocations with the [onReceiveOrClosed] clause.
+ * This function can be used in [select] invocations with the [onReceiveCatching] clause.
* Use [poll] to try receiving from this channel without waiting.
- *
- * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
- * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
- @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
- public suspend fun receiveOrClosed(): ValueOrClosed<E>
+ public suspend fun receiveCatching(): ChannelResult<E>
/**
- * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
+ * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value
* that is received from the channel or with a close cause if the channel
* [is closed for `receive`][isClosedForReceive].
- *
- * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
- * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
- @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
- public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
+ public val onReceiveCatching: SelectClause1<ChannelResult<E>>
/**
* Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
@@ -321,104 +314,96 @@ public interface ReceiveChannel<out E> {
}
/**
- * A discriminated union of [ReceiveChannel.receiveOrClosed] result
- * that encapsulates either an element of type [T] successfully received from the channel or a close cause.
+ * A discriminated union of channel operation result.
+ * It encapsulates successful or failed result of a channel operation, or a failed operation to a closed channel with
+ * an optional cause.
+ *
+ * Successful result represents a successful operation with value of type [T], for example, result of [Channel.receiveCatching]
+ * operation or a successfully sent element as a result of [Channel.trySend].
*
- * :todo: Do not make it public before resolving todos in the code of this class.
+ * Failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
+ * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
*
- * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
- * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
+ * Closed result represents an operation attempt to a closed channel and also implies that the operation was failed.
*/
-@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING")
-@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
-public inline class ValueOrClosed<out T>
+@Suppress("UNCHECKED_CAST")
+public inline class ChannelResult<out T>
internal constructor(private val holder: Any?) {
/**
- * Returns `true` if this instance represents a received element.
- * In this case [isClosed] returns `false`.
- * todo: it is commented for now, because it is not used
+ * Returns `true` if this instance represents a successful
+ * operation outcome.
+ *
+ * In this case [isFailure] and [isClosed] return false.
*/
- //public val isValue: Boolean get() = holder !is Closed
+ public val isSuccess: Boolean get() = holder !is Closed
/**
- * Returns `true` if this instance represents a close cause.
- * In this case [isValue] returns `false`.
+ * Returns true if this instance represents unsuccessful operation.
+ *
+ * In this case [isSuccess] returns false, but it does not imply
+ * that the channel is failed or closed.
+ *
+ * Example of failed operation without an exception and channel being closed
+ * is [Channel.trySend] attempt to a channel that is full.
*/
- public val isClosed: Boolean get() = holder is Closed
+ public val isFailure: Boolean get() = holder is Failed
/**
- * Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise.
+ * Returns `true` if this instance represents unsuccessful operation
+ * to a closed or cancelled channel.
*
- * :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]:
+ * In this case [isSuccess] returns false, [isFailure] returns `true`, but it does not imply
+ * that [exceptionOrNull] returns non-null value.
*
- * So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`.
- * On the other hand, the channel has the following `receive` variants:
- * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh?
- * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull`
- * * `receiveOrClosed`
- * For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`.
+ * It can happen if the channel was [closed][Channel.close] normally without an exception.
*/
- @Suppress("UNCHECKED_CAST")
- public val value: T
- get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T
+ public val isClosed: Boolean get() = holder is Closed
/**
- * Returns the received value if this element represents a received value, or `null` otherwise.
- * :todo: Decide if it shall be made into extension that is available only for non-null T.
- * Note: it might become inconsistent with kotlin.Result
+ * Returns the encapsulated value if this instance represents success or `null` if it represents failed result.
*/
- @Suppress("UNCHECKED_CAST")
- public val valueOrNull: T?
- get() = if (holder is Closed) null else holder as T
+ public fun getOrNull(): T? = if (holder !is Failed) holder as T else null
/**
- * :todo: Decide, if it is needed, how it shall be named with relation to [value].
- * Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site,
- * so if one is sure that `ValueOrClosed` always holds a value, this very property should be used.
- * Otherwise, it could be very hard to locate the source of the exception.
- * todo: it is commented for now, because it is not used
+ * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
*/
- //@Suppress("UNCHECKED_CAST")
- //public val valueOrThrow: T
- // get() = if (holder is Closed) throw holder.exception else holder as T
+ public fun getOrThrow(): T {
+ if (holder !is Failed) return holder as T
+ if (holder is Closed && holder.cause != null) throw holder.cause
+ error("Trying to call 'getOrThrow' on a failed channel result: $holder")
+ }
/**
- * Returns the close cause of the channel if this instance represents a close cause, or throws
- * an [IllegalStateException] otherwise.
+ * Returns the encapsulated exception if this instance represents failure or null if it is success
+ * or unsuccessful operation to closed channel.
*/
- @Suppress("UNCHECKED_CAST")
- public val closeCause: Throwable? get() =
- if (holder is Closed) holder.cause else error("Channel was not closed")
+ public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
- /**
- * @suppress
- */
- public override fun toString(): String =
- when (holder) {
- is Closed -> holder.toString()
- else -> "Value($holder)"
+ internal open class Failed {
+ override fun toString(): String = "Failed"
}
- internal class Closed(@JvmField val cause: Throwable?) {
- // todo: it is commented for now, because it is not used
- //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
+ internal class Closed(@JvmField val cause: Throwable?): Failed() {
override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
override fun hashCode(): Int = cause.hashCode()
override fun toString(): String = "Closed($cause)"
}
- /**
- * todo: consider making value/closed constructors public in the future.
- */
internal companion object {
@Suppress("NOTHING_TO_INLINE")
- internal inline fun <E> value(value: E): ValueOrClosed<E> =
- ValueOrClosed(value)
+ internal inline fun <E> value(value: E): ChannelResult<E> =
+ ChannelResult(value)
@Suppress("NOTHING_TO_INLINE")
- internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> =
- ValueOrClosed(Closed(cause))
+ internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> =
+ ChannelResult(Closed(cause))
}
+
+ public override fun toString(): String =
+ when (holder) {
+ is Closed -> holder.toString()
+ else -> "Value($holder)"
+ }
}
/**
diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt
index e883c3b4..5b126f0e 100644
--- a/kotlinx-coroutines-core/common/src/flow/Channels.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt
@@ -30,7 +30,7 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni
emitAllImpl(channel, consume = true)
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
- // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
+ // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
// fix retention of the last emitted value.
// See https://youtrack.jetbrains.com/issue/KT-16222
@@ -47,9 +47,9 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
// L$1 <- channel
// L$2 <- cause
// L$3 <- this$run (actually equal to this)
- val result = run { channel.receiveOrClosed() }
+ val result = run { channel.receiveCatching() }
if (result.isClosed) {
- result.closeCause?.let { throw it }
+ result.exceptionOrNull()?.let { throw it }
break // returns normally when result.closeCause == null
}
// result is spilled here to the coroutine state and retained after the call, even though
@@ -58,7 +58,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>,
// L$1 <- channel
// L$2 <- cause
// L$3 <- result
- emit(result.value)
+ emit(result.getOrThrow())
}
} catch (e: Throwable) {
cause = e
diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
index 91d941b3..e178f124 100644
--- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -35,7 +35,7 @@ class BasicOperationsTest : TestBase() {
}
@Test
- fun testReceiveOrClosed() = runTest {
+ fun testReceiveCatching() = runTest {
TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) }
}
@@ -139,22 +139,22 @@ class BasicOperationsTest : TestBase() {
}
expect(1)
- val result = channel.receiveOrClosed()
- assertEquals(1, result.value)
- assertEquals(1, result.valueOrNull)
- assertTrue(ValueOrClosed.value(1) == result)
+ val result = channel.receiveCatching()
+ assertEquals(1, result.getOrThrow())
+ assertEquals(1, result.getOrNull())
+ assertTrue(ChannelResult.value(1) == result)
expect(3)
launch {
expect(4)
channel.close()
}
- val closed = channel.receiveOrClosed()
+ val closed = channel.receiveCatching()
expect(5)
- assertNull(closed.valueOrNull)
+ assertNull(closed.getOrNull())
assertTrue(closed.isClosed)
- assertNull(closed.closeCause)
- assertTrue(ValueOrClosed.closed<Int>(closed.closeCause) == closed)
+ assertNull(closed.exceptionOrNull())
+ assertTrue(ChannelResult.closed<Int>(closed.exceptionOrNull()) == closed)
finish(6)
}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
new file mode 100644
index 00000000..4344b105
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class ChannelReceiveCatchingTest : TestBase() {
+ @Test
+ fun testChannelOfThrowables() = runTest {
+ val channel = Channel<Throwable>()
+ launch {
+ channel.send(TestException1())
+ channel.close(TestException2())
+ }
+
+ val element = channel.receiveCatching()
+ assertTrue(element.getOrThrow() is TestException1)
+ assertTrue(element.getOrNull() is TestException1)
+
+ val closed = channel.receiveCatching()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.isFailure)
+ assertTrue(closed.exceptionOrNull() is TestException2)
+ }
+
+ @Test
+ @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
+ fun testNullableIntChanel() = runTest {
+ val channel = Channel<Int?>()
+ launch {
+ expect(2)
+ channel.send(1)
+ expect(3)
+ channel.send(null)
+
+ expect(6)
+ channel.close()
+ }
+
+ expect(1)
+ val element = channel.receiveCatching()
+ assertEquals(1, element.getOrThrow())
+ assertEquals(1, element.getOrNull())
+ assertEquals("Value(1)", element.toString())
+ assertTrue(ChannelResult.value(1) == element) // Don't box
+ assertFalse(element.isFailure)
+ assertFalse(element.isClosed)
+
+ expect(4)
+ val nullElement = channel.receiveCatching()
+ assertNull(nullElement.getOrThrow())
+ assertNull(nullElement.getOrNull())
+ assertEquals("Value(null)", nullElement.toString())
+ assertTrue(ChannelResult.value(null) == nullElement) // Don't box
+ assertFalse(element.isFailure)
+ assertFalse(element.isClosed)
+
+ expect(5)
+ val closed = channel.receiveCatching()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.isFailure)
+
+ val closed2 = channel.receiveCatching()
+ assertTrue(closed2.isClosed)
+ assertTrue(closed.isFailure)
+ assertNull(closed2.exceptionOrNull())
+ finish(7)
+ }
+
+ @Test
+ @ExperimentalUnsignedTypes
+ fun testUIntChannel() = runTest {
+ val channel = Channel<UInt>()
+ launch {
+ expect(2)
+ channel.send(1u)
+ yield()
+ expect(4)
+ channel.send((Long.MAX_VALUE - 1).toUInt())
+ expect(5)
+ }
+
+ expect(1)
+ val element = channel.receiveCatching()
+ assertEquals(1u, element.getOrThrow())
+
+ expect(3)
+ val element2 = channel.receiveCatching()
+ assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.getOrThrow())
+ finish(6)
+ }
+
+ @Test
+ fun testCancelChannel() = runTest {
+ val channel = Channel<Boolean>()
+ launch {
+ expect(2)
+ channel.cancel()
+ }
+
+ expect(1)
+ val closed = channel.receiveCatching()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.isFailure)
+ finish(3)
+ }
+
+ @Test
+ @ExperimentalUnsignedTypes
+ fun testReceiveResultChannel() = runTest {
+ val channel = Channel<ChannelResult<UInt>>()
+ launch {
+ channel.send(ChannelResult.value(1u))
+ channel.send(ChannelResult.closed(TestException1()))
+ channel.close(TestException2())
+ }
+
+ val intResult = channel.receiveCatching()
+ assertEquals(1u, intResult.getOrThrow().getOrThrow())
+ assertFalse(intResult.isFailure)
+ assertFalse(intResult.isClosed)
+
+ val closeCauseResult = channel.receiveCatching()
+ assertTrue(closeCauseResult.getOrThrow().exceptionOrNull() is TestException1)
+
+ val closeCause = channel.receiveCatching()
+ assertTrue(closeCause.isClosed)
+ assertTrue(closeCause.isFailure)
+ assertTrue(closeCause.exceptionOrNull() is TestException2)
+ }
+
+ @Test
+ fun testToString() = runTest {
+ val channel = Channel<String>(1)
+ channel.send("message")
+ channel.close(TestException1("OK"))
+ assertEquals("Value(message)", channel.receiveCatching().toString())
+ // toString implementation for exception differs on every platform
+ val str = channel.receiveCatching().toString()
+ if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex()))
+ error("Unexpected string: '$str'")
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt
deleted file mode 100644
index e58b0dee..00000000
--- a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.channels
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class ChannelReceiveOrClosedTest : TestBase() {
- @Test
- fun testChannelOfThrowables() = runTest {
- val channel = Channel<Throwable>()
- launch {
- channel.send(TestException1())
- channel.close(TestException2())
- }
-
- val element = channel.receiveOrClosed()
- assertTrue(element.value is TestException1)
- assertTrue(element.valueOrNull is TestException1)
-
- val closed = channel.receiveOrClosed()
- assertTrue(closed.isClosed)
- assertTrue(closed.closeCause is TestException2)
- }
-
- @Test
- @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
- fun testNullableIntChanel() = runTest {
- val channel = Channel<Int?>()
- launch {
- expect(2)
- channel.send(1)
- expect(3)
- channel.send(null)
-
- expect(6)
- channel.close()
- }
-
- expect(1)
- val element = channel.receiveOrClosed()
- assertEquals(1, element.value)
- assertEquals(1, element.valueOrNull)
- assertEquals("Value(1)", element.toString())
- assertTrue(ValueOrClosed.value(1) == element) // Don't box
-
- expect(4)
- val nullElement = channel.receiveOrClosed()
- assertNull(nullElement.value)
- assertNull(nullElement.valueOrNull)
- assertEquals("Value(null)", nullElement.toString())
- assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box
-
- expect(5)
- val closed = channel.receiveOrClosed()
- assertTrue(closed.isClosed)
-
- val closed2 = channel.receiveOrClosed()
- assertTrue(closed2.isClosed)
- assertNull(closed2.closeCause)
- finish(7)
- }
-
- @Test
- @ExperimentalUnsignedTypes
- fun testUIntChannel() = runTest {
- val channel = Channel<UInt>()
- launch {
- expect(2)
- channel.send(1u)
- yield()
- expect(4)
- channel.send((Long.MAX_VALUE - 1).toUInt())
- expect(5)
- }
-
- expect(1)
- val element = channel.receiveOrClosed()
- assertEquals(1u, element.value)
-
- expect(3)
- val element2 = channel.receiveOrClosed()
- assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value)
- finish(6)
- }
-
- @Test
- fun testCancelChannel() = runTest {
- val channel = Channel<Boolean>()
- launch {
- expect(2)
- channel.cancel()
- }
-
- expect(1)
- val closed = channel.receiveOrClosed()
- assertTrue(closed.isClosed)
- finish(3)
- }
-
- @Test
- @ExperimentalUnsignedTypes
- fun testReceiveResultChannel() = runTest {
- val channel = Channel<ValueOrClosed<UInt>>()
- launch {
- channel.send(ValueOrClosed.value(1u))
- channel.send(ValueOrClosed.closed(TestException1()))
- channel.close(TestException2())
- }
-
- val intResult = channel.receiveOrClosed()
- assertEquals(1u, intResult.value.value)
-
- val closeCauseResult = channel.receiveOrClosed()
- assertTrue(closeCauseResult.value.closeCause is TestException1)
-
- val closeCause = channel.receiveOrClosed()
- assertTrue(closeCause.isClosed)
- assertTrue(closeCause.closeCause is TestException2)
- }
-
- @Test
- fun testToString() = runTest {
- val channel = Channel<String>(1)
- channel.send("message")
- channel.close(TestException1("OK"))
- assertEquals("Value(message)", channel.receiveOrClosed().toString())
- // toString implementation for exception differs on every platform
- val str = channel.receiveOrClosed().toString()
- if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex()))
- error("Unexpected string: '$str'")
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt
index d2ef3d26..37db7e45 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -99,7 +99,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() {
fun testReceiveOrClosedCancelledFail() = runTest(unhandled = shouldBeUnhandled) {
val channel = Channel(onUndeliveredElement = onCancelFail)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
- channel.receiveOrClosed()
+ channel.receiveCatching()
expectUnreached() // will be cancelled before it dispatches
}
channel.send(item)
@@ -111,7 +111,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() {
val channel = Channel(onUndeliveredElement = onCancelFail)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expectUnreached()
}
}
@@ -140,4 +140,4 @@ class ChannelUndeliveredElementFailureTest : TestBase() {
expectUnreached()
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
index 993be78e..d28d912d 100644
--- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -43,7 +43,7 @@ private class ChannelViaBroadcast<E>(
override suspend fun receive(): E = sub.receive()
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
- override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
+ override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()
@@ -57,6 +57,6 @@ private class ChannelViaBroadcast<E>(
get() = sub.onReceive
override val onReceiveOrNull: SelectClause1<E?>
get() = sub.onReceiveOrNull
- override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
- get() = sub.onReceiveOrClosed
+ override val onReceiveCatching: SelectClause1<ChannelResult<E>>
+ get() = sub.onReceiveCatching
}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
index a4f8c3ba..0158c843 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.selects
@@ -295,10 +295,10 @@ class SelectArrayChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
}
}
@@ -316,10 +316,10 @@ class SelectArrayChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertTrue(it.closeCause is TestException)
+ assertTrue(it.exceptionOrNull() is TestException)
}
}
@@ -327,16 +327,16 @@ class SelectArrayChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosed() = runTest {
+ fun testSelectReceiveCatching() = runTest {
val c = Channel<Int>(1)
val iterations = 10
expect(1)
val job = launch {
repeat(iterations) {
select<Unit> {
- c.onReceiveOrClosed { v ->
+ c.onReceiveCatching { v ->
expect(4 + it * 2)
- assertEquals(it, v.value)
+ assertEquals(it, v.getOrNull())
}
}
}
@@ -360,9 +360,9 @@ class SelectArrayChannelTest : TestBase() {
launch {
expect(3)
val res = select<String> {
- c.onReceiveOrClosed { v ->
+ c.onReceiveCatching { v ->
expect(6)
- assertEquals(42, v.value)
+ assertEquals(42, v.getOrNull())
yield() // back to main
expect(8)
"OK"
@@ -396,9 +396,9 @@ class SelectArrayChannelTest : TestBase() {
expect(1)
select<Unit> {
expect(2)
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
finish(3)
}
}
@@ -412,9 +412,9 @@ class SelectArrayChannelTest : TestBase() {
expect(1)
select<Unit> {
expect(2)
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
assertFalse(it.isClosed)
- assertEquals(42, it.value)
+ assertEquals(42, it.getOrNull())
finish(3)
}
}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
index 2027630f..6a157676 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
@@ -306,7 +306,7 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedWaitClosed() = runTest {
+ fun testSelectReceiveCatchingWaitClosed() = runTest {
expect(1)
val channel = Channel<String>(Channel.RENDEZVOUS)
launch {
@@ -316,10 +316,10 @@ class SelectRendezvousChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
}
}
@@ -327,7 +327,7 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest {
+ fun testSelectReceiveCatchingWaitClosedWithCause() = runTest {
expect(1)
val channel = Channel<String>(Channel.RENDEZVOUS)
launch {
@@ -337,10 +337,10 @@ class SelectRendezvousChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertTrue(it.closeCause is TestException)
+ assertTrue(it.exceptionOrNull() is TestException)
}
}
@@ -348,31 +348,31 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedForClosedChannel() = runTest {
+ fun testSelectReceiveCatchingForClosedChannel() = runTest {
val channel = Channel<Unit>()
channel.close()
expect(1)
select<Unit> {
expect(2)
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
finish(3)
}
}
}
@Test
- fun testSelectReceiveOrClosed() = runTest {
+ fun testSelectReceiveCatching() = runTest {
val channel = Channel<Int>(Channel.RENDEZVOUS)
val iterations = 10
expect(1)
val job = launch {
repeat(iterations) {
select<Unit> {
- channel.onReceiveOrClosed { v ->
+ channel.onReceiveCatching { v ->
expect(4 + it * 2)
- assertEquals(it, v.value)
+ assertEquals(it, v.getOrThrow())
}
}
}
@@ -390,15 +390,15 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedDispatch() = runTest {
+ fun testSelectReceiveCatchingDispatch() = runTest {
val c = Channel<Int>(Channel.RENDEZVOUS)
expect(1)
launch {
expect(3)
val res = select<String> {
- c.onReceiveOrClosed { v ->
+ c.onReceiveCatching { v ->
expect(6)
- assertEquals(42, v.value)
+ assertEquals(42, v.getOrThrow())
yield() // back to main
expect(8)
"OK"
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
index 1188329a..3f502ba9 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -190,7 +190,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T
2 -> select { channel.onReceive { it } }
3 -> channel.receiveOrNull() ?: error("Should not be closed")
4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } }
- 5 -> channel.receiveOrClosed().value
+ 5 -> channel.receiveCatching().getOrThrow()
6 -> {
val iterator = channel.iterator()
check(iterator.hasNext()) { "Should not be closed" }
diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
index 51789078..6ce2f20d 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
var sum = 0
var n = 0
whileSelect {
- this@averageInTimeWindow.onReceiveOrClosed {
+ this@averageInTimeWindow.onReceiveCatching {
if (it.isClosed) {
// Send leftovers and bail out
if (n != 0) send(sum / n.toDouble())
false
} else {
- sum += it.value
+ sum += it.getOrThrow()
++n
true
}