aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxConvert.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt10
1 files changed, 7 insertions, 3 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 8df6caee..bd369cad 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -94,9 +94,13 @@ public fun <T: Any> Flow<T>.asObservable() : Observable<T> = Observable.create {
emitter.onComplete()
} catch (e: Throwable) {
// 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete`
- if (e !is CancellationException) emitter.onError(e)
- else emitter.onComplete()
-
+ if (e !is CancellationException) {
+ if (!emitter.tryOnError(e)) {
+ handleUndeliverableException(e, coroutineContext)
+ }
+ } else {
+ emitter.onComplete()
+ }
}
}
emitter.setCancellable(RxCancellable(job))