aboutsummaryrefslogtreecommitdiff
path: root/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt46
1 files changed, 21 insertions, 25 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index 96ae6287..efa9c9c9 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -166,11 +166,12 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T>
public class FlowSubscription<T>(
@JvmField public val flow: Flow<T>,
@JvmField public val subscriber: Subscriber<in T>
-) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) {
+) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) {
private val requested = atomic(0L)
- private val producer = atomic<CancellableContinuation<Unit>?>(null)
+ private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
- override fun onStart() {
+ // This code wraps startCoroutineCancellable into continuation
+ private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) {
::flowProcessing.startCoroutineCancellable(this)
}
@@ -197,19 +198,17 @@ public class FlowSubscription<T>(
*/
private suspend fun consumeFlow() {
flow.collect { value ->
- /*
- * Flow is scopeless, thus if it's not active, its subscription was cancelled.
- * No intermediate "child failed, but flow coroutine is not" states are allowed.
- */
- coroutineContext.ensureActive()
- if (requested.value <= 0L) {
+ // Emit the value
+ subscriber.onNext(value)
+ // Suspend if needed before requesting the next value
+ if (requested.decrementAndGet() <= 0) {
suspendCancellableCoroutine<Unit> {
producer.value = it
- if (requested.value != 0L) it.resumeSafely()
}
+ } else {
+ // check for cancellation if we don't suspend
+ coroutineContext.ensureActive()
}
- requested.decrementAndGet()
- subscriber.onNext(value)
}
}
@@ -218,22 +217,19 @@ public class FlowSubscription<T>(
}
override fun request(n: Long) {
- if (n <= 0) {
- return
- }
- start()
- requested.update { value ->
+ if (n <= 0) return
+ val old = requested.getAndUpdate { value ->
val newValue = value + n
if (newValue <= 0L) Long.MAX_VALUE else newValue
}
- val producer = producer.getAndSet(null) ?: return
- producer.resumeSafely()
- }
-
- private fun CancellableContinuation<Unit>.resumeSafely() {
- val token = tryResume(Unit)
- if (token != null) {
- completeResume(token)
+ if (old <= 0L) {
+ assert(old == 0L)
+ // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine
+ while(true) {
+ val producer = producer.getAndSet(null) ?: continue // spin if not set yet
+ producer.resume(Unit)
+ break
+ }
}
}
}