aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt85
1 files changed, 67 insertions, 18 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
index 1430dbf3..7252ca21 100644
--- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt
@@ -6,12 +6,12 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.exceptions.*
-import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
-import java.io.*
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
import kotlin.test.*
// Check that exception is not leaked to the global exception handler
@@ -22,37 +22,86 @@ class LeakedExceptionTest : TestBase() {
@Test
fun testSingle() = withExceptionHandler(handler) {
- val flow = rxSingle<Unit> { throw TestException() }.toFlowable().asFlow()
- runBlocking {
- repeat(10000) {
- combine(flow, flow) { _, _ -> Unit }
- .catch {}
- .collect { }
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxSingle<Unit>(dispatcher) { throw TestException() }.toFlowable().asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
}
}
}
@Test
fun testObservable() = withExceptionHandler(handler) {
- val flow = rxObservable<Unit> { throw TestException() }.toFlowable(BackpressureStrategy.BUFFER).asFlow()
- runBlocking {
- repeat(10000) {
- combine(flow, flow) { _, _ -> Unit }
- .catch {}
- .collect { }
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxObservable<Unit>(dispatcher) { throw TestException() }
+ .toFlowable(BackpressureStrategy.BUFFER)
+ .asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
}
}
}
@Test
fun testFlowable() = withExceptionHandler(handler) {
- val flow = rxFlowable<Unit> { throw TestException() }.asFlow()
- runBlocking {
- repeat(10000) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxFlowable<Unit>(dispatcher) { throw TestException() }.asFlow()
+ runBlocking {
+ repeat(10000) {
+ combine(flow, flow) { _, _ -> Unit }
+ .catch {}
+ .collect {}
+ }
+ }
+ }
+ }
+
+ /**
+ * This test doesn't test much and was added to display a problem with straighforward use of
+ * [withExceptionHandler].
+ *
+ * If one was to remove `dispatcher` and launch `rxFlowable` with an empty coroutine context,
+ * this test would fail fairly often, while other tests were also vulnerable, but the problem is
+ * much more difficult to reproduce. Thus, this test is a justification for adding `dispatcher`
+ * to other tests.
+ *
+ * See the commit that introduced this test for a better explanation.
+ */
+ @Test
+ fun testResettingExceptionHandler() = withExceptionHandler(handler) {
+ withFixedThreadPool(4) { dispatcher ->
+ val flow = rxFlowable<Unit>(dispatcher) {
+ if ((0..1).random() == 0) {
+ Thread.sleep(100)
+ }
+ throw TestException()
+ }.asFlow()
+ runBlocking {
combine(flow, flow) { _, _ -> Unit }
.catch {}
- .collect { }
+ .collect {}
}
}
}
+
+ /**
+ * Run in a thread pool, then wait for all the tasks to finish.
+ */
+ private fun withFixedThreadPool(numberOfThreads: Int, block: (CoroutineDispatcher) -> Unit) {
+ val pool = Executors.newFixedThreadPool(numberOfThreads)
+ val dispatcher = pool.asCoroutineDispatcher()
+ block(dispatcher)
+ pool.shutdown()
+ while (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ /* deliberately empty */
+ }
+ }
}