diff options
author | Vsevolod Tolstopyatov <qwwdfsad@gmail.com> | 2020-10-23 08:20:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-23 18:20:31 +0300 |
commit | ee78090b2805b38f28f6565108ea90429568ccfc (patch) | |
tree | 611bffb96fbf6f43cf8f829e071b1f9625acf69b /reactive | |
parent | 8df6f5ac43da556b74afffa05dcb5a67d2954c51 (diff) | |
download | kotlinx.coroutines-ee78090b2805b38f28f6565108ea90429568ccfc.tar.gz |
Fix potential crash in Rx2 and Rx3 asFlow extension (#2333)
Fixes #2104
Fixes #2299
Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Diffstat (limited to 'reactive')
4 files changed, 85 insertions, 2 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index cf73ef2e..41c82ed0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + // Is handled by the downstream flow + } + } override fun onError(e: Throwable) { close(e) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 00000000..159f3729 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val iterations = 100 * stressTestMultiplierSqrt + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testAsFlowCancellation() = runTest { + repeat(iterations) { + val latch = Channel<Unit>(1) + var i = 0 + val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) + .doOnNext { if (++i > 100) latch.offer(Unit) } + val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) + latch.receive() + job.cancelAndJoin() + } + } +} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 9bb38c08..0978423a 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -81,7 +81,13 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } - override fun onNext(t: T) { sendBlocking(t) } + override fun onNext(t: T) { + try { + sendBlocking(t) + } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 + // Is handled by the downstream flow + } + } override fun onError(e: Throwable) { close(e) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt new file mode 100644 index 00000000..431a7a78 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.exceptions.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import org.junit.* +import java.util.concurrent.* + +class ObservableSourceAsFlowStressTest : TestBase() { + + private val iterations = 100 * stressTestMultiplierSqrt + + @Before + fun setup() { + ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + @Test + fun testAsFlowCancellation() = runTest { + repeat(iterations) { + val latch = Channel<Unit>(1) + var i = 0 + val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) + .doOnNext { if (++i > 100) latch.offer(Unit) } + val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) + latch.receive() + job.cancelAndJoin() + } + } +} |