diff options
author | dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> | 2020-02-13 16:44:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-13 16:44:49 +0300 |
commit | 9f4fd70597567dc4477e5d935d06a380edd09eae (patch) | |
tree | f3d3d7781605fb19957e83fd9b3c66015b03410c /reactive | |
parent | de491d25764fca8afbece56950f623f4c3155542 (diff) | |
download | kotlinx.coroutines-9f4fd70597567dc4477e5d935d06a380edd09eae.tar.gz |
Fix a race in some tests for JavaRX integration (#1801)
An extremely rare race could happen in any of the tests in
`LeakedExceptionTest` in the following case:
* `withExceptionHandler` runs the block passed to it;
* In one of the last iterations of `repeat`, `select` in `combine`
happens on both flows at the same time, that is, the block that
was passed to `rx[Something]` runs in two threads
simultaneously;
* One of these two threads (thread A) runs anomalously slow;
* The other thread successfully throws an exception;
* This exception is propagated to `catch`, so `collect` is
finished;
* `repeat` is exited, the block passed to `withExceptionHandler` is
done executing;
* `withExceptionHandler` sets back the usual exception handler,
which fails when an exception in JavaRX happens (see
https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling);
* Thread A wakes up and throws an exception. This time, it is
passed not to `handler`, which is made specifically to deal with
this, but to the default handler.
As a fix, now a special coroutine context passed to `rx[Something]`
ensures that the spawned executions are run in a thread pool that
blocks until all the tasks are done.
Diffstat (limited to 'reactive')
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt | 85 |
1 files changed, 67 insertions, 18 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt index 1430dbf3..7252ca21 100644 --- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt @@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.exceptions.* -import io.reactivex.plugins.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test -import java.io.* +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import kotlin.test.* // Check that exception is not leaked to the global exception handler @@ -22,37 +22,86 @@ class LeakedExceptionTest : TestBase() { @Test fun testSingle() = withExceptionHandler(handler) { - val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow() - runBlocking { - repeat(10000) { - combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + withFixedThreadPool(4) { dispatcher -> + val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect {} + } } } } @Test fun testObservable() = withExceptionHandler(handler) { - val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow() - runBlocking { - repeat(10000) { - combine(flow, flow) { _, _ -> Unit } - .catch {} - .collect { } + withFixedThreadPool(4) { dispatcher -> + val flow = rxObservable<Unit>(dispatcher) { throw TestException() } + .toFlowable(BackpressureStrategy.BUFFER) + .asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect {} + } } } } @Test fun testFlowable() = withExceptionHandler(handler) { - val flow = rxFlowable<Unit> { throw TestException() }.asFlow() - runBlocking { - repeat(10000) { + withFixedThreadPool(4) { dispatcher -> + val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow() + runBlocking { + repeat(10000) { + combine(flow, flow) { _, _ -> Unit } + .catch {} + .collect {} + } + } + } + } + + /** + * This test doesn't test much and was added to display a problem with straighforward use of + * [withExceptionHandler]. + * + * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context, + * this test would fail fairly often, while other tests were also vulnerable, but the problem is + * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher` + * to other tests. + * + * See the commit that introduced this test for a better explanation. + */ + @Test + fun testResettingExceptionHandler() = withExceptionHandler(handler) { + withFixedThreadPool(4) { dispatcher -> + val flow = rxFlowable<Unit>(dispatcher) { + if ((0..1).random() == 0) { + Thread.sleep(100) + } + throw TestException() + }.asFlow() + runBlocking { combine(flow, flow) { _, _ -> Unit } .catch {} - .collect { } + .collect {} } } } + + /** + * Run in a thread pool, then wait for all the tasks to finish. + */ + private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) { + val pool = Executors.newFixedThreadPool(numberOfThreads) + val dispatcher = pool.asCoroutineDispatcher() + block(dispatcher) + pool.shutdown() + while (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + /* deliberately empty */ + } + } } |