aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-07-29 21:31:46 -0700
committerVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-08-05 17:08:20 +0300
commita33bf5a8332bb16335bc69f5a893f1d626932f8d (patch)
treebc4b49731602a9f56cfd36624f21c46c5af49e05 /kotlinx-coroutines-core
parentdb95996ceb2758dfc7ccfe9ac805dace2e89128f (diff)
downloadkotlinx.coroutines-a33bf5a8332bb16335bc69f5a893f1d626932f8d.tar.gz
Check for cancellation in concurrent flow merge on each element
* Implementation detail (launch on each value) is leaking into upstream behaviour * The overhead is negligible compared to launching a new coroutines and sending to channel, but it provides a much approachable mental model when no suspension in the upstream flow happens (note: upstream never sends elements to the channel) Fixes #1392
Diffstat (limited to 'kotlinx-coroutines-core')
-rw-r--r--kotlinx-coroutines-core/common/src/flow/operators/Merge.kt8
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt14
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt15
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt15
4 files changed, 51 insertions, 1 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
index e593d035..b1fe91ab 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
@@ -151,8 +151,14 @@ private class ChannelFlowMerge<T>(
// The actual merge implementation with concurrency limit
private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) {
val semaphore = Semaphore(concurrency)
- @Suppress("UNCHECKED_CAST")
+ val job: Job? = coroutineContext[Job]
flow.collect { inner ->
+ /*
+ * We launch a coroutine on each emitted element and the only potential
+ * suspension point in this collector is `semaphore.acquire` that rarely suspends,
+ * so we manually check for cancellation to propagate it to the upstream in time.
+ */
+ job?.ensureActive()
semaphore.acquire() // Acquire concurrency permit
scope.launch {
try {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
index 65fef02c..0b1b208f 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
@@ -183,5 +183,19 @@ class BufferTest : TestBase() {
}
finish(n + 4)
}
+
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ expectUnreached()
+ emit(4)
+ }.buffer(0)
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
index 6069ae6d..511a003a 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt
@@ -73,4 +73,19 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() {
assertFailsWith<CancellationException>(flow)
finish(5)
}
+
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ emit(4)
+ expectUnreached() // Cancelled by take
+ emit(5)
+ }.flatMapMerge(2) { v -> flow { emit(v) } }
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
index 5d5fa6c6..34c0476e 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
@@ -262,6 +262,21 @@ class FlowOnTest : TestBase() {
}
@Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ expectUnreached()
+ emit(4)
+ }.flowOn(wrapperDispatcher())
+ .buffer(0)
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
+
+ @Test
fun testException() = runTest {
val flow = flow {
emit(314)