From 3d59fef9572b7116139ef8555481438db55842b1 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 21 Jan 2020 14:58:47 +0300 Subject: Fix context support in Publisher.asFlow.flowOn * When using asFlow().flowOn(...) context is now properly tracked and taken into account for both execution context of the reactive subscription and for injection into Reactor context. * Publisher.asFlow slow-path implementation is simplified. It does not sure specialized openSubscription anymore, but always uses the same flow request logic. Fixes #1765 --- .../src/ReactiveFlow.kt | 55 +++++++++++----------- .../test/PublisherAsFlowTest.kt | 4 +- .../test/FlowAsFluxTest.kt | 52 ++++++++++++++++++-- .../test/FluxContextTest.kt | 43 +++++++++++++++++ .../test/FlowableContextTest.kt | 43 +++++++++++++++++ 5 files changed, 163 insertions(+), 34 deletions(-) create mode 100644 reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt create mode 100644 reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt (limited to 'reactive') diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index b3b764a4..5accbf24 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive @@ -27,7 +27,7 @@ import kotlin.coroutines.* * see its documentation for additional details. */ public fun Publisher.asFlow(): Flow = - PublisherAsFlow(this, 1) + PublisherAsFlow(this) /** * Transforms the given flow to a reactive specification compliant [Publisher]. @@ -39,30 +39,11 @@ public fun Flow.asPublisher(): Publisher = FlowAsPublisher(this) private class PublisherAsFlow( private val publisher: Publisher, - capacity: Int -) : ChannelFlow(EmptyCoroutineContext, capacity) { + context: CoroutineContext = EmptyCoroutineContext, + capacity: Int = 1 +) : ChannelFlow(context, capacity) { override fun create(context: CoroutineContext, capacity: Int): ChannelFlow = - PublisherAsFlow(publisher, capacity) - - override fun produceImpl(scope: CoroutineScope): ReceiveChannel { - // use another channel for conflation (cannot do openSubscription) - if (capacity < 0) return super.produceImpl(scope) - // Open subscription channel directly - val channel = publisher - .injectCoroutineContext(scope.coroutineContext) - .openSubscription(capacity) - val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause -> - channel.cancel(cause?.let { - it as? CancellationException ?: CancellationException("Job was cancelled", it) - }) - } - if (handle != null && handle !== NonDisposableHandle) { - (channel as SendChannel<*>).invokeOnClose { - handle.dispose() - } - } - return channel - } + PublisherAsFlow(publisher, context, capacity) private val requestSize: Long get() = when (capacity) { @@ -73,8 +54,26 @@ private class PublisherAsFlow( } override suspend fun collect(collector: FlowCollector) { + val collectContext = coroutineContext + val newDispatcher = context[ContinuationInterceptor] + if (newDispatcher == null || newDispatcher == collectContext[ContinuationInterceptor]) { + // fast path -- subscribe directly in this dispatcher + return collectImpl(collectContext + context, collector) + } + // slow path -- produce in a separate dispatcher + collectSlowPath(collector) + } + + private suspend fun collectSlowPath(collector: FlowCollector) { + coroutineScope { + collector.emitAll(produceImpl(this + context)) + } + } + + private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector) { val subscriber = ReactiveSubscriber(capacity, requestSize) - publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber) + // inject subscribe context into publisher + publisher.injectCoroutineContext(injectContext).subscribe(subscriber) try { var consumed = 0L while (true) { @@ -90,9 +89,9 @@ private class PublisherAsFlow( } } - // The second channel here is used only for broadcast + // The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change) override suspend fun collectTo(scope: ProducerScope) = - collect(SendingCollector(scope.channel)) + collectImpl(scope.coroutineContext, SendingCollector(scope.channel)) } @Suppress("SubscriberImplementation") diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index a37719de..e458c918 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -120,7 +120,7 @@ class PublisherAsFlowTest : TestBase() { 7 -> try { send(value) } catch (e: CancellationException) { - finish(6) + expect(5) throw e } else -> expectUnreached() @@ -143,6 +143,6 @@ class PublisherAsFlowTest : TestBase() { } } } - expect(5) + finish(6) } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt index ccef000b..e4bd8b31 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -4,16 +4,17 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.Test -import reactor.core.publisher.Mono +import reactor.core.publisher.* import reactor.util.context.Context -import kotlin.test.assertEquals +import kotlin.test.* class FlowAsFluxTest : TestBase() { @Test - fun testFlowToFluxContextPropagation() { + fun testFlowAsFluxContextPropagation() { val flux = flow { (1..4).forEach { i -> emit(createMono(i).awaitFirst()) } - } .asFlux() + } + .asFlux() .subscriberContext(Context.of(1, "1")) .subscriberContext(Context.of(2, "2", 3, "3", 4, "4")) val list = flux.collectList().block()!! @@ -24,4 +25,47 @@ class FlowAsFluxTest : TestBase() { val ctx = coroutineContext[ReactorContext]!!.context ctx.getOrDefault(i, "noValue") } + + @Test + fun testFluxAsFlowContextPropagationWithFlowOn() = runTest { + expect(1) + Flux.create { + it.next("OK") + it.complete() + } + .subscriberContext { ctx -> + expect(2) + assertEquals("CTX", ctx.get(1)) + ctx + } + .asFlow() + .flowOn(ReactorContext(Context.of(1, "CTX"))) + .collect { + expect(3) + assertEquals("OK", it) + } + finish(4) + } + + @Test + fun testFluxAsFlowContextPropagationFromScope() = runTest { + expect(1) + withContext(ReactorContext(Context.of(1, "CTX"))) { + Flux.create { + it.next("OK") + it.complete() + } + .subscriberContext { ctx -> + expect(2) + assertEquals("CTX", ctx.get(1)) + ctx + } + .asFlow() + .collect { + expect(3) + assertEquals("OK", it) + } + } + finish(4) + } } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt new file mode 100644 index 00000000..1ed3a164 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactor + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* +import org.junit.* +import org.junit.Test +import reactor.core.publisher.* +import kotlin.test.* + +class FluxContextTest : TestBase() { + private val dispatcher = newSingleThreadContext("FluxContextTest") + + @After + fun tearDown() { + dispatcher.close() + } + + @Test + fun testFluxCreateAsFlowThread() = runTest { + expect(1) + val mainThread = Thread.currentThread() + val dispatcherThread = withContext(dispatcher) { Thread.currentThread() } + assertTrue(dispatcherThread != mainThread) + Flux.create { + assertEquals(dispatcherThread, Thread.currentThread()) + it.next("OK") + it.complete() + } + .asFlow() + .flowOn(dispatcher) + .collect { + expect(2) + assertEquals("OK", it) + assertEquals(mainThread, Thread.currentThread()) + } + finish(3) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt new file mode 100644 index 00000000..2cc32435 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowableContextTest : TestBase() { + private val dispatcher = newSingleThreadContext("FlowableContextTest") + + @After + fun tearDown() { + dispatcher.close() + } + + @Test + fun testFlowableCreateAsFlowThread() = runTest { + expect(1) + val mainThread = Thread.currentThread() + val dispatcherThread = withContext(dispatcher) { Thread.currentThread() } + assertTrue(dispatcherThread != mainThread) + Flowable.create({ + assertEquals(dispatcherThread, Thread.currentThread()) + it.onNext("OK") + it.onComplete() + }, BackpressureStrategy.BUFFER) + .asFlow() + .flowOn(dispatcher) + .collect { + expect(2) + assertEquals("OK", it) + assertEquals(mainThread, Thread.currentThread()) + } + finish(3) + } +} \ No newline at end of file -- cgit v1.2.3