diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt | 46 |
1 files changed, 21 insertions, 25 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 96ae6287..efa9c9c9 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -166,11 +166,12 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> public class FlowSubscription<T>( @JvmField public val flow: Flow<T>, @JvmField public val subscriber: Subscriber<in T> -) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) { +) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) { private val requested = atomic(0L) - private val producer = atomic<CancellableContinuation<Unit>?>(null) + private val producer = atomic<Continuation<Unit>?>(createInitialContinuation()) - override fun onStart() { + // This code wraps startCoroutineCancellable into continuation + private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) { ::flowProcessing.startCoroutineCancellable(this) } @@ -197,19 +198,17 @@ public class FlowSubscription<T>( */ private suspend fun consumeFlow() { flow.collect { value -> - /* - * Flow is scopeless, thus if it's not active, its subscription was cancelled. - * No intermediate "child failed, but flow coroutine is not" states are allowed. - */ - coroutineContext.ensureActive() - if (requested.value <= 0L) { + // Emit the value + subscriber.onNext(value) + // Suspend if needed before requesting the next value + if (requested.decrementAndGet() <= 0) { suspendCancellableCoroutine<Unit> { producer.value = it - if (requested.value != 0L) it.resumeSafely() } + } else { + // check for cancellation if we don't suspend + coroutineContext.ensureActive() } - requested.decrementAndGet() - subscriber.onNext(value) } } @@ -218,22 +217,19 @@ public class FlowSubscription<T>( } override fun request(n: Long) { - if (n <= 0) { - return - } - start() - requested.update { value -> + if (n <= 0) return + val old = requested.getAndUpdate { value -> val newValue = value + n if (newValue <= 0L) Long.MAX_VALUE else newValue } - val producer = producer.getAndSet(null) ?: return - producer.resumeSafely() - } - - private fun CancellableContinuation<Unit>.resumeSafely() { - val token = tryResume(Unit) - if (token != null) { - completeResume(token) + if (old <= 0L) { + assert(old == 0L) + // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine + while(true) { + val producer = producer.getAndSet(null) ?: continue // spin if not set yet + producer.resume(Unit) + break + } } } } |