aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-reactive/src/Publish.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/Publish.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Publish.kt29
1 files changed, 14 insertions, 15 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 0e3a7b8c..704b7142 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -17,18 +17,15 @@ import kotlin.internal.LowPriorityInOverloadResolution
/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
- * Every time the returned publisher is subscribed, it starts a new coroutine.
- * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
+ * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
+ * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
- * | **Coroutine action** | **Signal to subscriber**
- * | -------------------------------------------- | ------------------------
- * | `send` | `onNext`
- * | Normal completion or `close` without cause | `onComplete`
- * | Failure with exception or `close` with cause | `onError`
- *
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -43,7 +40,7 @@ public fun <T> publish(
): Publisher<T> {
require(context[Job] === null) { "Publisher context cannot contain job in it." +
"Its lifecycle should be managed via subscription. Had $context" }
- return publishInternal(GlobalScope, context, block)
+ return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
}
@Deprecated(
@@ -55,37 +52,39 @@ public fun <T> publish(
public fun <T> CoroutineScope.publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> = publishInternal(this, context, block)
+): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block)
/** @suppress For internal use from other reactive integration modules only */
@InternalCoroutinesApi
public fun <T> publishInternal(
scope: CoroutineScope, // support for legacy publish in scope
context: CoroutineContext,
+ exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
val newContext = scope.newCoroutineContext(context)
- val coroutine = PublisherCoroutine(newContext, subscriber)
+ val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
+private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
- private val subscriber: Subscriber<T>
+ private val subscriber: Subscriber<T>,
+ private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
private val mutex = Mutex(locked = true)
-
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
@Volatile
@@ -198,7 +197,7 @@ public class PublisherCoroutine<in T>(
// Specification requires that after cancellation requested we don't call onXXX
if (cancelled) {
// If the parent had failed to handle our exception, then we must not lose this exception
- if (cause != null && !handled) handleCoroutineException(context, cause)
+ if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
return
}
@@ -217,7 +216,7 @@ public class PublisherCoroutine<in T>(
*/
subscriber.onError(cause)
if (!handled && cause.isFatal()) {
- handleCoroutineException(context, cause)
+ exceptionOnCancelHandler(cause, context)
}
} else {
subscriber.onComplete()