diff options
author | Roman Elizarov <elizarov@gmail.com> | 2020-07-16 18:33:31 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-16 18:33:31 +0300 |
commit | d7189701495c24fe99f394a721033cfe9f7a2436 (patch) | |
tree | bd6b4cd638f044c7b82c06c793ae005648e3f6e4 /reactive | |
parent | 5e91dc45044b28f8b10d76e231e886b0b7da42f1 (diff) | |
download | kotlinx.coroutines-d7189701495c24fe99f394a721033cfe9f7a2436.tar.gz |
Fix race in Flow.asPublisher (#2124)
The race led to more items being emitted via onNext than were requested. A corresponding stress test was added as well.
Fixes #2109
Diffstat (limited to 'reactive')
4 files changed, 170 insertions, 35 deletions
diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index 8017ee5b..488695de 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -16,10 +16,10 @@ class FlowAsPublisherTest : TestBase() { fun testErrorOnCancellationIsReported() { expect(1) flow<Int> { - emit(2) try { - hang { expect(3) } + emit(2) } finally { + expect(3) throw TestException() } }.asPublisher().subscribe(object : JFlow.Subscriber<Int> { @@ -52,12 +52,11 @@ class FlowAsPublisherTest : TestBase() { expect(1) flow<Int> { emit(2) - hang { expect(3) } }.asPublisher().subscribe(object : JFlow.Subscriber<Int> { private lateinit var subscription: JFlow.Subscription override fun onComplete() { - expect(4) + expect(3) } override fun onSubscribe(s: JFlow.Subscription?) { @@ -74,6 +73,6 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(5) + finish(4) } } diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 96ae6287..efa9c9c9 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -166,11 +166,12 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> public class FlowSubscription<T>( @JvmField public val flow: Flow<T>, @JvmField public val subscriber: Subscriber<in T> -) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, false) { +) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) { private val requested = atomic(0L) - private val producer = atomic<CancellableContinuation<Unit>?>(null) + private val producer = atomic<Continuation<Unit>?>(createInitialContinuation()) - override fun onStart() { + // This code wraps startCoroutineCancellable into continuation + private fun 
createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) { ::flowProcessing.startCoroutineCancellable(this) } @@ -197,19 +198,17 @@ public class FlowSubscription<T>( */ private suspend fun consumeFlow() { flow.collect { value -> - /* - * Flow is scopeless, thus if it's not active, its subscription was cancelled. - * No intermediate "child failed, but flow coroutine is not" states are allowed. - */ - coroutineContext.ensureActive() - if (requested.value <= 0L) { + // Emit the value + subscriber.onNext(value) + // Suspend if needed before requesting the next value + if (requested.decrementAndGet() <= 0) { suspendCancellableCoroutine<Unit> { producer.value = it - if (requested.value != 0L) it.resumeSafely() } + } else { + // check for cancellation if we don't suspend + coroutineContext.ensureActive() } - requested.decrementAndGet() - subscriber.onNext(value) } } @@ -218,22 +217,19 @@ public class FlowSubscription<T>( } override fun request(n: Long) { - if (n <= 0) { - return - } - start() - requested.update { value -> + if (n <= 0) return + val old = requested.getAndUpdate { value -> val newValue = value + n if (newValue <= 0L) Long.MAX_VALUE else newValue } - val producer = producer.getAndSet(null) ?: return - producer.resumeSafely() - } - - private fun CancellableContinuation<Unit>.resumeSafely() { - val token = tryResume(Unit) - if (token != null) { - completeResume(token) + if (old <= 0L) { + assert(old == 0L) + // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine + while(true) { + val producer = producer.getAndSet(null) ?: continue // spin if not set yet + producer.resume(Unit) + break + } } } } diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index 86334928..c044d927 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ 
b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -16,10 +16,10 @@ class FlowAsPublisherTest : TestBase() { fun testErrorOnCancellationIsReported() { expect(1) flow<Int> { - emit(2) try { - hang { expect(3) } + emit(2) } finally { + expect(3) throw TestException() } }.asPublisher().subscribe(object : Subscriber<Int> { @@ -52,12 +52,11 @@ class FlowAsPublisherTest : TestBase() { expect(1) flow<Int> { emit(2) - hang { expect(3) } }.asPublisher().subscribe(object : Subscriber<Int> { private lateinit var subscription: Subscription override fun onComplete() { - expect(4) + expect(3) } override fun onSubscribe(s: Subscription?) { @@ -74,6 +73,6 @@ class FlowAsPublisherTest : TestBase() { expectUnreached() } }) - finish(5) + finish(4) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt new file mode 100644 index 00000000..736a6640 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherRequestStressTest.kt @@ -0,0 +1,141 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import org.junit.* +import org.reactivestreams.* +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* +import kotlin.random.* + +/** + * This stress-test is self-contained reproducer for the race in [Flow.asPublisher] extension + * that was originally reported in the issue + * [#2109](https://github.com/Kotlin/kotlinx.coroutines/issues/2109). 
+ * The original reproducer used a flow that loads a file using AsynchronousFileChannel + * (that issues completion callbacks from multiple threads) + * and uploads it to S3 via Amazon SDK, which internally uses netty for I/O + * (which uses a single thread for connection-related callbacks). + * + * This stress-test essentially mimics the logic in multiple interacting threads: several emitter threads that form + * the flow and a single requesting thread works on the subscriber's side to periodically request more + * values when the number of items requested drops below the threshold. + */ +@Suppress("ReactiveStreamsSubscriberImplementation") +class PublisherRequestStressTest : TestBase() { + private val testDurationSec = 3 * stressTestMultiplier + + // Original code in Amazon SDK uses 4 and 16 as low/high watermarks. + // There constants were chosen so that problem reproduces asap with particular this code. + private val minDemand = 8L + private val maxDemand = 16L + + private val nEmitThreads = 4 + + private val emitThreadNo = AtomicInteger() + + private val emitPool = Executors.newFixedThreadPool(nEmitThreads) { r -> + Thread(r, "PublisherRequestStressTest-emit-${emitThreadNo.incrementAndGet()}") + } + + private val reqPool = Executors.newSingleThreadExecutor { r -> + Thread(r, "PublisherRequestStressTest-req") + } + + private val nextValue = AtomicLong(0) + + @After + fun tearDown() { + emitPool.shutdown() + reqPool.shutdown() + emitPool.awaitTermination(10, TimeUnit.SECONDS) + reqPool.awaitTermination(10, TimeUnit.SECONDS) + } + + private lateinit var subscription: Subscription + + @Test + fun testRequestStress() { + val expectedValue = AtomicLong(0) + val requestedTill = AtomicLong(0) + val completionLatch = CountDownLatch(1) + val callingOnNext = AtomicInteger() + + val publisher = mtFlow().asPublisher() + var error = false + + publisher.subscribe(object : Subscriber<Long> { + private var demand = 0L // only updated from reqPool + + override fun onComplete() 
{ + completionLatch.countDown() + } + + override fun onSubscribe(sub: Subscription) { + subscription = sub + maybeRequestMore() + } + + private fun maybeRequestMore() { + if (demand >= minDemand) return + val nextDemand = Random.nextLong(minDemand + 1..maxDemand) + val more = nextDemand - demand + demand = nextDemand + requestedTill.addAndGet(more) + subscription.request(more) + } + + override fun onNext(value: Long) { + check(callingOnNext.getAndIncrement() == 0) // make sure it is not concurrent + // check for expected value + check(value == expectedValue.get()) + // check that it does not exceed requested values + check(value < requestedTill.get()) + val nextExpected = value + 1 + expectedValue.set(nextExpected) + // send more requests from request thread + reqPool.execute { + demand-- // processed an item + maybeRequestMore() + } + callingOnNext.decrementAndGet() + } + + override fun onError(ex: Throwable?) { + error = true + error("Failed", ex) + } + }) + var prevExpected = -1L + for (second in 1..testDurationSec) { + if (error) break + Thread.sleep(1000) + val expected = expectedValue.get() + println("$second: expectedValue = $expected") + check(expected > prevExpected) // should have progress + prevExpected = expected + } + if (!error) { + subscription.cancel() + completionLatch.await() + } + } + + private fun mtFlow(): Flow<Long> = flow { + while (currentCoroutineContext().isActive) { + emit(aWait()) + } + } + + private suspend fun aWait(): Long = suspendCancellableCoroutine { cont -> + emitPool.execute(Runnable { + cont.resume(nextValue.getAndIncrement()) + }) + } +}
\ No newline at end of file |