diff options
author | Vsevolod Tolstopyatov <qwwdfsad@gmail.com> | 2019-07-29 21:31:46 -0700 |
---|---|---|
committer | Vsevolod Tolstopyatov <qwwdfsad@gmail.com> | 2019-08-05 17:08:20 +0300 |
commit | a33bf5a8332bb16335bc69f5a893f1d626932f8d (patch) | |
tree | bc4b49731602a9f56cfd36624f21c46c5af49e05 /kotlinx-coroutines-core | |
parent | db95996ceb2758dfc7ccfe9ac805dace2e89128f (diff) | |
download | kotlinx.coroutines-a33bf5a8332bb16335bc69f5a893f1d626932f8d.tar.gz |
Check for cancellation in concurrent flow merge on each element
* Implementation detail (launch on each value) is leaking into upstream behaviour
* The overhead is negligible compared to launching a new coroutines and sending to channel, but it provides a much approachable mental model when no suspension in the upstream flow happens (note: upstream never sends elements to the channel)
Fixes #1392
Diffstat (limited to 'kotlinx-coroutines-core')
4 files changed, 51 insertions, 1 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index e593d035..b1fe91ab 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -151,8 +151,14 @@ private class ChannelFlowMerge<T>( // The actual merge implementation with concurrency limit private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector<T>) { val semaphore = Semaphore(concurrency) - @Suppress("UNCHECKED_CAST") + val job: Job? = coroutineContext[Job] flow.collect { inner -> + /* + * We launch a coroutine on each emitted element and the only potential + * suspension point in this collector is `semaphore.acquire` that rarely suspends, + * so we manually check for cancellation to propagate it to the upstream in time. + */ + job?.ensureActive() semaphore.acquire() // Acquire concurrency permit scope.launch { try { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt index 65fef02c..0b1b208f 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt @@ -183,5 +183,19 @@ class BufferTest : TestBase() { } finish(n + 4) } + + @Test + fun testCancellation() = runTest { + val result = flow { + emit(1) + emit(2) + emit(3) + expectUnreached() + emit(4) + }.buffer(0) + .take(2) + .toList() + assertEquals(listOf(1, 2), result) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt index 6069ae6d..511a003a 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeTest.kt @@ -73,4 +73,19 @@ class FlatMapMergeTest : FlatMapMergeBaseTest() { assertFailsWith<CancellationException>(flow) finish(5) } + + @Test + fun testCancellation() = runTest { + val result = flow { + emit(1) + emit(2) + emit(3) + emit(4) + expectUnreached() // Cancelled by take + emit(5) + }.flatMapMerge(2) { v -> flow { emit(v) } } + .take(2) + .toList() + assertEquals(listOf(1, 2), result) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt index 5d5fa6c6..34c0476e 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt @@ -262,6 +262,21 @@ class FlowOnTest : TestBase() { } @Test + fun testCancellation() = runTest { + val result = flow { + emit(1) + emit(2) + emit(3) + expectUnreached() + emit(4) + }.flowOn(wrapperDispatcher()) + .buffer(0) + .take(2) + .toList() + assertEquals(listOf(1, 2), result) + } + + @Test fun testException() = runTest { val flow = flow { emit(314) |