aboutsummaryrefslogtreecommitdiff
path: root/reactive
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2020-10-23 08:20:31 -0700
committerGitHub <noreply@github.com>2020-10-23 18:20:31 +0300
commitee78090b2805b38f28f6565108ea90429568ccfc (patch)
tree611bffb96fbf6f43cf8f829e071b1f9625acf69b /reactive
parent8df6f5ac43da556b74afffa05dcb5a67d2954c51 (diff)
downloadkotlinx.coroutines-ee78090b2805b38f28f6565108ea90429568ccfc.tar.gz
Fix potential crash in Rx2 and Rx3 asFlow extension (#2333)
Fixes #2104 Fixes #2299 Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Diffstat (limited to 'reactive')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt8
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt35
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxConvert.kt8
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt36
4 files changed, 85 insertions, 2 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index cf73ef2e..41c82ed0 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
- override fun onNext(t: T) { sendBlocking(t) }
+ override fun onNext(t: T) {
+ try {
+ sendBlocking(t)
+ } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
+ // Is handled by the downstream flow
+ }
+ }
override fun onError(e: Throwable) { close(e) }
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
new file mode 100644
index 00000000..159f3729
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import java.util.concurrent.*
+
+class ObservableSourceAsFlowStressTest : TestBase() {
+
+ private val iterations = 100 * stressTestMultiplierSqrt
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testAsFlowCancellation() = runTest {
+ repeat(iterations) {
+ val latch = Channel<Unit>(1)
+ var i = 0
+ val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
+ .doOnNext { if (++i > 100) latch.offer(Unit) }
+ val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
+ latch.receive()
+ job.cancelAndJoin()
+ }
+ }
+}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index 9bb38c08..0978423a 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
- override fun onNext(t: T) { sendBlocking(t) }
+ override fun onNext(t: T) {
+ try {
+ sendBlocking(t)
+ } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
+ // Is handled by the downstream flow
+ }
+ }
override fun onError(e: Throwable) { close(e) }
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
new file mode 100644
index 00000000..431a7a78
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.exceptions.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+import org.junit.*
+import java.util.concurrent.*
+
+class ObservableSourceAsFlowStressTest : TestBase() {
+
+ private val iterations = 100 * stressTestMultiplierSqrt
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ @Test
+ fun testAsFlowCancellation() = runTest {
+ repeat(iterations) {
+ val latch = Channel<Unit>(1)
+ var i = 0
+ val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
+ .doOnNext { if (++i > 100) latch.offer(Unit) }
+ val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
+ latch.receive()
+ job.cancelAndJoin()
+ }
+ }
+}