diff options
14 files changed, 143 insertions, 57 deletions
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 7d2fa1f4..062e466e 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -555,6 +555,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V + public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel { @@ -566,6 +567,7 @@ public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : k public final class kotlinx/coroutines/channels/BroadcastChannel$DefaultImpls { public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V + public static fun offer (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Object;)Z } public final class kotlinx/coroutines/channels/BroadcastChannelKt { @@ -598,6 +600,8 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co public final class kotlinx/coroutines/channels/Channel$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V + public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z + public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object; } public final class kotlinx/coroutines/channels/Channel$Factory { @@ -628,7 +632,7 @@ public final class kotlinx/coroutines/channels/ChannelKt { public final class kotlinx/coroutines/channels/ChannelResult { public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion; public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult; - public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object; + public static fun constructor-impl (Ljava/lang/Object;)Ljava/lang/Object; public fun equals (Ljava/lang/Object;)Z public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z @@ -645,6 +649,12 @@ public final class kotlinx/coroutines/channels/ChannelResult { public final synthetic fun unbox-impl ()Ljava/lang/Object; } +public final class kotlinx/coroutines/channels/ChannelResult$Companion { + public final fun closed-JP2dKIU (Ljava/lang/Throwable;)Ljava/lang/Object; + public final fun failure-PtdJZtk ()Ljava/lang/Object; + public final fun success-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object; +} + public final class kotlinx/coroutines/channels/ChannelsKt { public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -789,6 +799,7 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli public fun offer (Ljava/lang/Object;)Z public fun openSubscription ()Lkotlinx/coroutines/channels/ReceiveChannel; public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object; } public final class kotlinx/coroutines/channels/ProduceKt { @@ -804,6 +815,10 @@ public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotl public abstract fun getChannel ()Lkotlinx/coroutines/channels/SendChannel; } +public final class kotlinx/coroutines/channels/ProducerScope$DefaultImpls { + public static fun offer (Lkotlinx/coroutines/channels/ProducerScope;Ljava/lang/Object;)Z +} + public abstract interface class kotlinx/coroutines/channels/ReceiveChannel { public abstract synthetic fun cancel ()V public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z @@ -818,12 +833,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel { public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun tryReceive-PtdJZtk ()Ljava/lang/Object; } public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V + public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/channels/SendChannel { @@ -834,10 +851,12 @@ public abstract interface class kotlinx/coroutines/channels/SendChannel { public abstract fun isFull ()Z public abstract fun offer (Ljava/lang/Object;)Z public abstract fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object; } public final class kotlinx/coroutines/channels/SendChannel$DefaultImpls { public static synthetic fun close$default (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z + public static fun offer (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Z } public final class kotlinx/coroutines/channels/TickerChannelsKt { diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 180598c9..82143b03 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -137,23 +137,42 @@ internal abstract class AbstractSendChannel<E>( return sendSuspend(element) } - public final override fun offer(element: E): Boolean { + override fun offer(element: E): Boolean { + try { + return super.offer(element) + } catch (e: Throwable) { + onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let { + // If it crashes, add send exception as suppressed for better diagnostics + it.addSuppressed(e) + throw it + } + throw e + } + } + + public final override fun trySend(element: E): ChannelResult<Unit> { val result = offerInternal(element) return when { - result === OFFER_SUCCESS -> true + result === OFFER_SUCCESS -> ChannelResult.success(Unit) result === OFFER_FAILED -> { - // We should check for closed token on offer as well, otherwise offer won't be linearizable + // We should check for closed token on trySend as well, otherwise trySend won't be linearizable // in the face of concurrent close() // See https://github.com/Kotlin/kotlinx.coroutines/issues/359 - throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false)) + val closedForSend = closedForSend ?: return ChannelResult.failure() + ChannelResult.closed(helpCloseAndGetSendException(closedForSend)) } result is Closed<*> -> { - throw recoverStackTrace(helpCloseAndGetSendException(element, result)) + ChannelResult.closed(helpCloseAndGetSendException(result)) } - else -> error("offerInternal returned $result") + else -> error("trySend returned $result") } } + private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable { + helpClose(closed) + return closed.sendException + } + private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable { // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419 @@ -632,9 +651,11 @@ internal abstract class AbstractChannel<E>( } @Suppress("UNCHECKED_CAST") - public final override fun poll(): E? { + public final override fun tryReceive(): ChannelResult<E> { val result = pollInternal() - return if (result === POLL_FAILED) null else receiveOrNullResult(result) + if (result === POLL_FAILED) return ChannelResult.failure() + if (result is Closed<*>) return ChannelResult.closed(result.closeCause) + return ChannelResult.success(result as E) } @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x") @@ -905,7 +926,7 @@ internal abstract class AbstractChannel<E>( @JvmField val receiveMode: Int ) : Receive<E>() { fun resumeValue(value: E): Any? = when (receiveMode) { - RECEIVE_RESULT -> ChannelResult.value(value) + RECEIVE_RESULT -> ChannelResult.success(value) else -> value } @@ -990,7 +1011,7 @@ internal abstract class AbstractChannel<E>( @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(value: E) { block.startCoroutineCancellable( - if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value, + if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value, select.completion, resumeOnCancellationFun(value) ) @@ -1144,7 +1165,7 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") private inline fun <E> Any?.toResult(): ChannelResult<E> = - if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E) + if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E) @Suppress("NOTHING_TO_INLINE") private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 824a57bf..61d06d83 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -85,7 +85,22 @@ public interface SendChannel<in E> { * then it calls `onUndeliveredElement` before throwing an exception. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. */ - public fun offer(element: E): Boolean + public fun offer(element: E): Boolean { + val result = trySend(element) + if (result.isSuccess) return true + throw recoverStackTrace(result.exceptionOrNull() ?: return false) + } + + /** + * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions, + * and returns the successful result. Otherwise, returns failed or closed result. + * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws. + * + * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and + * it does not call `onUndeliveredElement` that was installed for this channel. + * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. + */ + public fun trySend(element: E): ChannelResult<Unit> /** * Closes this channel. @@ -218,7 +233,7 @@ public interface ReceiveChannel<out E> { @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") @LowPriorityInOverloadResolution @Deprecated( - message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension", + message = "Deprecated in favor of receiveCatching and receiveOrNull extension", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull") ) @@ -230,13 +245,13 @@ public interface ReceiveChannel<out E> { * [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with * the original [close][SendChannel.close] cause exception if the channel has _failed_. * - * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension. + * @suppress **Deprecated**: in favor of receiveCatching and onReceiveOrNull extension. */ @ObsoleteCoroutinesApi @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") @LowPriorityInOverloadResolution @Deprecated( - message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension", + message = "Deprecated in favor of receiveCatching and onReceiveOrNull extension", level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull") ) @@ -251,7 +266,7 @@ public interface ReceiveChannel<out E> { * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with a [CancellationException]. * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel, + * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel, * but then throw [CancellationException], thus failing to deliver the element. * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. * @@ -271,11 +286,22 @@ public interface ReceiveChannel<out E> { public val onReceiveCatching: SelectClause1<ChannelResult<E>> /** - * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty + * Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty * or is [is closed for `receive`][isClosedForReceive] without a cause. * It throws the original [close][SendChannel.close] cause exception if the channel has _failed_. */ - public fun poll(): E? + public fun poll(): E? { + val result = tryReceive() + if (result.isSuccess) return result.getOrThrow() + throw recoverStackTrace(result.exceptionOrNull() ?: return null) + } + + /** + * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success] + * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed] + * result if the channel is closed. + */ + public fun tryReceive(): ChannelResult<E> /** * Returns a new iterator to receive elements from this channel using a `for` loop. @@ -315,35 +341,35 @@ public interface ReceiveChannel<out E> { /** * A discriminated union of channel operation result. - * It encapsulates successful or failed result of a channel operation, or a failed operation to a closed channel with + * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with * an optional cause. * - * Successful result represents a successful operation with value of type [T], for example, result of [Channel.receiveCatching] - * operation or a successfully sent element as a result of [Channel.trySend]. + * The successful result represents a successful operation with a value of type [T], for example, + * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend]. * - * Failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed. + * The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed. * E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state. * - * Closed result represents an operation attempt to a closed channel and also implies that the operation was failed. + * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed. */ @Suppress("UNCHECKED_CAST") public inline class ChannelResult<out T> -internal constructor(private val holder: Any?) { +@PublishedApi internal constructor(private val holder: Any?) { /** * Returns `true` if this instance represents a successful * operation outcome. * - * In this case [isFailure] and [isClosed] return false. + * In this case [isFailure] and [isClosed] return `false`. */ - public val isSuccess: Boolean get() = holder !is Closed + public val isSuccess: Boolean get() = holder !is Failed /** - * Returns true if this instance represents unsuccessful operation. + * Returns `true` if this instance represents unsuccessful operation. * * In this case [isSuccess] returns false, but it does not imply * that the channel is failed or closed. * - * Example of failed operation without an exception and channel being closed + * Example of a failed operation without an exception and channel being closed * is [Channel.trySend] attempt to a channel that is full. */ public val isFailure: Boolean get() = holder is Failed @@ -352,7 +378,7 @@ internal constructor(private val holder: Any?) { * Returns `true` if this instance represents unsuccessful operation * to a closed or cancelled channel. * - * In this case [isSuccess] returns false, [isFailure] returns `true`, but it does not imply + * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply * that [exceptionOrNull] returns non-null value. * * It can happen if the channel was [closed][Channel.close] normally without an exception. @@ -374,7 +400,7 @@ internal constructor(private val holder: Any?) { } /** - * Returns the encapsulated exception if this instance represents failure or null if it is success + * Returns the encapsulated exception if this instance represents failure or `null` if it is success * or unsuccessful operation to closed channel. */ public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause @@ -389,13 +415,21 @@ internal constructor(private val holder: Any?) { override fun toString(): String = "Closed($cause)" } - internal companion object { - @Suppress("NOTHING_TO_INLINE") - internal inline fun <E> value(value: E): ChannelResult<E> = + @Suppress("NOTHING_TO_INLINE") + @InternalCoroutinesApi + public companion object { + private val failed = Failed() + + @InternalCoroutinesApi + public fun <E> success(value: E): ChannelResult<E> = ChannelResult(value) - @Suppress("NOTHING_TO_INLINE") - internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> = + @InternalCoroutinesApi + public fun <E> failure(): ChannelResult<E> = + ChannelResult(failed) + + @InternalCoroutinesApi + public fun <E> closed(cause: Throwable?): ChannelResult<E> = ChannelResult(Closed(cause)) } diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt index f1d092e3..75524f21 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt @@ -229,12 +229,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> { /** * Sends the value to all subscribed receives and stores this value as the most recent state for - * future subscribers. This implementation always returns `true`. - * It throws exception if the channel [isClosedForSend] (see [close] for details). + * future subscribers. This implementation always returns either successful result + * or closed with an exception. */ - public override fun offer(element: E): Boolean { - offerInternal(element)?.let { throw it.sendException } - return true + public override fun trySend(element: E): ChannelResult<Unit> { + offerInternal(element)?.let { return ChannelResult.closed(it.sendException) } + return ChannelResult.success(Unit) } @Suppress("UNCHECKED_CAST") diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 6381c467..0f6ee3ac 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -237,7 +237,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F lastValue = null // Consume the value } } - // Should be receiveOrClosed when boxing issues are fixed + // Should be receiveCatching when boxing issues are fixed values.onReceiveOrNull { value -> if (value == null) { if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index e178f124..f1658cfa 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -142,7 +142,7 @@ class BasicOperationsTest : TestBase() { val result = channel.receiveCatching() assertEquals(1, result.getOrThrow()) assertEquals(1, result.getOrNull()) - assertTrue(ChannelResult.value(1) == result) + assertTrue(ChannelResult.success(1) == result) expect(3) launch { diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt index 4344b105..2341c62e 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt @@ -45,7 +45,7 @@ class ChannelReceiveCatchingTest : TestBase() { assertEquals(1, element.getOrThrow()) assertEquals(1, element.getOrNull()) assertEquals("Value(1)", element.toString()) - assertTrue(ChannelResult.value(1) == element) // Don't box + assertTrue(ChannelResult.success(1) == element) // Don't box assertFalse(element.isFailure) assertFalse(element.isClosed) @@ -54,7 +54,7 @@ class ChannelReceiveCatchingTest : TestBase() { assertNull(nullElement.getOrThrow()) assertNull(nullElement.getOrNull()) assertEquals("Value(null)", nullElement.toString()) - assertTrue(ChannelResult.value(null) == nullElement) // Don't box + assertTrue(ChannelResult.success(null) == nullElement) // Don't box assertFalse(element.isFailure) assertFalse(element.isClosed) @@ -113,7 +113,7 @@ class ChannelReceiveCatchingTest : TestBase() { fun testReceiveResultChannel() = runTest { val channel = Channel<ChannelResult<UInt>>() launch { - channel.send(ChannelResult.value(1u)) + channel.send(ChannelResult.success(1u)) channel.send(ChannelResult.closed(TestException1())) channel.close(TestException2()) } diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index d28d912d..3e70007a 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -44,9 +44,9 @@ private class ChannelViaBroadcast<E>( override suspend fun receive(): E = sub.receive() override suspend fun receiveOrNull(): E? = sub.receiveOrNull() override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching() - override fun poll(): E? = sub.poll() override fun iterator(): ChannelIterator<E> = sub.iterator() - + override fun tryReceive(): ChannelResult<E> = sub.tryReceive() + override fun cancel(cause: CancellationException?) = sub.cancel(cause) // implementing hidden method anyway, so can cast to an internal class diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index 3a208c0f..5153cb47 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -168,6 +168,11 @@ private class LazyActorCoroutine<E>( return super.offer(element) } + override fun trySend(element: E): ChannelResult<Unit> { + start() + return super.trySend(element) + } + override fun close(cause: Throwable?): Boolean { // close the channel _first_ val closed = super.close(cause) diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt index 76713aa1..9f7ce497 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { dReceivedCnt++ lastReceived = received } -}
\ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api index 961fdbe2..67986251 100644 --- a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api +++ b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api @@ -63,6 +63,7 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V public fun request (J)V public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object; } public final class kotlinx/coroutines/reactive/ReactiveFlowKt { diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 15620562..6bb02ef1 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -13,7 +13,7 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.reactivestreams.* import kotlin.coroutines.* -import kotlin.internal.LowPriorityInOverloadResolution +import kotlin.internal.* /** * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. @@ -96,10 +96,10 @@ public class PublisherCoroutine<in T>( override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing = throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false + override fun trySend(element: T): ChannelResult<Unit> { + if (!mutex.tryLock()) return ChannelResult.failure() doLockedNext(element) - return true + return ChannelResult.success(Unit) } public override suspend fun send(element: T) { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 4f4706d8..7813bc75 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -91,6 +91,12 @@ private class RxObservableCoroutine<T : Any>( return true } + override fun trySend(element: T): ChannelResult<Unit> { + if (!mutex.tryLock()) return ChannelResult.failure() + doLockedNext(element) + return ChannelResult.success(Unit) + } + public override suspend fun send(element: T) { // fast-path -- try send without suspension if (offer(element)) return diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index bd9239e7..fed89c32 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -71,10 +71,10 @@ private class RxObservableCoroutine<T: Any>( override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false + override fun trySend(element: T): ChannelResult<Unit> { + if (!mutex.tryLock()) return ChannelResult.failure() doLockedNext(element) - return true + return ChannelResult.success(Unit) } public override suspend fun send(element: T) { |