aboutsummaryrefslogtreecommitdiff
path: root/reactive
diff options
context:
space:
mode:
authorRoman Elizarov <elizarov@gmail.com>2020-01-21 14:58:47 +0300
committerRoman Elizarov <elizarov@gmail.com>2020-01-24 19:10:55 +0300
commit3d59fef9572b7116139ef8555481438db55842b1 (patch)
tree335ba9a1af7b882b9f3705d5af613ca2b6f9ce62 /reactive
parentf18e0e484831bc0fc61a2d929428725a92126b83 (diff)
downloadkotlinx.coroutines-3d59fef9572b7116139ef8555481438db55842b1.tar.gz
Fix context support in Publisher.asFlow.flowOn
* When using asFlow().flowOn(...) context is now properly tracked and taken into account for both execution context of the reactive subscription and for injection into Reactor context. * Publisher.asFlow slow-path implementation is simplified. It does not sure specialized openSubscription anymore, but always uses the same flow request logic. Fixes #1765
Diffstat (limited to 'reactive')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt55
-rw-r--r--reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt4
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt52
-rw-r--r--reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt43
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt43
5 files changed, 163 insertions, 34 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index b3b764a4..5accbf24 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive
@@ -27,7 +27,7 @@ import kotlin.coroutines.*
* see its documentation for additional details.
*/
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
- PublisherAsFlow(this, 1)
+ PublisherAsFlow(this)
/**
* Transforms the given flow to a reactive specification compliant [Publisher].
@@ -39,30 +39,11 @@ public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
- capacity: Int
-) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
+ context: CoroutineContext = EmptyCoroutineContext,
+ capacity: Int = 1
+) : ChannelFlow<T>(context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
- PublisherAsFlow(publisher, capacity)
-
- override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
- // use another channel for conflation (cannot do openSubscription)
- if (capacity < 0) return super.produceImpl(scope)
- // Open subscription channel directly
- val channel = publisher
- .injectCoroutineContext(scope.coroutineContext)
- .openSubscription(capacity)
- val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
- channel.cancel(cause?.let {
- it as? CancellationException ?: CancellationException("Job was cancelled", it)
- })
- }
- if (handle != null && handle !== NonDisposableHandle) {
- (channel as SendChannel<*>).invokeOnClose {
- handle.dispose()
- }
- }
- return channel
- }
+ PublisherAsFlow(publisher, context, capacity)
private val requestSize: Long
get() = when (capacity) {
@@ -73,8 +54,26 @@ private class PublisherAsFlow<T : Any>(
}
override suspend fun collect(collector: FlowCollector<T>) {
+ val collectContext = coroutineContext
+ val newDispatcher = context[ContinuationInterceptor]
+ if (newDispatcher == null || newDispatcher == collectContext[ContinuationInterceptor]) {
+ // fast path -- subscribe directly in this dispatcher
+ return collectImpl(collectContext + context, collector)
+ }
+ // slow path -- produce in a separate dispatcher
+ collectSlowPath(collector)
+ }
+
+ private suspend fun collectSlowPath(collector: FlowCollector<T>) {
+ coroutineScope {
+ collector.emitAll(produceImpl(this + context))
+ }
+ }
+
+ private suspend fun collectImpl(injectContext: CoroutineContext, collector: FlowCollector<T>) {
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
- publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
+ // inject subscribe context into publisher
+ publisher.injectCoroutineContext(injectContext).subscribe(subscriber)
try {
var consumed = 0L
while (true) {
@@ -90,9 +89,9 @@ private class PublisherAsFlow<T : Any>(
}
}
- // The second channel here is used only for broadcast
+ // The second channel here is used for produceIn/broadcastIn and slow-path (dispatcher change)
override suspend fun collectTo(scope: ProducerScope<T>) =
- collect(SendingCollector(scope.channel))
+ collectImpl(scope.coroutineContext, SendingCollector(scope.channel))
}
@Suppress("SubscriberImplementation")
diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
index a37719de..e458c918 100644
--- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
+++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt
@@ -120,7 +120,7 @@ class PublisherAsFlowTest : TestBase() {
7 -> try {
send(value)
} catch (e: CancellationException) {
- finish(6)
+ expect(5)
throw e
}
else -> expectUnreached()
@@ -143,6 +143,6 @@ class PublisherAsFlowTest : TestBase() {
}
}
}
- expect(5)
+ finish(6)
}
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
index ccef000b..e4bd8b31 100644
--- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
+++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
@@ -4,16 +4,17 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
-import reactor.core.publisher.Mono
+import reactor.core.publisher.*
import reactor.util.context.Context
-import kotlin.test.assertEquals
+import kotlin.test.*
class FlowAsFluxTest : TestBase() {
@Test
- fun testFlowToFluxContextPropagation() {
+ fun testFlowAsFluxContextPropagation() {
val flux = flow<String> {
(1..4).forEach { i -> emit(createMono(i).awaitFirst()) }
- } .asFlux()
+ }
+ .asFlux()
.subscriberContext(Context.of(1, "1"))
.subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
val list = flux.collectList().block()!!
@@ -24,4 +25,47 @@ class FlowAsFluxTest : TestBase() {
val ctx = coroutineContext[ReactorContext]!!.context
ctx.getOrDefault(i, "noValue")
}
+
+ @Test
+ fun testFluxAsFlowContextPropagationWithFlowOn() = runTest {
+ expect(1)
+ Flux.create<String> {
+ it.next("OK")
+ it.complete()
+ }
+ .subscriberContext { ctx ->
+ expect(2)
+ assertEquals("CTX", ctx.get(1))
+ ctx
+ }
+ .asFlow()
+ .flowOn(ReactorContext(Context.of(1, "CTX")))
+ .collect {
+ expect(3)
+ assertEquals("OK", it)
+ }
+ finish(4)
+ }
+
+ @Test
+ fun testFluxAsFlowContextPropagationFromScope() = runTest {
+ expect(1)
+ withContext(ReactorContext(Context.of(1, "CTX"))) {
+ Flux.create<String> {
+ it.next("OK")
+ it.complete()
+ }
+ .subscriberContext { ctx ->
+ expect(2)
+ assertEquals("CTX", ctx.get(1))
+ ctx
+ }
+ .asFlow()
+ .collect {
+ expect(3)
+ assertEquals("OK", it)
+ }
+ }
+ finish(4)
+ }
} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt
new file mode 100644
index 00000000..1ed3a164
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/test/FluxContextTest.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.*
+import org.junit.Test
+import reactor.core.publisher.*
+import kotlin.test.*
+
+class FluxContextTest : TestBase() {
+ private val dispatcher = newSingleThreadContext("FluxContextTest")
+
+ @After
+ fun tearDown() {
+ dispatcher.close()
+ }
+
+ @Test
+ fun testFluxCreateAsFlowThread() = runTest {
+ expect(1)
+ val mainThread = Thread.currentThread()
+ val dispatcherThread = withContext(dispatcher) { Thread.currentThread() }
+ assertTrue(dispatcherThread != mainThread)
+ Flux.create<String> {
+ assertEquals(dispatcherThread, Thread.currentThread())
+ it.next("OK")
+ it.complete()
+ }
+ .asFlow()
+ .flowOn(dispatcher)
+ .collect {
+ expect(2)
+ assertEquals("OK", it)
+ assertEquals(mainThread, Thread.currentThread())
+ }
+ finish(3)
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt
new file mode 100644
index 00000000..2cc32435
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowableContextTest.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class FlowableContextTest : TestBase() {
+ private val dispatcher = newSingleThreadContext("FlowableContextTest")
+
+ @After
+ fun tearDown() {
+ dispatcher.close()
+ }
+
+ @Test
+ fun testFlowableCreateAsFlowThread() = runTest {
+ expect(1)
+ val mainThread = Thread.currentThread()
+ val dispatcherThread = withContext(dispatcher) { Thread.currentThread() }
+ assertTrue(dispatcherThread != mainThread)
+ Flowable.create<String>({
+ assertEquals(dispatcherThread, Thread.currentThread())
+ it.onNext("OK")
+ it.onComplete()
+ }, BackpressureStrategy.BUFFER)
+ .asFlow()
+ .flowOn(dispatcher)
+ .collect {
+ expect(2)
+ assertEquals("OK", it)
+ assertEquals(mainThread, Thread.currentThread())
+ }
+ finish(3)
+ }
+} \ No newline at end of file