aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/src/RxConvert.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxConvert.kt8
1 files changed, 7 insertions, 1 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index 9bb38c08..0978423a 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
- override fun onNext(t: T) { sendBlocking(t) }
+ override fun onNext(t: T) {
+ try {
+ sendBlocking(t)
+ } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
+ // Is handled by the downstream flow
+ }
+ }
override fun onError(e: Throwable) { close(e) }
}