aboutsummaryrefslogtreecommitdiff
path: root/reactive
diff options
context:
space:
mode:
authordkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com>2020-02-13 16:44:49 +0300
committerGitHub <noreply@github.com>2020-02-13 16:44:49 +0300
commit9f4fd70597567dc4477e5d935d06a380edd09eae (patch)
treef3d3d7781605fb19957e83fd9b3c66015b03410c /reactive
parentde491d25764fca8afbece56950f623f4c3155542 (diff)
downloadkotlinx.coroutines-9f4fd70597567dc4477e5d935d06a380edd09eae.tar.gz
Fix a race in some tests for JavaRX integration (#1801)
An extremely rare race could happen in any of the tests in `LeakedExceptionTest` in the following case: * `withExceptionHandler` runs the block passed to it; * In one of the last iterations of `repeat`, `select` in `combine` happens on both flows at the same time, that is, the block that was passed to `rx[Something]` runs in two threads simultaneously; * One of these two threads (thread A) runs anomalously slow; * The other thread successfully throws an exception; * This exception is propagated to `catch`, so `collect` is finished; * `repeat` is exited, the block passed to `withExceptionHandler` is done executing; * `withExceptionHandler` sets back the usual exception handler, which fails when an exception in JavaRX happens (see https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling); * Thread A wakes up and throws an exception. This time, it is passed not to `handler`, which is made specifically to deal with this, but to the default handler. As a fix, now a special coroutine context passed to `rx[Something]` ensures that the spawned executions are run in a thread pool that blocks until all the tasks are done.
Diffstat (limited to 'reactive')
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt85
1 files changed, 67 insertions, 18 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
index 1430dbf3..7252ca21 100644
--- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
@@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.exceptions.*
-import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
-import java.io.*
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
import kotlin.test.*
// Check that exception is not leaked to the global exception handler
@@ -22,37 +22,86 @@ class LeakedExceptionTest : TestBase() {
@Test
fun testSingle() = withExceptionHandler(handler) {
- val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow()
- runBlocking {
- repeat(10000) {
- combine(flow, flow) { _, _ -> Unit }
- .catch {}
- .collect { }
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
}
}
}
@Test
fun testObservable() = withExceptionHandler(handler) {
- val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow()
- runBlocking {
- repeat(10000) {
- combine(flow, flow) { _, _ -> Unit }
- .catch {}
- .collect { }
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
+ .toFlowable(BackpressureStrategy.BUFFER)
+ .asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
}
}
}
@Test
fun testFlowable() = withExceptionHandler(handler) {
- val flow = rxFlowable<Unit> { throw TestException() }.asFlow()
- runBlocking {
- repeat(10000) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
+ }
+ }
+ }
+
+ /**
+ * This test doesn't test much and was added to display a problem with straighforward use of
+ * [withExceptionHandler].
+ *
+ * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
+ * this test would fail fairly often, while other tests were also vulnerable, but the problem is
+ * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
+ * to other tests.
+ *
+ * See the commit that introduced this test for a better explanation.
+ */
+ @Test
+ fun testResettingExceptionHandler() = withExceptionHandler(handler) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxFlowable<Unit>(dispatcher) {
+ if ((0..1).random() == 0) {
+ Thread.sleep(100)
+ }
+ throw TestException()
+ }.asFlow()
+ runBlocking {
combine(flow, flow) { _, _ -> Unit }
.catch {}
- .collect { }
+ .collect {}
}
}
}
+
+ /**
+ * Run in a thread pool, then wait for all the tasks to finish.
+ */
+ private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
+ val pool = Executors.newFixedThreadPool(numberOfThreads)
+ val dispatcher = pool.asCoroutineDispatcher()
+ block(dispatcher)
+ pool.shutdown()
+ while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ /* deliberately empty */
+ }
+ }
}