aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com>2024-03-25 14:16:16 +0100
committerGitHub <noreply@github.com>2024-03-25 16:16:16 +0300
commit0ca735851f0a225b6a7582232a1c9847a27fd059 (patch)
tree09d4b27f643e77efee7116f8fef6f85607b0c4cf
parent60d2fe8471ce4d289d526f04b51389b06b1c064f (diff)
downloadkotlinx.coroutines-0ca735851f0a225b6a7582232a1c9847a27fd059.tar.gz
Fix `Flow.timeout` swallowing the channel closure exception (#4072)
Fixes #4071
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Delay.kt1
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt11
2 files changed, 12 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index cad34a0d..2a701c0c 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -394,6 +394,7 @@ private fun <T> Flow<T>.timeoutInternal(
value.onSuccess {
downStream.emit(it)
}.onClosed {
+ it?.let { throw it }
return@onReceiveCatching false
}
return@onReceiveCatching true
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt
index a2ca101e..0162a216 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt
@@ -237,6 +237,17 @@ class TimeoutTest : TestBase() {
testImmediateTimeout(-1.seconds)
}
+ @Test
+ fun testClosing() = runTest {
+ assertFailsWith<TestException> {
+ channelFlow<Int> { close(TestException()) }
+ .timeout(Duration.INFINITE)
+ .collect {
+ expectUnreached()
+ }
+ }
+ }
+
private fun testImmediateTimeout(timeout: Duration) {
expect(1)
val flow = emptyFlow<Int>().timeout(timeout)