aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kotlinx-coroutines-core/api/kotlinx-coroutines-core.api21
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt43
-rw-r--r--kotlinx-coroutines-core/common/src/channels/Channel.kt82
-rw-r--r--kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt10
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Delay.kt2
-rw-r--r--kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt2
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt6
-rw-r--r--kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/Actor.kt5
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt4
-rw-r--r--reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api1
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Publish.kt8
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxObservable.kt6
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxObservable.kt6
14 files changed, 143 insertions, 57 deletions
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 7d2fa1f4..062e466e 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -555,6 +555,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx
public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
+ public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object;
}
public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
@@ -566,6 +567,7 @@ public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : k
public final class kotlinx/coroutines/channels/BroadcastChannel$DefaultImpls {
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
+ public static fun offer (Lkotlinx/coroutines/channels/BroadcastChannel;Ljava/lang/Object;)Z
}
public final class kotlinx/coroutines/channels/BroadcastChannelKt {
@@ -598,6 +600,8 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co
public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
+ public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z
+ public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/channels/Channel$Factory {
@@ -628,7 +632,7 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public final class kotlinx/coroutines/channels/ChannelResult {
public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult;
- public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object;
+ public static fun constructor-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
@@ -645,6 +649,12 @@ public final class kotlinx/coroutines/channels/ChannelResult {
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}
+public final class kotlinx/coroutines/channels/ChannelResult$Companion {
+ public final fun closed-JP2dKIU (Ljava/lang/Throwable;)Ljava/lang/Object;
+ public final fun failure-PtdJZtk ()Ljava/lang/Object;
+ public final fun success-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
+}
+
public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
@@ -789,6 +799,7 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
public fun offer (Ljava/lang/Object;)Z
public fun openSubscription ()Lkotlinx/coroutines/channels/ReceiveChannel;
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/channels/ProduceKt {
@@ -804,6 +815,10 @@ public abstract interface class kotlinx/coroutines/channels/ProducerScope : kotl
public abstract fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
}
+public final class kotlinx/coroutines/channels/ProducerScope$DefaultImpls {
+ public static fun offer (Lkotlinx/coroutines/channels/ProducerScope;Ljava/lang/Object;)Z
+}
+
public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel ()V
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
@@ -818,12 +833,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public abstract fun tryReceive-PtdJZtk ()Ljava/lang/Object;
}
public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
+ public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object;
}
public abstract interface class kotlinx/coroutines/channels/SendChannel {
@@ -834,10 +851,12 @@ public abstract interface class kotlinx/coroutines/channels/SendChannel {
public abstract fun isFull ()Z
public abstract fun offer (Ljava/lang/Object;)Z
public abstract fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public abstract fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/channels/SendChannel$DefaultImpls {
public static synthetic fun close$default (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
+ public static fun offer (Lkotlinx/coroutines/channels/SendChannel;Ljava/lang/Object;)Z
}
public final class kotlinx/coroutines/channels/TickerChannelsKt {
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index 180598c9..82143b03 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -137,23 +137,42 @@ internal abstract class AbstractSendChannel<E>(
return sendSuspend(element)
}
- public final override fun offer(element: E): Boolean {
+ override fun offer(element: E): Boolean {
+ try {
+ return super.offer(element)
+ } catch (e: Throwable) {
+ onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
+ // If it crashes, add send exception as suppressed for better diagnostics
+ it.addSuppressed(e)
+ throw it
+ }
+ throw e
+ }
+ }
+
+ public final override fun trySend(element: E): ChannelResult<Unit> {
val result = offerInternal(element)
return when {
- result === OFFER_SUCCESS -> true
+ result === OFFER_SUCCESS -> ChannelResult.success(Unit)
result === OFFER_FAILED -> {
- // We should check for closed token on offer as well, otherwise offer won't be linearizable
+ // We should check for closed token on trySend as well, otherwise trySend won't be linearizable
// in the face of concurrent close()
// See https://github.com/Kotlin/kotlinx.coroutines/issues/359
- throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
+ val closedForSend = closedForSend ?: return ChannelResult.failure()
+ ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
}
result is Closed<*> -> {
- throw recoverStackTrace(helpCloseAndGetSendException(element, result))
+ ChannelResult.closed(helpCloseAndGetSendException(result))
}
- else -> error("offerInternal returned $result")
+ else -> error("trySend returned $result")
}
}
+ private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
+ helpClose(closed)
+ return closed.sendException
+ }
+
private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
// To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
@@ -632,9 +651,11 @@ internal abstract class AbstractChannel<E>(
}
@Suppress("UNCHECKED_CAST")
- public final override fun poll(): E? {
+ public final override fun tryReceive(): ChannelResult<E> {
val result = pollInternal()
- return if (result === POLL_FAILED) null else receiveOrNullResult(result)
+ if (result === POLL_FAILED) return ChannelResult.failure()
+ if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
+ return ChannelResult.success(result as E)
}
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
@@ -905,7 +926,7 @@ internal abstract class AbstractChannel<E>(
@JvmField val receiveMode: Int
) : Receive<E>() {
fun resumeValue(value: E): Any? = when (receiveMode) {
- RECEIVE_RESULT -> ChannelResult.value(value)
+ RECEIVE_RESULT -> ChannelResult.success(value)
else -> value
}
@@ -990,7 +1011,7 @@ internal abstract class AbstractChannel<E>(
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
- if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value,
+ if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
@@ -1144,7 +1165,7 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ChannelResult<E> =
- if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E)
+ if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)
@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index 824a57bf..61d06d83 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -85,7 +85,22 @@ public interface SendChannel<in E> {
* then it calls `onUndeliveredElement` before throwing an exception.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
- public fun offer(element: E): Boolean
+ public fun offer(element: E): Boolean {
+ val result = trySend(element)
+ if (result.isSuccess) return true
+ throw recoverStackTrace(result.exceptionOrNull() ?: return false)
+ }
+
+ /**
+ * Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
+ * and returns the successful result. Otherwise, returns failed or closed result.
+ * This is synchronous variant of [send], which backs off in situations when `send` suspends or throws.
+ *
+ * When `trySend` call returns a non-successful result, it guarantees that the element was not delivered to the consumer, and
+ * it does not call `onUndeliveredElement` that was installed for this channel.
+ * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
+ */
+ public fun trySend(element: E): ChannelResult<Unit>
/**
* Closes this channel.
@@ -218,7 +233,7 @@ public interface ReceiveChannel<out E> {
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(
- message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
+ message = "Deprecated in favor of receiveCatching and receiveOrNull extension",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull")
)
@@ -230,13 +245,13 @@ public interface ReceiveChannel<out E> {
* [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
- * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension.
+ * @suppress **Deprecated**: in favor of receiveCatching and onReceiveOrNull extension.
*/
@ObsoleteCoroutinesApi
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(
- message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
+ message = "Deprecated in favor of receiveCatching and onReceiveOrNull extension",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull")
)
@@ -251,7 +266,7 @@ public interface ReceiveChannel<out E> {
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
- * suspended, it will not resume successfully. The `receiveOrClosed` call can retrieve the element from the channel,
+ * suspended, it will not resume successfully. The `receiveCatching` call can retrieve the element from the channel,
* but then throw [CancellationException], thus failing to deliver the element.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*
@@ -271,11 +286,22 @@ public interface ReceiveChannel<out E> {
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
/**
- * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
+ * Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty
* or is [is closed for `receive`][isClosedForReceive] without a cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
- public fun poll(): E?
+ public fun poll(): E? {
+ val result = tryReceive()
+ if (result.isSuccess) return result.getOrThrow()
+ throw recoverStackTrace(result.exceptionOrNull() ?: return null)
+ }
+
+ /**
+ * Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
+ * result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
+ * result if the channel is closed.
+ */
+ public fun tryReceive(): ChannelResult<E>
/**
* Returns a new iterator to receive elements from this channel using a `for` loop.
@@ -315,35 +341,35 @@ public interface ReceiveChannel<out E> {
/**
* A discriminated union of channel operation result.
- * It encapsulates successful or failed result of a channel operation, or a failed operation to a closed channel with
+ * It encapsulates the successful or failed result of a channel operation or a failed operation to a closed channel with
* an optional cause.
*
- * Successful result represents a successful operation with value of type [T], for example, result of [Channel.receiveCatching]
- * operation or a successfully sent element as a result of [Channel.trySend].
+ * The successful result represents a successful operation with a value of type [T], for example,
+ * the result of [Channel.receiveCatching] operation or a successfully sent element as a result of [Channel.trySend].
*
- * Failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
+ * The failed result represents a failed operation attempt to a channel, but it doesn't necessary indicate that the channel is failed.
* E.g. when the channel is full, [Channel.trySend] returns failed result, but the channel itself is not in the failed state.
*
- * Closed result represents an operation attempt to a closed channel and also implies that the operation was failed.
+ * The closed result represents an operation attempt to a closed channel and also implies that the operation has failed.
*/
@Suppress("UNCHECKED_CAST")
public inline class ChannelResult<out T>
-internal constructor(private val holder: Any?) {
+@PublishedApi internal constructor(private val holder: Any?) {
/**
* Returns `true` if this instance represents a successful
* operation outcome.
*
- * In this case [isFailure] and [isClosed] return false.
+ * In this case [isFailure] and [isClosed] return `false`.
*/
- public val isSuccess: Boolean get() = holder !is Closed
+ public val isSuccess: Boolean get() = holder !is Failed
/**
- * Returns true if this instance represents unsuccessful operation.
+ * Returns `true` if this instance represents unsuccessful operation.
*
* In this case [isSuccess] returns false, but it does not imply
* that the channel is failed or closed.
*
- * Example of failed operation without an exception and channel being closed
+ * Example of a failed operation without an exception and channel being closed
* is [Channel.trySend] attempt to a channel that is full.
*/
public val isFailure: Boolean get() = holder is Failed
@@ -352,7 +378,7 @@ internal constructor(private val holder: Any?) {
* Returns `true` if this instance represents unsuccessful operation
* to a closed or cancelled channel.
*
- * In this case [isSuccess] returns false, [isFailure] returns `true`, but it does not imply
+ * In this case [isSuccess] returns `false`, [isFailure] returns `true`, but it does not imply
* that [exceptionOrNull] returns non-null value.
*
* It can happen if the channel was [closed][Channel.close] normally without an exception.
@@ -374,7 +400,7 @@ internal constructor(private val holder: Any?) {
}
/**
- * Returns the encapsulated exception if this instance represents failure or null if it is success
+ * Returns the encapsulated exception if this instance represents failure or `null` if it is success
* or unsuccessful operation to closed channel.
*/
public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause
@@ -389,13 +415,21 @@ internal constructor(private val holder: Any?) {
override fun toString(): String = "Closed($cause)"
}
- internal companion object {
- @Suppress("NOTHING_TO_INLINE")
- internal inline fun <E> value(value: E): ChannelResult<E> =
+ @Suppress("NOTHING_TO_INLINE")
+ @InternalCoroutinesApi
+ public companion object {
+ private val failed = Failed()
+
+ @InternalCoroutinesApi
+ public fun <E> success(value: E): ChannelResult<E> =
ChannelResult(value)
- @Suppress("NOTHING_TO_INLINE")
- internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> =
+ @InternalCoroutinesApi
+ public fun <E> failure(): ChannelResult<E> =
+ ChannelResult(failed)
+
+ @InternalCoroutinesApi
+ public fun <E> closed(cause: Throwable?): ChannelResult<E> =
ChannelResult(Closed(cause))
}
diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
index f1d092e3..75524f21 100644
--- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt
@@ -229,12 +229,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
- * future subscribers. This implementation always returns `true`.
- * It throws exception if the channel [isClosedForSend] (see [close] for details).
+ * future subscribers. This implementation always returns either successful result
+ * or closed with an exception.
*/
- public override fun offer(element: E): Boolean {
- offerInternal(element)?.let { throw it.sendException }
- return true
+ public override fun trySend(element: E): ChannelResult<Unit> {
+ offerInternal(element)?.let { return ChannelResult.closed(it.sendException) }
+ return ChannelResult.success(Unit)
}
@Suppress("UNCHECKED_CAST")
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 6381c467..0f6ee3ac 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -237,7 +237,7 @@ private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : F
lastValue = null // Consume the value
}
}
- // Should be receiveOrClosed when boxing issues are fixed
+ // Should be receiveCatching when boxing issues are fixed
values.onReceiveOrNull { value ->
if (value == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
index e178f124..f1658cfa 100644
--- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
@@ -142,7 +142,7 @@ class BasicOperationsTest : TestBase() {
val result = channel.receiveCatching()
assertEquals(1, result.getOrThrow())
assertEquals(1, result.getOrNull())
- assertTrue(ChannelResult.value(1) == result)
+ assertTrue(ChannelResult.success(1) == result)
expect(3)
launch {
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
index 4344b105..2341c62e 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
@@ -45,7 +45,7 @@ class ChannelReceiveCatchingTest : TestBase() {
assertEquals(1, element.getOrThrow())
assertEquals(1, element.getOrNull())
assertEquals("Value(1)", element.toString())
- assertTrue(ChannelResult.value(1) == element) // Don't box
+ assertTrue(ChannelResult.success(1) == element) // Don't box
assertFalse(element.isFailure)
assertFalse(element.isClosed)
@@ -54,7 +54,7 @@ class ChannelReceiveCatchingTest : TestBase() {
assertNull(nullElement.getOrThrow())
assertNull(nullElement.getOrNull())
assertEquals("Value(null)", nullElement.toString())
- assertTrue(ChannelResult.value(null) == nullElement) // Don't box
+ assertTrue(ChannelResult.success(null) == nullElement) // Don't box
assertFalse(element.isFailure)
assertFalse(element.isClosed)
@@ -113,7 +113,7 @@ class ChannelReceiveCatchingTest : TestBase() {
fun testReceiveResultChannel() = runTest {
val channel = Channel<ChannelResult<UInt>>()
launch {
- channel.send(ChannelResult.value(1u))
+ channel.send(ChannelResult.success(1u))
channel.send(ChannelResult.closed(TestException1()))
channel.close(TestException2())
}
diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
index d28d912d..3e70007a 100644
--- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
@@ -44,9 +44,9 @@ private class ChannelViaBroadcast<E>(
override suspend fun receive(): E = sub.receive()
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
- override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()
-
+ override fun tryReceive(): ChannelResult<E> = sub.tryReceive()
+
override fun cancel(cause: CancellationException?) = sub.cancel(cause)
// implementing hidden method anyway, so can cast to an internal class
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
index 3a208c0f..5153cb47 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
@@ -168,6 +168,11 @@ private class LazyActorCoroutine<E>(
return super.offer(element)
}
+ override fun trySend(element: E): ChannelResult<Unit> {
+ start()
+ return super.trySend(element)
+ }
+
override fun close(cause: Throwable?): Boolean {
// close the channel _first_
val closed = super.close(cause)
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
index 76713aa1..9f7ce497 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
dReceivedCnt++
lastReceived = received
}
-} \ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api
index 961fdbe2..67986251 100644
--- a/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api
+++ b/reactive/kotlinx-coroutines-reactive/api/kotlinx-coroutines-reactive.api
@@ -63,6 +63,7 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public fun request (J)V
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public fun trySend-JP2dKIU (Ljava/lang/Object;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/reactive/ReactiveFlowKt {
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 15620562..6bb02ef1 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -13,7 +13,7 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
import kotlin.coroutines.*
-import kotlin.internal.LowPriorityInOverloadResolution
+import kotlin.internal.*
/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
@@ -96,10 +96,10 @@ public class PublisherCoroutine<in T>(
override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
+ override fun trySend(element: T): ChannelResult<Unit> {
+ if (!mutex.tryLock()) return ChannelResult.failure()
doLockedNext(element)
- return true
+ return ChannelResult.success(Unit)
}
public override suspend fun send(element: T) {
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index 4f4706d8..7813bc75 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -91,6 +91,12 @@ private class RxObservableCoroutine<T : Any>(
return true
}
+ override fun trySend(element: T): ChannelResult<Unit> {
+ if (!mutex.tryLock()) return ChannelResult.failure()
+ doLockedNext(element)
+ return ChannelResult.success(Unit)
+ }
+
public override suspend fun send(element: T) {
// fast-path -- try send without suspension
if (offer(element)) return
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
index bd9239e7..fed89c32 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -71,10 +71,10 @@ private class RxObservableCoroutine<T: Any>(
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
+ override fun trySend(element: T): ChannelResult<Unit> {
+ if (!mutex.tryLock()) return ChannelResult.failure()
doLockedNext(element)
- return true
+ return ChannelResult.success(Unit)
}
public override suspend fun send(element: T) {