aboutsummaryrefslogtreecommitdiff
path: root/reactive
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-12-05 19:33:44 +0300
committerVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-12-05 08:50:34 -0800
commita930b0cd1ad128c5782c697ba53cd514370f8042 (patch)
treee4fdd2197dae9edf7641e55f7bd4d4af4c7fdcfa /reactive
parent6c98c192b96a42242a0c82d9142f70a69217d074 (diff)
downloadkotlinx.coroutines-a930b0cd1ad128c5782c697ba53cd514370f8042.tar.gz
Consistently handle undeliverable exceptions in RxJava and Reactor integrations
Use tryOnError in RxJava to make exception delivery check-and-act race free. Deliver undeliverable exceptions via RxJavaPlugins instead of handleCoroutineException. This is a deliberate choice for a multiple reasons: * When using Rx (whether with coroutines or not), undeliverable exceptions are inevitable and users should hook into RxJavaPlugins anyway. We don't want to force them using Rx-specific CoroutineExceptionHandler all over the place * Undeliverable exceptions provide additional helpful stacktrace and proper way to distinguish them from other unhandled exceptions * Be consistent with reactor where we don't have try*, thus cannot provide a completely consistent experience with CEH (at least, without wrapping all the subscribers)\ Do the similar in Reactor integration, but without try*, Reactor does not have notion of undeliverable exceoptions and handles them via Operators.* on its own. Also, get rid of ASCII tables that are not properly render in IDEA Fixes #252 Fixes #1614
Diffstat (limited to 'reactive')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Publish.kt29
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Flux.kt35
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Mono.kt31
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/FluxTest.kt12
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/MonoTest.kt43
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt11
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt24
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt10
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt18
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt34
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxObservable.kt22
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt3
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxSingle.kt26
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/Check.kt11
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt53
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt29
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt58
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt63
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt29
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt33
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/SingleTest.kt57
21 files changed, 412 insertions, 219 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 0e3a7b8c..704b7142 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -17,18 +17,15 @@ import kotlin.internal.LowPriorityInOverloadResolution
/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
- * Every time the returned publisher is subscribed, it starts a new coroutine.
- * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
+ * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
+ * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
- * | **Coroutine action** | **Signal to subscriber**
- * | -------------------------------------------- | ------------------------
- * | `send` | `onNext`
- * | Normal completion or `close` without cause | `onComplete`
- * | Failure with exception or `close` with cause | `onError`
- *
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -43,7 +40,7 @@ public fun <T> publish(
): Publisher<T> {
require(context[Job] === null) { "Publisher context cannot contain job in it." +
"Its lifecycle should be managed via subscription. Had $context" }
- return publishInternal(GlobalScope, context, block)
+ return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
}
@Deprecated(
@@ -55,37 +52,39 @@ public fun <T> publish(
public fun <T> CoroutineScope.publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> = publishInternal(this, context, block)
+): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block)
/** @suppress For internal use from other reactive integration modules only */
@InternalCoroutinesApi
public fun <T> publishInternal(
scope: CoroutineScope, // support for legacy publish in scope
context: CoroutineContext,
+ exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit,
block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
val newContext = scope.newCoroutineContext(context)
- val coroutine = PublisherCoroutine(newContext, subscriber)
+ val coroutine = PublisherCoroutine(newContext, subscriber, exceptionOnCancelHandler)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
private const val CLOSED = -1L // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
+private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
- private val subscriber: Subscriber<T>
+ private val subscriber: Subscriber<T>,
+ private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
private val mutex = Mutex(locked = true)
-
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
@Volatile
@@ -198,7 +197,7 @@ public class PublisherCoroutine<in T>(
// Specification requires that after cancellation requested we don't call onXXX
if (cancelled) {
// If the parent had failed to handle our exception, then we must not lose this exception
- if (cause != null && !handled) handleCoroutineException(context, cause)
+ if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
return
}
@@ -217,7 +216,7 @@ public class PublisherCoroutine<in T>(
*/
subscriber.onError(cause)
if (!handled && cause.isFatal()) {
- handleCoroutineException(context, cause)
+ exceptionOnCancelHandler(cause, context)
}
} else {
subscriber.onComplete()
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 389428df..b6cc1615 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -10,29 +10,24 @@ package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
-import org.reactivestreams.Publisher
-import reactor.core.CoreSubscriber
+import org.reactivestreams.*
+import reactor.core.*
import reactor.core.publisher.*
+import reactor.util.context.*
import kotlin.coroutines.*
-import kotlin.internal.LowPriorityInOverloadResolution
+import kotlin.internal.*
/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
- * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
- *
- * Coroutine context can be specified with [context] argument.
- * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
- * | **Coroutine action** | **Signal to subscriber**
- * | -------------------------------------------- | ------------------------
- * | `send` | `onNext`
- * | Normal completion or `close` without cause | `onComplete`
- * | Failure with exception or `close` with cause | `onError`
- *
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
@@ -71,7 +66,17 @@ private fun <T> reactorPublish(
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val newContext = scope.newCoroutineContext(context + reactorContext)
- val coroutine = PublisherCoroutine(newContext, subscriber)
+ val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
-} \ No newline at end of file
+}
+
+private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx ->
+ if (e !is CancellationException) {
+ try {
+ Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty())
+ } catch (e: Throwable) {
+ handleCoroutineException(ctx, e)
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
index 76f0418e..415932dd 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt
@@ -13,15 +13,10 @@ import kotlin.coroutines.*
import kotlin.internal.*
/**
- * Creates cold [mono][Mono] that will run a given [block] in a coroutine.
+ * Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result.
* Every time the returned mono is subscribed, it starts a new coroutine.
- * Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
- *
- * | **Coroutine action** | **Signal to sink**
- * | ------------------------------------- | ------------------------
- * | Returns a non-null value | `success(value)`
- * | Returns a null | `success`
- * | Failure with exception or unsubscribe | `error`
+ * If [block] result is `null`, [MonoSink.success] is invoked without a value.
+ * Unsubscribing cancels running coroutine.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
@@ -64,18 +59,24 @@ private class MonoCoroutine<in T>(
parentContext: CoroutineContext,
private val sink: MonoSink<T>
) : AbstractCoroutine<T>(parentContext, true), Disposable {
- var disposed = false
+ @Volatile
+ private var disposed = false
override fun onCompleted(value: T) {
- if (!disposed) {
- if (value == null) sink.success() else sink.success(value)
- }
+ if (value == null) sink.success() else sink.success(value)
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
- if (!disposed) {
- sink.error(cause)
- } else if (!handled) {
+ try {
+ /*
+ * sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op.
+ * Guard potentially non-empty handlers against meaningless cancellation exceptions
+ */
+ if (getCancellationException() !== cause) {
+ sink.error(cause)
+ }
+ } catch (e: Throwable) {
+ // In case of improper error implementation or fatal exceptions
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
index ee26455e..2562c9d3 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
@@ -130,4 +131,15 @@ class FluxTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { flux<Int>(Job()) { } }
}
+
+ @Test
+ fun testLeakedException() = runBlocking {
+ // Test exception is not reported to global handler
+ val flow = flux<Unit> { throw TestException() }.asFlow()
+ repeat(2000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect { }
+ }
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
index 2283d45a..223ba7be 100644
--- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
@@ -5,13 +5,17 @@
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import org.reactivestreams.*
import reactor.core.publisher.*
+import reactor.util.context.*
+import java.time.*
import java.time.Duration.*
+import java.util.function.*
class MonoTest : TestBase() {
@Before
@@ -217,11 +221,13 @@ class MonoTest : TestBase() {
fun testUnhandledException() = runTest {
expect(1)
var subscription: Subscription? = null
- val mono = mono(currentDispatcher() + CoroutineExceptionHandler { _, t ->
+ val handler = BiFunction<Throwable, Any?, Throwable> { t, _ ->
assertTrue(t is TestException)
expect(5)
+ t
+ }
- }) {
+ val mono = mono(currentDispatcher()) {
expect(4)
subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
try {
@@ -229,7 +235,7 @@ class MonoTest : TestBase() {
} finally {
throw TestException() // would not be able to handle it since mono is disposed
}
- }
+ }.subscriberContext { Context.of("reactor.onOperatorError.local", handler) }
mono.subscribe(object : Subscriber<Unit> {
override fun onSubscribe(s: Subscription) {
expect(2)
@@ -248,4 +254,35 @@ class MonoTest : TestBase() {
fun testIllegalArgumentException() {
assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
}
+
+ @Test
+ fun testExceptionAfterCancellation() = runTest {
+ // Test exception is not reported to global handler
+ Flux
+ .interval(ofMillis(1))
+ .switchMap {
+ mono(coroutineContext) {
+ timeBomb().awaitFirst()
+ }
+ }
+ .onErrorReturn({
+ expect(1)
+ true
+ }, 42)
+ .blockLast()
+ finish(2)
+ }
+
+ private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
+
+ @Test
+ fun testLeakedException() = runBlocking {
+ // Test exception is not reported to global handler
+ val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect { }
+ }
+ }
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
index d76f1231..a0c32f9f 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
@@ -5,10 +5,21 @@
package kotlinx.coroutines.rx2
import io.reactivex.functions.*
+import io.reactivex.plugins.*
import kotlinx.coroutines.*
+import kotlin.coroutines.*
internal class RxCancellable(private val job: Job) : Cancellable {
override fun cancel() {
job.cancel()
}
+}
+
+internal fun handleUndeliverableException(cause: Throwable, context: CoroutineContext) {
+ if (cause is CancellationException) return // Async CE should be completely ignored
+ try {
+ RxJavaPlugins.onError(cause)
+ } catch (e: Throwable) {
+ handleCoroutineException(context, cause)
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
index c59b4bd6..ab96844c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
@@ -12,15 +12,9 @@ import kotlin.coroutines.*
import kotlin.internal.*
/**
- * Creates cold [Completable] that runs a given [block] in a coroutine.
+ * Creates cold [Completable] that runs a given [block] in a coroutine and emits its result.
* Every time the returned completable is subscribed, it starts a new coroutine.
* Unsubscribing cancels running coroutine.
- *
- * | **Coroutine action** | **Signal to subscriber**
- * | ------------------------------------- | ------------------------
- * | Completes successfully | `onCompleted`
- * | Failure with exception or unsubscribe | `onError`
- *
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -62,21 +56,19 @@ private class RxCompletableCoroutine(
) : AbstractCoroutine<Unit>(parentContext, true) {
override fun onCompleted(value: Unit) {
try {
- if (!subscriber.isDisposed) subscriber.onComplete()
+ subscriber.onComplete()
} catch (e: Throwable) {
- handleCoroutineException(context, e)
+ handleUndeliverableException(e, context)
}
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
- if (!subscriber.isDisposed) {
- try {
- subscriber.onError(cause)
- } catch (e: Throwable) {
- handleCoroutineException(context, e)
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
}
- } else if (!handled) {
- handleCoroutineException(context, cause)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
}
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 8df6caee..bd369cad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -94,9 +94,13 @@ public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create {
emitter.onComplete()
} catch (e: Throwable) {
// 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
- if (e !is CancellationException) emitter.onError(e)
- else emitter.onComplete()
-
+ if (e !is CancellationException) {
+ if (!emitter.tryOnError(e)) {
+ handleUndeliverableException(e, coroutineContext)
+ }
+ } else {
+ emitter.onComplete()
+ }
}
}
emitter.setCancellable(RxCancellable(job))
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
index 30a1ed7e..7924a3f1 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
@@ -16,17 +16,15 @@ import kotlin.internal.*
/**
* Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
* Every time the returned flowable is subscribed, it starts a new coroutine.
- * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
+ *
+ * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
- * | **Coroutine action** | **Signal to subscriber**
- * | -------------------------------------------- | ------------------------
- * | `send` | `onNext`
- * | Normal completion or `close` without cause | `onComplete`
- * | Failure with exception or `close` with cause | `onError`
- *
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -40,7 +38,7 @@ public fun <T: Any> rxFlowable(
): Flowable<T> {
require(context[Job] === null) { "Flowable context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
- return Flowable.fromPublisher(publishInternal(GlobalScope, context, block))
+ return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block))
}
@Deprecated(
@@ -52,4 +50,6 @@ public fun <T: Any> rxFlowable(
public fun <T: Any> CoroutineScope.rxFlowable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Flowable<T> = Flowable.fromPublisher(publishInternal(this, context, block))
+): Flowable<T> = Flowable.fromPublisher(publishInternal(this, context, RX_HANDLER, block))
+
+private val RX_HANDLER: (Throwable, CoroutineContext) -> Unit = ::handleUndeliverableException \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
index 9f176e93..9fb5f650 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
@@ -12,16 +12,10 @@ import kotlin.coroutines.*
import kotlin.internal.*
/**
- * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine.
+ * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result.
+ * If [block] result is `null`, [onComplete][MaybeObserver.onComplete] is invoked without a value.
* Every time the returned observable is subscribed, it starts a new coroutine.
- * Coroutine returns a single, possibly null value. Unsubscribing cancels running coroutine.
- *
- * | **Coroutine action** | **Signal to subscriber**
- * | ------------------------------------- | ------------------------
- * | Returns a non-null value | `onSuccess`
- * | Returns a null | `onComplete`
- * | Failure with exception or unsubscribe | `onError`
- *
+ * Unsubscribing cancels running coroutine.
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -62,24 +56,20 @@ private class RxMaybeCoroutine<T>(
private val subscriber: MaybeEmitter<T>
) : AbstractCoroutine<T>(parentContext, true) {
override fun onCompleted(value: T) {
- if (!subscriber.isDisposed) {
- try {
- if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
- } catch(e: Throwable) {
- handleCoroutineException(context, e)
- }
+ try {
+ if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
}
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
- if (!subscriber.isDisposed) {
- try {
- subscriber.onError(cause)
- } catch (e: Throwable) {
- handleCoroutineException(context, e)
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
}
- } else if (!handled) {
- handleCoroutineException(context, cause)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
}
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index 6ccf0f0b..b8de66df 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -19,16 +19,14 @@ import kotlin.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
* Every time the returned observable is subscribed, it starts a new coroutine.
- * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
*
- * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
- * Note that Rx 2.x [Observable] **does not support backpressure**. Use [rxFlowable].
+ * Coroutine emits ([ObservableEmitter.onNext]) values with `send`, completes ([ObservableEmitter.onComplete])
+ * when the coroutine completes or channel is explicitly closed and emits error ([ObservableEmitter.onError])
+ * if coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels running coroutine.
*
- * | **Coroutine action** | **Signal to subscriber**
- * | -------------------------------------------- | ------------------------
- * | `send` | `onNext`
- * | Normal completion or `close` without cause | `onComplete`
- * | Failure with exception or `close` with cause | `onError`
+ * Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
+ * Note that Rx 2.x [Observable] **does not support backpressure**.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
@@ -170,9 +168,9 @@ private class RxObservableCoroutine<T: Any>(
* by coroutines machinery, anyway, they should not be present in regular program flow,
* thus our goal here is just to expose it as soon as possible.
*/
- subscriber.onError(cause)
+ subscriber.tryOnError(cause)
if (!handled && cause.isFatal()) {
- handleCoroutineException(context, cause)
+ handleUndeliverableException(cause, context)
}
}
else {
@@ -180,7 +178,7 @@ private class RxObservableCoroutine<T: Any>(
}
} catch (e: Throwable) {
// Unhandled exception (cannot handle in other way, since we are already complete)
- handleCoroutineException(context, e)
+ handleUndeliverableException(e, context)
}
}
} finally {
@@ -208,4 +206,4 @@ internal fun Throwable.isFatal() = try {
false
} catch (e: Throwable) {
true
-} \ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
index 53fbaf65..610a5bcd 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx2
@@ -17,7 +17,6 @@ public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = Sch
/**
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
- * @param scheduler a scheduler.
*/
public class SchedulerCoroutineDispatcher(
/**
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
index f3573ee6..07088909 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
@@ -12,15 +12,9 @@ import kotlin.coroutines.*
import kotlin.internal.*
/**
- * Creates cold [single][Single] that will run a given [block] in a coroutine.
+ * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result.
* Every time the returned observable is subscribed, it starts a new coroutine.
- * Coroutine returns a single value. Unsubscribing cancels running coroutine.
- *
- * | **Coroutine action** | **Signal to subscriber**
- * | ------------------------------------- | ------------------------
- * | Returns a value | `onSuccess`
- * | Failure with exception or unsubscribe | `onError`
- *
+ * Unsubscribing cancels running coroutine.
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
@@ -62,21 +56,19 @@ private class RxSingleCoroutine<T: Any>(
) : AbstractCoroutine<T>(parentContext, true) {
override fun onCompleted(value: T) {
try {
- if (!subscriber.isDisposed) subscriber.onSuccess(value)
+ subscriber.onSuccess(value)
} catch (e: Throwable) {
- handleCoroutineException(context, e)
+ handleUndeliverableException(e, context)
}
}
override fun onCancelled(cause: Throwable, handled: Boolean) {
- if (!subscriber.isDisposed) {
- try {
- subscriber.onError(cause)
- } catch (e: Throwable) {
- handleCoroutineException(context, e)
+ try {
+ if (!subscriber.tryOnError(cause)) {
+ handleUndeliverableException(cause, context)
}
- } else if (!handled) {
- handleCoroutineException(context, cause)
+ } catch (e: Throwable) {
+ handleUndeliverableException(e, context)
}
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/Check.kt b/reactive/kotlinx-coroutines-rx2/test/Check.kt
index 29eda6fa..beb2c43a 100644
--- a/reactive/kotlinx-coroutines-rx2/test/Check.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/Check.kt
@@ -5,6 +5,8 @@
package kotlinx.coroutines.rx2
import io.reactivex.*
+import io.reactivex.functions.Consumer
+import io.reactivex.plugins.*
fun <T> checkSingleValue(
observable: Observable<T>,
@@ -64,3 +66,12 @@ fun checkErroneous(
}
}
+inline fun withExceptionHandler(noinline handler: (Throwable) -> Unit, block: () -> Unit) {
+ val original = RxJavaPlugins.getErrorHandler()
+ RxJavaPlugins.setErrorHandler { handler(it) }
+ try {
+ block()
+ } finally {
+ RxJavaPlugins.setErrorHandler(original)
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
index 9a12bafb..04e86975 100644
--- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.disposables.*
+import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.hamcrest.core.*
import org.junit.*
@@ -122,11 +123,11 @@ class CompletableTest : TestBase() {
fun testUnhandledException() = runTest() {
expect(1)
var disposable: Disposable? = null
- val eh = CoroutineExceptionHandler { _, t ->
- assertTrue(t is TestException)
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is TestException)
expect(5)
}
- val completable = rxCompletable(currentDispatcher() + eh) {
+ val completable = rxCompletable(currentDispatcher()) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
@@ -135,26 +136,40 @@ class CompletableTest : TestBase() {
throw TestException() // would not be able to handle it since mono is disposed
}
}
- completable.subscribe(object : CompletableObserver {
- override fun onSubscribe(d: Disposable) {
- expect(2)
- disposable = d
- }
- override fun onComplete() { expectUnreached() }
- override fun onError(t: Throwable) { expectUnreached() }
- })
- expect(3)
- yield() // run coroutine
- finish(6)
+ withExceptionHandler(handler) {
+ completable.subscribe(object : CompletableObserver {
+ override fun onSubscribe(d: Disposable) {
+ expect(2)
+ disposable = d
+ }
+
+ override fun onComplete() {
+ expectUnreached()
+ }
+
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ expect(3)
+ yield() // run coroutine
+ finish(6)
+ }
}
@Test
fun testFatalExceptionInSubscribe() = runTest {
- rxCompletable(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
- expect(1)
- 42
- }.subscribe({ throw LinkageError() })
- finish(3)
+ val handler: (Throwable) -> Unit = { e ->
+ assertTrue(e is UndeliverableException && e.cause is LinkageError); expect(2)
+ }
+
+ withExceptionHandler(handler) {
+ rxCompletable(Dispatchers.Unconfined) {
+ expect(1)
+ 42
+ }.subscribe({ throw LinkageError() })
+ finish(3)
+ }
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
index 4f3e7241..05b7ee92 100644
--- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines.rx2
+import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
@@ -16,15 +17,15 @@ class FlowableExceptionHandlingTest : TestBase() {
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}
- private inline fun <reified T : Throwable> ceh(expect: Int) = CoroutineExceptionHandler { _, t ->
- assertTrue(t is T)
+ private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
+ assertTrue(t is UndeliverableException && t.cause is T)
expect(expect)
}
private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
@Test
- fun testException() = runTest {
+ fun testException() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw TestException()
@@ -37,8 +38,8 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = runTest {
- rxFlowable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -50,7 +51,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testExceptionAsynchronous() = runTest {
+ fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw TestException()
@@ -65,8 +66,8 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = runTest {
- rxFlowable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.publish()
@@ -80,8 +81,8 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionFromSubscribe() = runTest {
- rxFlowable(Dispatchers.Unconfined + ceh<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
@@ -92,7 +93,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testExceptionFromSubscribe() = runTest {
+ fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
send(Unit)
@@ -104,7 +105,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousExceptionFromSubscribe() = runTest {
+ fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
rxFlowable(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
send(Unit)
@@ -118,8 +119,8 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = runTest {
- rxFlowable(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.publish()
diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
new file mode 100644
index 00000000..1430dbf3
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import io.reactivex.exceptions.*
+import io.reactivex.plugins.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.Test
+import java.io.*
+import kotlin.test.*
+
+// Check that exception is not leaked to the global exception handler
+class LeakedExceptionTest : TestBase() {
+
+ private val handler: (Throwable) -> Unit =
+ { assertTrue { it is UndeliverableException && it.cause is TestException } }
+
+ @Test
+ fun testSingle() = withExceptionHandler(handler) {
+ val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect { }
+ }
+ }
+ }
+
+ @Test
+ fun testObservable() = withExceptionHandler(handler) {
+ val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect { }
+ }
+ }
+ }
+
+ @Test
+ fun testFlowable() = withExceptionHandler(handler) {
+ val flow = rxFlowable<Unit> { throw TestException() }.asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect { }
+ }
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
index 326c83e4..deca961e 100644
--- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.disposables.*
+import io.reactivex.exceptions.*
import io.reactivex.functions.*
import io.reactivex.internal.functions.Functions.*
import kotlinx.coroutines.*
@@ -66,8 +67,8 @@ class MaybeTest : TestBase() {
expectUnreached()
}, { error ->
expect(5)
- Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
- Assert.assertThat(error.message, IsEqual("OK"))
+ assertThat(error, IsInstanceOf(RuntimeException::class.java))
+ assertThat(error.message, IsEqual("OK"))
})
expect(3)
yield() // to started coroutine
@@ -251,11 +252,11 @@ class MaybeTest : TestBase() {
fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
- val eh = CoroutineExceptionHandler { _, t ->
- assertTrue(t is TestException)
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is TestException)
expect(5)
}
- val maybe = rxMaybe(currentDispatcher() + eh) {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
@@ -264,27 +265,45 @@ class MaybeTest : TestBase() {
throw TestException() // would not be able to handle it since mono is disposed
}
}
- maybe.subscribe(object : MaybeObserver<Unit> {
- override fun onSubscribe(d: Disposable) {
- expect(2)
- disposable = d
- }
- override fun onComplete() { expectUnreached() }
- override fun onSuccess(t: Unit) { expectUnreached() }
- override fun onError(t: Throwable) { expectUnreached() }
- })
- expect(3)
- yield() // run coroutine
- finish(6)
+ withExceptionHandler(handler) {
+ maybe.subscribe(object : MaybeObserver<Unit> {
+ override fun onSubscribe(d: Disposable) {
+ expect(2)
+ disposable = d
+ }
+
+ override fun onComplete() {
+ expectUnreached()
+ }
+
+ override fun onSuccess(t: Unit) {
+ expectUnreached()
+ }
+
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ expect(3)
+ yield() // run coroutine
+ finish(6)
+ }
}
@Test
fun testFatalExceptionInSubscribe() = runTest {
- rxMaybe(Dispatchers.Unconfined + CoroutineExceptionHandler{ _, e -> assertTrue(e is LinkageError); expect(2)}) {
- expect(1)
- 42
- }.subscribe({ throw LinkageError() })
- finish(3)
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is LinkageError)
+ expect(2)
+ }
+
+ withExceptionHandler(handler) {
+ rxMaybe(Dispatchers.Unconfined) {
+ expect(1)
+ 42
+ }.subscribe({ throw LinkageError() })
+ finish(3)
+ }
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
index 6d247cfa..d6cdd3ca 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines.rx2
+import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
@@ -16,15 +17,15 @@ class ObservableExceptionHandlingTest : TestBase() {
ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}
- private inline fun <reified T : Throwable> ceh(expect: Int) = CoroutineExceptionHandler { _, t ->
- assertTrue(t is T)
+ private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
+ assertTrue(t is UndeliverableException && t.cause is T)
expect(expect)
}
private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
@Test
- fun testException() = runTest {
+ fun testException() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw TestException()
@@ -37,8 +38,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = runTest {
- rxObservable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -50,7 +51,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testExceptionAsynchronous() = runTest {
+ fun testExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw TestException()
@@ -65,8 +66,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = runTest {
- rxObservable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.publish()
@@ -80,8 +81,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionFromSubscribe() = runTest {
- rxObservable(Dispatchers.Unconfined + ceh<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
@@ -92,7 +93,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testExceptionFromSubscribe() = runTest {
+ fun testExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -104,7 +105,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousExceptionFromSubscribe() = runTest {
+ fun testAsynchronousExceptionFromSubscribe() = withExceptionHandler({ expectUnreached() }) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -118,8 +119,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = runTest {
- rxObservable(Dispatchers.Unconfined + ceh<LinkageError>(4)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.publish()
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
index c71ef566..b9f6fe35 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableTest.kt
@@ -4,13 +4,22 @@
package kotlinx.coroutines.rx2
+import io.reactivex.*
+import io.reactivex.plugins.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.hamcrest.core.*
import org.junit.*
import org.junit.Test
+import java.util.concurrent.*
import kotlin.test.*
class ObservableTest : TestBase() {
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
@Test
fun testBasicSuccess() = runBlocking {
expect(1)
@@ -129,4 +138,28 @@ class ObservableTest : TestBase() {
expect(4)
}
}
+
+ @Test
+ fun testExceptionAfterCancellation() {
+ // Test that no exceptions were reported to the global EH (it will fail the test if so)
+ val handler = { e: Throwable ->
+ assertFalse(e is CancellationException)
+ }
+ withExceptionHandler(handler) {
+ RxJavaPlugins.setErrorHandler {
+ require(it !is CancellationException)
+ }
+ Observable
+ .interval(1, TimeUnit.MILLISECONDS)
+ .take(1000)
+ .switchMapSingle {
+ rxSingle {
+ timeBomb().await()
+ }
+ }
+ .blockingSubscribe({}, {})
+ }
+ }
+
+ private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS).doOnSuccess { throw TestException() }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
index 92511493..d9581f86 100644
--- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
@@ -6,6 +6,7 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.disposables.*
+import io.reactivex.exceptions.*
import io.reactivex.functions.*
import kotlinx.coroutines.*
import org.hamcrest.core.*
@@ -201,13 +202,19 @@ class SingleTest : TestBase() {
@Test
fun testFatalExceptionInSubscribe() = runTest {
- rxSingle(Dispatchers.Unconfined + CoroutineExceptionHandler { _, e -> assertTrue(e is LinkageError); expect(2) }) {
- expect(1)
- 42
- }.subscribe(Consumer {
- throw LinkageError()
- })
- finish(3)
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is LinkageError)
+ expect(2)
+ }
+ withExceptionHandler(handler) {
+ rxSingle(Dispatchers.Unconfined) {
+ expect(1)
+ 42
+ }.subscribe(Consumer {
+ throw LinkageError()
+ })
+ finish(3)
+ }
}
@Test
@@ -223,11 +230,11 @@ class SingleTest : TestBase() {
fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
- val eh = CoroutineExceptionHandler { _, t ->
- assertTrue(t is TestException)
+ val handler = { e: Throwable ->
+ assertTrue(e is UndeliverableException && e.cause is TestException)
expect(5)
}
- val single = rxSingle(currentDispatcher() + eh) {
+ val single = rxSingle(currentDispatcher()) {
expect(4)
disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
try {
@@ -236,16 +243,24 @@ class SingleTest : TestBase() {
throw TestException() // would not be able to handle it since mono is disposed
}
}
- single.subscribe(object : SingleObserver<Unit> {
- override fun onSubscribe(d: Disposable) {
- expect(2)
- disposable = d
- }
- override fun onSuccess(t: Unit) { expectUnreached() }
- override fun onError(t: Throwable) { expectUnreached() }
- })
- expect(3)
- yield() // run coroutine
- finish(6)
+ withExceptionHandler(handler) {
+ single.subscribe(object : SingleObserver<Unit> {
+ override fun onSubscribe(d: Disposable) {
+ expect(2)
+ disposable = d
+ }
+
+ override fun onSuccess(t: Unit) {
+ expectUnreached()
+ }
+
+ override fun onError(t: Throwable) {
+ expectUnreached()
+ }
+ })
+ expect(3)
+ yield() // run coroutine
+ finish(6)
+ }
}
}