diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/Publish.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/Publish.kt | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 0e3a7b8c..704b7142 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -17,18 +17,15 @@ import kotlin.internal.LowPriorityInOverloadResolution /** * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. - * Every time the returned publisher is subscribed, it starts a new coroutine. - * Coroutine emits items with `send`. Unsubscribing cancels running coroutine. + * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. + * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) + * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) + * if coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels running coroutine. * * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that * `onNext` is not invoked concurrently. * - * | **Coroutine action** | **Signal to subscriber** - * | -------------------------------------------- | ------------------------ - * | `send` | `onNext` - * | Normal completion or `close` without cause | `onComplete` - * | Failure with exception or `close` with cause | `onError` - * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. @@ -43,7 +40,7 @@ public fun <T> publish( ): Publisher<T> { require(context[Job] === null) { "Publisher context cannot contain job in it." + "Its lifecycle should be managed via subscription. Had $context" } - return publishInternal(GlobalScope, context, block) + return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block) } @Deprecated( @@ -55,37 +52,39 @@ public fun <T> publish( public fun <T> CoroutineScope.publish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Publisher<T> = publishInternal(this, context, block) +): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block) /** @suppress For internal use from other reactive integration modules only */ @InternalCoroutinesApi public fun <T> publishInternal( scope: CoroutineScope, // support for legacy publish in scope context: CoroutineContext, + exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit, block: suspend ProducerScope<T>.() -> Unit ): Publisher<T> = Publisher { subscriber -> // specification requires NPE on null subscriber if (subscriber == null) throw NullPointerException("Subscriber cannot be null") val newContext = scope.newCoroutineContext(context) - val coroutine = PublisherCoroutine(newContext, subscriber) + val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) } private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError +private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) } @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE") @InternalCoroutinesApi public class PublisherCoroutine<in T>( parentContext: CoroutineContext, - private val subscriber: Subscriber<T> + private val subscriber: Subscriber<T>, + private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit ) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked private val mutex = Mutex(locked = true) - private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED) @Volatile @@ -198,7 +197,7 @@ public class PublisherCoroutine<in T>( // Specification requires that after cancellation requested we don't call onXXX if (cancelled) { // If the parent had failed to handle our exception, then we must not lose this exception - if (cause != null && !handled) handleCoroutineException(context, cause) + if (cause != null && !handled) exceptionOnCancelHandler(cause, context) return } @@ -217,7 +216,7 @@ public class PublisherCoroutine<in T>( */ subscriber.onError(cause) if (!handled && cause.isFatal()) { - handleCoroutineException(context, cause) + exceptionOnCancelHandler(cause, context) } } else { subscriber.onComplete() |