diff options
Diffstat (limited to 'kotlinx-coroutines-core/jvm')
134 files changed, 1654 insertions, 1128 deletions
diff --git a/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin b/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin Binary files differindex 76ee4115..397aaf67 100644 --- a/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin +++ b/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin diff --git a/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro b/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro index 60c8d612..1a9ae1c7 100644 --- a/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro +++ b/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro @@ -3,12 +3,12 @@ -keepnames class kotlinx.coroutines.CoroutineExceptionHandler {} # Most of volatile fields are updated with AFU and should not be mangled --keepclassmembernames class kotlinx.** { +-keepclassmembers class kotlinx.coroutines.** { volatile <fields>; } # Same story for the standard library's SafeContinuation that also uses AtomicReferenceFieldUpdater --keepclassmembernames class kotlin.coroutines.SafeContinuation { +-keepclassmembers class kotlin.coroutines.SafeContinuation { volatile <fields>; } diff --git a/kotlinx-coroutines-core/jvm/src/TimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index 4b6fd991..3f7ac675 100644 --- a/kotlinx-coroutines-core/jvm/src/TimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ // Need InlineOnly for efficient bytecode on Android @@ -10,21 +10,21 @@ package kotlinx.coroutines import java.util.concurrent.locks.* import kotlin.internal.InlineOnly -internal interface TimeSource { - fun currentTimeMillis(): Long - fun nanoTime(): Long - fun wrapTask(block: Runnable): Runnable - fun trackTask() - fun unTrackTask() - fun registerTimeLoopThread() - fun unregisterTimeLoopThread() - fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 - fun unpark(thread: Thread) +internal abstract class AbstractTimeSource { + abstract fun currentTimeMillis(): Long + abstract fun nanoTime(): Long + abstract fun wrapTask(block: Runnable): Runnable + abstract fun trackTask() + abstract fun unTrackTask() + abstract fun registerTimeLoopThread() + abstract fun unregisterTimeLoopThread() + abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 + abstract fun unpark(thread: Thread) } // For tests only // @JvmField: Don't use JvmField here to enable R8 optimizations via "assumenosideeffects" -internal var timeSource: TimeSource? = null +internal var timeSource: AbstractTimeSource? = null @InlineOnly internal inline fun currentTimeMillis(): Long = diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index e4504ccd..edb43031 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:JvmMultifileClass @@ -63,13 +63,14 @@ private class BlockingCoroutine<T>( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, true, true) { + override val isScopedCoroutine: Boolean get() = true override fun afterCompletion(state: Any?) { // wake up blocked thread if (Thread.currentThread() != blockedThread) - LockSupport.unpark(blockedThread) + unpark(blockedThread) } @Suppress("UNCHECKED_CAST") diff --git a/kotlinx-coroutines-core/jvm/src/CommonPool.kt b/kotlinx-coroutines-core/jvm/src/CommonPool.kt index 22033131..502630b0 100644 --- a/kotlinx-coroutines-core/jvm/src/CommonPool.kt +++ b/kotlinx-coroutines-core/jvm/src/CommonPool.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -28,7 +28,7 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { * Note that until Java 10, if an application is run within a container, * `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return the real number of cores. */ - public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism" + private const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism" override val executor: Executor get() = pool ?: getOrCreatePoolSync() @@ -62,7 +62,7 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { ?: return createPlainPool() // Fallback to plain thread pool // Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode if (!usePrivatePool && requestedParallelism < 0) { - Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService } + Try { fjpClass.getMethod("commonPool").invoke(null) as? ExecutorService } ?.takeIf { isGoodCommonPool(fjpClass, it) } ?.let { return it } } diff --git a/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt b/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt index 706f6c49..4835f796 100644 --- a/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt +++ b/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt index 5a69d48a..e91bb9fd 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt @@ -1,13 +1,13 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.scheduling.* -import java.util.concurrent.atomic.* import kotlin.coroutines.* +import kotlin.coroutines.jvm.internal.CoroutineStackFrame internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler" @@ -48,6 +48,102 @@ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, c } } +/** + * Executes a block using a context of a given continuation. + */ +internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T { + val context = continuation.context + val oldValue = updateThreadContext(context, countOrElement) + val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) { + // Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them + continuation.updateUndispatchedCompletion(context, oldValue) + } else { + null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context + } + try { + return block() + } finally { + if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) { + restoreThreadContext(context, oldValue) + } + } +} + +internal fun Continuation<*>.updateUndispatchedCompletion(context: CoroutineContext, oldValue: Any?): UndispatchedCoroutine<*>? { + if (this !is CoroutineStackFrame) return null + /* + * Fast-path to detect whether we have unispatched coroutine at all in our stack. + * + * Implementation note. + * If we ever find that stackwalking for thread-locals is way too slow, here is another idea: + * 1) Store undispatched coroutine right in the `UndispatchedMarker` instance + * 2) To avoid issues with cross-dispatch boundary, remove `UndispatchedMarker` + * from the context when creating dispatched coroutine in `withContext`. + * Another option is to "unmark it" instead of removing to save an allocation. + * Both options should work, but it requires more careful studying of the performance + * and, mostly, maintainability impact. + */ + val potentiallyHasUndispatchedCorotuine = context[UndispatchedMarker] !== null + if (!potentiallyHasUndispatchedCorotuine) return null + val completion = undispatchedCompletion() + completion?.saveThreadContext(context, oldValue) + return completion +} + +internal tailrec fun CoroutineStackFrame.undispatchedCompletion(): UndispatchedCoroutine<*>? { + // Find direct completion of this continuation + val completion: CoroutineStackFrame = when (this) { + is DispatchedCoroutine<*> -> return null + else -> callerFrame ?: return null // something else -- not supported + } + if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine! + return completion.undispatchedCompletion() // walk up the call stack with tail call +} + +/** + * Marker indicating that [UndispatchedCoroutine] exists somewhere up in the stack. + * Used as a performance optimization to avoid stack walking where it is not nesessary. + */ +private object UndispatchedMarker: CoroutineContext.Element, CoroutineContext.Key<UndispatchedMarker> { + override val key: CoroutineContext.Key<*> + get() = this +} + +// Used by withContext when context changes, but dispatcher stays the same +internal actual class UndispatchedCoroutine<in T>actual constructor ( + context: CoroutineContext, + uCont: Continuation<T> +) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) { + + private var savedContext: CoroutineContext? = null + private var savedOldValue: Any? = null + + fun saveThreadContext(context: CoroutineContext, oldValue: Any?) { + savedContext = context + savedOldValue = oldValue + } + + fun clearThreadContext(): Boolean { + if (savedContext == null) return false + savedContext = null + savedOldValue = null + return true + } + + override fun afterResume(state: Any?) { + savedContext?.let { context -> + restoreThreadContext(context, savedOldValue) + savedContext = null + savedOldValue = null + } + // resume undispatched -- update context but stay on the same dispatcher + val result = recoverResult(state, uCont) + withContinuationContext(uCont, null) { + uCont.resumeWith(result) + } + } +} + internal actual val CoroutineContext.coroutineName: String? get() { if (!DEBUG) return null val coroutineId = this[CoroutineId] ?: return null diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt b/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt index af37e73c..6d069692 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/Debug.kt b/kotlinx-coroutines-core/jvm/src/Debug.kt index 8108d235..911914a0 100644 --- a/kotlinx-coroutines-core/jvm/src/Debug.kt +++ b/kotlinx-coroutines-core/jvm/src/Debug.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ // Need InlineOnly for efficient bytecode on Android diff --git a/kotlinx-coroutines-core/jvm/src/DebugStrings.kt b/kotlinx-coroutines-core/jvm/src/DebugStrings.kt index 2ccfebc6..32bd07a7 100644 --- a/kotlinx-coroutines-core/jvm/src/DebugStrings.kt +++ b/kotlinx-coroutines-core/jvm/src/DebugStrings.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 787cbf9c..fe020276 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index 8033fb38..d82598ea 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") @@ -8,7 +8,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.scheduling.* -import java.util.* import kotlin.coroutines.* /** @@ -108,9 +107,10 @@ public actual object Dispatchers { * * ### Implementation note * - * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using - * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — - * typically execution continues in the same thread. + * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using + * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default] + * dispatcher does not lead to an actual switching to another thread — typically execution + * continues in the same thread. * As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used) * during operations over IO dispatcher. */ diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index d86f632a..e49c7dc7 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/Exceptions.kt b/kotlinx-coroutines-core/jvm/src/Exceptions.kt index 0684ce23..007a0c98 100644 --- a/kotlinx-coroutines-core/jvm/src/Exceptions.kt +++ b/kotlinx-coroutines-core/jvm/src/Exceptions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("FunctionName") diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 8ffc22d8..7ea3cc68 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -1,9 +1,10 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor service is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution + * * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) override fun toString(): String = dispatcher.toString() } -private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() { - init { - initFutureCancellation() - } -} +internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { -internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay { - - private var removesFutureOnCancellation: Boolean = false - - internal fun initFutureCancellation() { - removesFutureOnCancellation = removeFutureOnCancel(executor) + /* + * Attempts to reflectively (to be Java 6 compatible) invoke + * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup + * internal scheduler queue on cancellation. + */ + init { + removeFutureOnCancel(executor) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } } - /* - * removesFutureOnCancellation is required to avoid memory leak. - * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine. - * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation. - */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock( + ResumeUndispatchedRunnable(this, continuation), + continuation.context, + timeMillis + ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { continuation.cancelFutureOnCancellation(future) @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = if (removesFutureOnCancellation) { - scheduleBlock(block, context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) } } - private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun toString(): String = executor.toString() - override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor + override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor override fun hashCode(): Int = System.identityHashCode(executor) } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index bd16f49a..b27a9708 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:JvmMultifileClass @@ -13,42 +13,40 @@ import java.util.concurrent.* * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCompletion { future.cancel(false) } + * invokeOnCompletion { if (it != null) future.cancel(false) } * ``` * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = - invokeOnCompletion(handler = CancelFutureOnCompletion(this, future)) // TODO make it work only on cancellation as well? + invokeOnCompletion(handler = CancelFutureOnCompletion(future)) /** * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCancellation { future.cancel(false) } + * invokeOnCancellation { if (it != null) future.cancel(false) } * ``` */ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit = invokeOnCancellation(handler = CancelFutureOnCancel(future)) private class CancelFutureOnCompletion( - job: Job, private val future: Future<*> -) : JobNode<Job>(job) { +) : JobNode() { override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } - override fun toString() = "CancelFutureOnCompletion[$future]" } private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() { override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } override fun toString() = "CancelFutureOnCancel[$future]" } diff --git a/kotlinx-coroutines-core/jvm/src/Interruptible.kt b/kotlinx-coroutines-core/jvm/src/Interruptible.kt index 070aa624..b873eadf 100644 --- a/kotlinx-coroutines-core/jvm/src/Interruptible.kt +++ b/kotlinx-coroutines-core/jvm/src/Interruptible.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/Runnable.kt b/kotlinx-coroutines-core/jvm/src/Runnable.kt index 14d01105..844f9fca 100644 --- a/kotlinx-coroutines-core/jvm/src/Runnable.kt +++ b/kotlinx-coroutines-core/jvm/src/Runnable.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt b/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt index 478df822..6a00f45f 100644 --- a/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt +++ b/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt b/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt index 1fd85110..37fd70a2 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index aa18cd38..99e3b46c 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -1,13 +1,11 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines -import kotlinx.coroutines.internal.* import java.util.concurrent.* import java.util.concurrent.atomic.AtomicInteger -import kotlin.coroutines.* /** * Creates a coroutine execution context using a single thread with built-in [yield] support. @@ -61,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = @ObsoleteCoroutinesApi public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher { require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" } - return ThreadPoolDispatcher(nThreads, name) -} - -internal class PoolThread( - @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests - target: Runnable, name: String -) : Thread(target, name) { - init { isDaemon = true } -} - -/** - * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are - * created with [newSingleThreadContext] and [newFixedThreadPoolContext]. - */ -internal class ThreadPoolDispatcher internal constructor( - private val nThreads: Int, - private val name: String -) : ExecutorCoroutineDispatcherBase() { - private val threadNo = AtomicInteger() - - override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target -> - PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) - } - - init { - initFutureCancellation() + val threadNo = AtomicInteger() + val executor = Executors.newScheduledThreadPool(nThreads) { runnable -> + val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) + t.isDaemon = true + t } - - /** - * Closes this dispatcher -- shuts down all threads in this pool and releases resources. - */ - public override fun close() { - (executor as ExecutorService).shutdown() - } - - override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" + return executor.asCoroutineDispatcher() } diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index a9054265..4657bc7d 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -127,7 +127,11 @@ private open class ActorCoroutine<E>( parentContext: CoroutineContext, channel: Channel<E>, active: Boolean -) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> { +) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> { + + init { + initParentJob(parentContext[Job]) + } override fun onCancelling(cause: Throwable?) { _channel.cancel(cause?.let { @@ -159,11 +163,17 @@ private class LazyActorCoroutine<E>( return super.send(element) } + @Suppress("DEPRECATION") override fun offer(element: E): Boolean { start() return super.offer(element) } + override fun trySend(element: E): ChannelResult<Unit> { + start() + return super.trySend(element) + } + override fun close(cause: Throwable?): Boolean { // close the channel _first_ val closed = super.close(cause) diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt index 2c949959..0df8278b 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:JvmMultifileClass @@ -10,18 +10,87 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* /** - * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull], - * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details). + * **Deprecated** blocking variant of send. + * This method is deprecated in the favour of [trySendBlocking]. * - * This is a way to call [Channel.send] method inside a blocking code using [runBlocking], - * so this function should not be used from coroutine. + * `sendBlocking` is a dangerous primitive — it throws an exception + * if the channel was closed or, more commonly, cancelled. + * Cancellation exceptions in non-blocking code are unexpected and frequently + * trigger internal failures. + * + * These bugs are hard-to-spot during code review and they forced users to write + * their own wrappers around `sendBlocking`. + * So this function is deprecated and replaced with a more explicit primitive. + * + * The real-world example of broken usage with Firebase: + * + * ```kotlin + * callbackFlow { + * val listener = object : ValueEventListener { + * override fun onDataChange(snapshot: DataSnapshot) { + * // This line may fail and crash the app when the downstream flow is cancelled + * sendBlocking(DataSnapshot(snapshot)) + * } + * + * override fun onCancelled(error: DatabaseError) { + * close(error.toException()) + * } + * } + * + * firebaseQuery.addValueEventListener(listener) + * awaitClose { firebaseQuery.removeEventListener(listener) } + * } + * ``` */ +@Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySendBlocking'. " + + "Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary", + replaceWith = ReplaceWith("trySendBlocking(element)") +) public fun <E> SendChannel<E>.sendBlocking(element: E) { // fast path - if (offer(element)) + if (trySend(element).isSuccess) return // slow path runBlocking { send(element) } } + +/** + * Adds [element] into to this channel, **blocking** the caller while this channel is full, + * and returning either [successful][ChannelResult.isSuccess] result when the element was added, or + * failed result representing closed channel with a corresponding exception. + * + * This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching, + * so this function should not be used from coroutine. + * + * Example of usage: + * + * ``` + * // From callback API + * channel.trySendBlocking(element) + * .onSuccess { /* request next element or debug log */ } + * .onFailure { t: Throwable? -> /* throw or log */ } + * ``` + * + * For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it. + * + * @throws [InterruptedException] if the current thread is interrupted during the blocking send operation. + */ +@Throws(InterruptedException::class) +public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> { + /* + * Sent successfully -- bail out. + * But failure may indicate either that the channel it full or that + * it is close. Go to slow path on failure to simplify the successful path and + * to materialize default exception. + */ + trySend(element).onSuccess { return ChannelResult.success(Unit) } + return runBlocking { + val r = runCatching { send(element) } + if (r.isSuccess) ChannelResult.success(Unit) + else ChannelResult.closed(r.exceptionOrNull()) + } +} diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt index 1e6797ac..6c23982e 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -21,13 +21,13 @@ public enum class TickerMode { * ``` * val channel = ticker(delay = 100) * delay(350) // 250 ms late - * println(channel.poll()) // prints Unit - * println(channel.poll()) // prints null + * println(channel.tryReceive().getOrNull()) // prints Unit + * println(channel.tryReceive().getOrNull()) // prints null * * delay(50) - * println(channel.poll()) // prints Unit, delay was adjusted + * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted * delay(50) - * println(channel.poll()) // prints null, we'are not late relatively to previous element + * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element * ``` */ FIXED_PERIOD, diff --git a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt index 5a1a1ed1..8ef0c182 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.debug diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt index 79f024cc..ffb9c2da 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.debug.internal diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt index 9d9fa3fb..6c353929 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.debug.internal @@ -27,4 +27,4 @@ internal class DebugCoroutineInfo( public val lastObservedFrame: CoroutineStackFrame? = source.lastObservedFrame // field is used as of 1.4-M3 @get:JvmName("lastObservedStackTrace") // method with this name is used as of 1.4-M3 public val lastObservedStackTrace: List<StackTraceElement> = source.lastObservedStackTrace() -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt index cf007bb9..07b9419f 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.debug.internal @@ -72,7 +72,7 @@ internal class DebugCoroutineInfoImpl( private fun creationStackTrace(): List<StackTraceElement> { val bottom = creationStackBottom ?: return emptyList() // Skip "Coroutine creation stacktrace" frame - return sequence<StackTraceElement> { yieldFrames(bottom.callerFrame) }.toList() + return sequence { yieldFrames(bottom.callerFrame) }.toList() } private tailrec suspend fun SequenceScope<StackTraceElement>.yieldFrames(frame: CoroutineStackFrame?) { diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt new file mode 100644 index 00000000..8dc5b7c2 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("unused") + +package kotlinx.coroutines.debug.internal + +import kotlin.coroutines.* + +/* + * This class is used by ByteBuddy from kotlinx-coroutines-debug as kotlin.coroutines.jvm.internal.DebugProbesKt replacement. + * In theory, it should belong to kotlinx-coroutines-debug, but placing it here significantly simplifies the + * Android AS debugger that does on-load DEX transformation + */ + +// Stubs which are injected as coroutine probes. Require direct match of signatures +internal fun probeCoroutineResumed(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineResumed(frame) + +internal fun probeCoroutineSuspended(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineSuspended(frame) +internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> = + DebugProbesImpl.probeCoroutineCreated(completion) diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt index 83bc02c6..05befc1a 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.debug.internal @@ -282,7 +282,7 @@ internal object DebugProbesImpl { it.fileName == "ContinuationImpl.kt" } - val (continuationStartFrame, frameSkipped) = findContinuationStartIndex( + val (continuationStartFrame, delta) = findContinuationStartIndex( indexOfResumeWith, actualTrace, coroutineTrace @@ -290,7 +290,6 @@ internal object DebugProbesImpl { if (continuationStartFrame == -1) return coroutineTrace - val delta = if (frameSkipped) 1 else 0 val expectedSize = indexOfResumeWith + coroutineTrace.size - continuationStartFrame - 1 - delta val result = ArrayList<StackTraceElement>(expectedSize) for (index in 0 until indexOfResumeWith - delta) { @@ -312,16 +311,22 @@ internal object DebugProbesImpl { * If method above `resumeWith` has no line number (thus it is `stateMachine.invokeSuspend`), * it's skipped and attempt to match next one is made because state machine could have been missing in the original coroutine stacktrace. * - * Returns index of such frame (or -1) and flag indicating whether frame with state machine was skipped + * Returns index of such frame (or -1) and number of skipped frames (up to 2, for state machine and for access$). */ private fun findContinuationStartIndex( indexOfResumeWith: Int, actualTrace: Array<StackTraceElement>, coroutineTrace: List<StackTraceElement> - ): Pair<Int, Boolean> { - val result = findIndexOfFrame(indexOfResumeWith - 1, actualTrace, coroutineTrace) - if (result == -1) return findIndexOfFrame(indexOfResumeWith - 2, actualTrace, coroutineTrace) to true - return result to false + ): Pair<Int, Int> { + /* + * Since Kotlin 1.5.0 we have these access$ methods that we have to skip. + * So we have to test next frame for invokeSuspend, for $access and for actual suspending call. + */ + repeat(3) { + val result = findIndexOfFrame(indexOfResumeWith - 1 - it, actualTrace, coroutineTrace) + if (result != -1) return result to it + } + return -1 to 0 } private fun findIndexOfFrame( @@ -477,33 +482,40 @@ internal object DebugProbesImpl { /* * Trim intervals of internal methods from the stacktrace (bounds are excluded from trimming) - * E.g. for sequence [e, i1, i2, i3, e, i4, e, i5, i6, e7] + * E.g. for sequence [e, i1, i2, i3, e, i4, e, i5, i6, i7] * output will be [e, i1, i3, e, i4, e, i5, i7] + * + * If an interval of internal methods ends in a synthetic method, the outermost non-synthetic method in that + * interval will also be included. */ val result = ArrayList<StackTraceElement>(size - probeIndex + 1) result += createArtificialFrame(ARTIFICIAL_FRAME_MESSAGE) - var includeInternalFrame = true - for (i in (probeIndex + 1) until size - 1) { - val element = stackTrace[i] - if (!element.isInternalMethod) { - includeInternalFrame = true - result += element - continue - } - - if (includeInternalFrame) { - result += element - includeInternalFrame = false - } else if (stackTrace[i + 1].isInternalMethod) { - continue + var i = probeIndex + 1 + while (i < size) { + if (stackTrace[i].isInternalMethod) { + result += stackTrace[i] // we include the boundary of the span in any case + // first index past the end of the span of internal methods that starts from `i` + var j = i + 1 + while (j < size && stackTrace[j].isInternalMethod) { + ++j + } + // index of the last non-synthetic internal methods in this span, or `i` if there are no such methods + var k = j - 1 + while (k > i && stackTrace[k].fileName == null) { + k -= 1 + } + if (k > i && k < j - 1) { + /* there are synthetic internal methods at the end of this span, but there is a non-synthetic method + after `i`, so we include it. */ + result += stackTrace[k] + } + result += stackTrace[j - 1] // we include the other boundary of this span in any case, too + i = j } else { - result += element - includeInternalFrame = true + result += stackTrace[i] + ++i } - } - - result += stackTrace[size - 1] return result } diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt index 3e9533b0..0b63cd23 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("UNUSED") diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt index 37c60eeb..40101192 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.debug.internal @@ -14,4 +14,4 @@ internal class StackTraceFrame( private val stackTraceElement: StackTraceElement ) : CoroutineStackFrame { override fun getStackTraceElement(): StackTraceElement = stackTraceElement -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt index 4fb958ac..d178060d 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow.internal diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt index ab42b634..ea973287 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow.internal @@ -36,7 +36,7 @@ internal actual class SafeCollector<T> actual constructor( override val context: CoroutineContext get() = completion?.context ?: EmptyCoroutineContext - override fun invokeSuspend(result: Result<Any?>): Any? { + override fun invokeSuspend(result: Result<Any?>): Any { result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } completion?.resumeWith(result as Result<Unit>) return COROUTINE_SUSPENDED diff --git a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt index 75b668a3..050b9747 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -9,7 +9,7 @@ import java.util.* import java.util.concurrent.* import kotlin.concurrent.withLock as withLockJvm -internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayList<E>() +internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayList() @Suppress("ACTUAL_WITHOUT_EXPECT") internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock diff --git a/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt b/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt index a03163db..60328ebd 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal diff --git a/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt b/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt index 30cd09ef..e93de2aa 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal diff --git a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt index f508749e..9dda30b5 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt index 97f99781..9bbc6dc9 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE") @@ -11,13 +11,13 @@ import kotlinx.coroutines.* private typealias Node = LockFreeLinkedListNode @PublishedApi -internal const val UNDECIDED = 0 +internal const val UNDECIDED: Int = 0 @PublishedApi -internal const val SUCCESS = 1 +internal const val SUCCESS: Int = 1 @PublishedApi -internal const val FAILURE = 2 +internal const val FAILURE: Int = 2 @PublishedApi internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE") @@ -259,7 +259,7 @@ public actual open class LockFreeLinkedListNode { // Helps with removal of this node public actual fun helpRemove() { // Note: this node must be already removed - (next as Removed).ref.correctPrev(null) + (next as Removed).ref.helpRemovePrev() } // Helps with removal of nodes that are previous to this @@ -322,7 +322,7 @@ public actual open class LockFreeLinkedListNode { private val _affectedNode = atomic<Node?>(null) final override val affectedNode: Node? get() = _affectedNode.value - final override val originalNext: Node? get() = queue + final override val originalNext: Node get() = queue override fun retry(affected: Node, next: Any): Boolean = next !== queue diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 5b2b9ff6..2d447413 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -35,7 +35,7 @@ internal object MainDispatcherLoader { ).iterator().asSequence().toList() } @Suppress("ConstantConditionIf") - factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories) + factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories) ?: createMissingDispatcher() } catch (e: Throwable) { // Service loader can throw an exception as well @@ -67,7 +67,10 @@ public fun MainCoroutineDispatcher.isMissing(): Boolean = this is MissingMainCor @Suppress("MayBeConstant") private val SUPPORT_MISSING = true -@Suppress("ConstantConditionIf") +@Suppress( + "ConstantConditionIf", + "IMPLICIT_NOTHING_TYPE_ARGUMENT_AGAINST_NOT_NOTHING_EXPECTED_TYPE" // KT-47626 +) private fun createMissingDispatcher(cause: Throwable? = null, errorHint: String? = null) = if (SUPPORT_MISSING) MissingMainCoroutineDispatcher(cause, errorHint) else cause?.let { throw it } ?: throwMissingMainDispatcherException() diff --git a/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt b/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt index 2f4d1e05..48e01fbe 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NOTHING_TO_INLINE", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt index 208d3f2e..6153862e 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt @@ -1,8 +1,8 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("UNCHECKED_CAST", "NO_EXPLICIT_VISIBILITY_IN_API_MODE") +@file:Suppress("UNCHECKED_CAST") package kotlinx.coroutines.internal @@ -29,7 +29,7 @@ private val stackTraceRecoveryClassName = runCatching { internal actual fun <E : Throwable> recoverStackTrace(exception: E): E { if (!RECOVER_STACK_TRACES) return exception // No unwrapping on continuation-less path: exception is not reported multiple times via slow paths - val copy = tryCopyException(exception) ?: return exception + val copy = tryCopyAndVerify(exception) ?: return exception return copy.sanitizeStackTrace() } @@ -66,9 +66,7 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co val (cause, recoveredStacktrace) = exception.causeAndStacktrace() // Try to create an exception of the same type and get stacktrace from continuation - val newException = tryCopyException(cause) ?: return exception - // Verify that the new exception has the same message as the original one (bail out if not, see #1631) - if (newException.message != cause.message) return exception + val newException = tryCopyAndVerify(cause) ?: return exception // Update stacktrace val stacktrace = createStackTrace(continuation) if (stacktrace.isEmpty()) return exception @@ -80,6 +78,14 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co return createFinalException(cause, newException, stacktrace) } +private fun <E : Throwable> tryCopyAndVerify(exception: E): E? { + val newException = tryCopyException(exception) ?: return null + // Verify that the new exception has the same message as the original one (bail out if not, see #1631) + // CopyableThrowable has control over its message and thus can modify it the way it wants + if (exception !is CopyableThrowable<*> && newException.message != exception.message) return null + return newException +} + /* * Here we partially copy original exception stackTrace to make current one much prettier. * E.g. for @@ -210,6 +216,7 @@ internal actual typealias CoroutineStackFrame = kotlin.coroutines.jvm.internal.C @Suppress("ACTUAL_WITHOUT_EXPECT") internal actual typealias StackTraceElement = java.lang.StackTraceElement +@Suppress("EXTENSION_SHADOWED_BY_MEMBER") internal actual fun Throwable.initCause(cause: Throwable) { // Resolved to member, verified by test initCause(cause) diff --git a/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt b/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt index 2b57b26c..6284f3a0 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal diff --git a/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt b/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt index bf34c1a9..73853720 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:JvmName("SystemPropsKt") diff --git a/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt b/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt index 9d9d30e4..8536cef6 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -7,17 +7,26 @@ package kotlinx.coroutines.internal import kotlinx.coroutines.* import kotlin.coroutines.* - -private val ZERO = Symbol("ZERO") +@JvmField +internal val NO_THREAD_ELEMENTS = Symbol("NO_THREAD_ELEMENTS") // Used when there are >= 2 active elements in the context -private class ThreadState(val context: CoroutineContext, n: Int) { - private var a = arrayOfNulls<Any>(n) +@Suppress("UNCHECKED_CAST") +private class ThreadState(@JvmField val context: CoroutineContext, n: Int) { + private val values = arrayOfNulls<Any>(n) + private val elements = arrayOfNulls<ThreadContextElement<Any?>>(n) private var i = 0 - fun append(value: Any?) { a[i++] = value } - fun take() = a[i++] - fun start() { i = 0 } + fun append(element: ThreadContextElement<*>, value: Any?) { + values[i] = value + elements[i++] = element as ThreadContextElement<Any?> + } + + fun restore(context: CoroutineContext) { + for (i in elements.indices.reversed()) { + elements[i]!!.restoreThreadContext(context, values[i]) + } + } } // Counts ThreadContextElements in the context @@ -42,17 +51,7 @@ private val findOne = private val updateState = fun (state: ThreadState, element: CoroutineContext.Element): ThreadState { if (element is ThreadContextElement<*>) { - state.append(element.updateThreadContext(state.context)) - } - return state - } - -// Restores state for all ThreadContextElements in the context from the given ThreadState -private val restoreState = - fun (state: ThreadState, element: CoroutineContext.Element): ThreadState { - @Suppress("UNCHECKED_CAST") - if (element is ThreadContextElement<*>) { - (element as ThreadContextElement<Any?>).restoreThreadContext(state.context, state.take()) + state.append(element, element.updateThreadContext(state.context)) } return state } @@ -60,12 +59,13 @@ private val restoreState = internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!! // countOrElement is pre-cached in dispatched continuation +// returns NO_THREAD_ELEMENTS if the contest does not have any ThreadContextElements internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? { @Suppress("NAME_SHADOWING") val countOrElement = countOrElement ?: threadContextElements(context) @Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS") return when { - countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements + countOrElement === 0 -> NO_THREAD_ELEMENTS // very fast path when there are no active ThreadContextElements // ^^^ identity comparison for speed, we know zero always has the same identity countOrElement is Int -> { // slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values @@ -82,11 +82,10 @@ internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any? internal fun restoreThreadContext(context: CoroutineContext, oldState: Any?) { when { - oldState === ZERO -> return // very fast path when there are no ThreadContextElements + oldState === NO_THREAD_ELEMENTS -> return // very fast path when there are no ThreadContextElements oldState is ThreadState -> { // slow path with multiple stored ThreadContextElements - oldState.start() - context.fold(oldState, restoreState) + oldState.restore(context) } else -> { // fast path for one ThreadContextElement, but need to find it diff --git a/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt b/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt index ff0970a2..0207334a 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 62cf80f7..84d9d9f8 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.scheduling @@ -146,7 +146,7 @@ internal class CoroutineScheduler( * * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]). */ - internal fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) { + fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) { parkedWorkersStack.loop { top -> val index = (top and PARKED_INDEX_MASK).toInt() val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK @@ -169,12 +169,12 @@ internal class CoroutineScheduler( * It does nothing is this worker is already physically linked to the stack. * This method is invoked only from the worker thread itself. * This invocation always precedes [LockSupport.parkNanos]. - * See [Worker.doPark]. + * See [Worker.tryPark]. * * Returns `true` if worker was added to the stack by this invocation, `false` if it was already * registered in the stack. */ - internal fun parkedWorkersStackPush(worker: Worker): Boolean { + fun parkedWorkersStackPush(worker: Worker): Boolean { if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out /* * The below loop can be entered only if this worker was not in the stack and, since no other thread @@ -403,7 +403,7 @@ internal class CoroutineScheduler( } } - internal fun createTask(block: Runnable, taskContext: TaskContext): Task { + fun createTask(block: Runnable, taskContext: TaskContext): Task { val nanoTime = schedulerTimeSource.nanoTime() if (block is Task) { block.submissionTime = nanoTime @@ -422,7 +422,7 @@ internal class CoroutineScheduler( tryUnpark() // Try unpark again in case there was race between permit release and parking } - internal fun signalCpuWork() { + fun signalCpuWork() { if (tryUnpark()) return if (tryCreateWorker()) return tryUnpark() @@ -654,7 +654,7 @@ internal class CoroutineScheduler( * Releases CPU token if worker has any and changes state to [newState]. * Returns `true` if CPU permit was returned to the pool */ - internal fun tryReleaseCpu(newState: WorkerState): Boolean { + fun tryReleaseCpu(newState: WorkerState): Boolean { val previousState = state val hadCpu = previousState == WorkerState.CPU_ACQUIRED if (hadCpu) releaseCpuPermit() @@ -721,7 +721,19 @@ internal class CoroutineScheduler( } assert { localQueue.size == 0 } workerCtl.value = PARKED // Update value once - while (inStack()) { // Prevent spurious wakeups + /* + * inStack() prevents spurious wakeups, while workerCtl.value == PARKED + * prevents the following race: + * + * - T2 scans the queue, adds itself to the stack, goes to rescan + * - T2 suspends in 'workerCtl.value = PARKED' line + * - T1 pops T2 from the stack, claims workerCtl, suspends + * - T2 fails 'while (inStack())' check, goes to full rescan + * - T2 adds itself to the stack, parks + * - T1 unparks T2, bails out with success + * - T2 unparks and loops in 'while (inStack())' + */ + while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups if (isTerminated || state == WorkerState.TERMINATED) break tryReleaseCpu(WorkerState.PARKING) interrupted() // Cleanup interruptions @@ -762,7 +774,7 @@ internal class CoroutineScheduler( * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes. * ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks */ - internal fun nextInt(upperBound: Int): Int { + fun nextInt(upperBound: Int): Int { var r = rngState r = r xor (r shl 13) r = r xor (r shr 17) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 202c6e1d..7227b07c 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.scheduling diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index a317b975..da867c98 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.scheduling @@ -52,7 +52,7 @@ internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos( ) @JvmField -internal var schedulerTimeSource: TimeSource = NanoTimeSource +internal var schedulerTimeSource: SchedulerTimeSource = NanoTimeSource /** * Marker indicating that task is CPU-bound and will not block @@ -108,10 +108,11 @@ internal class TaskImpl( // Open for tests internal class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false) -internal abstract class TimeSource { +// Was previously TimeSource, renamed due to KT-42625 and KT-23727 +internal abstract class SchedulerTimeSource { abstract fun nanoTime(): Long } -internal object NanoTimeSource : TimeSource() { +internal object NanoTimeSource : SchedulerTimeSource() { override fun nanoTime() = System.nanoTime() } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 354b3a1b..6a9a8a5a 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.scheduling diff --git a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt index 649c9537..8526ca21 100644 --- a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.test diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt deleted file mode 100644 index cfed5af4..00000000 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt +++ /dev/null @@ -1,10 +0,0 @@ -kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350 - (Coroutine boundary) - at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:170) - at kotlinx.coroutines.channels.ChannelCoroutine.offer(ChannelCoroutine.kt) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:153) -Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350 - at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:599) - at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:164) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:151) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt deleted file mode 100644 index ac8f5f4e..00000000 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt +++ /dev/null @@ -1,8 +0,0 @@ -kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43) - (Coroutine boundary) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest.channelReceiveOrNull(StackTraceRecoveryChannelsTest.kt:70) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:44) -Caused by: kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt index 2d480861..3bfd08e5 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt @@ -1,7 +1,7 @@ kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$$inlined$select$lambda$1.invokeSuspend(StackTraceRecoverySelectTest.kt:33) + at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$2$1.invokeSuspend(StackTraceRecoverySelectTest.kt) (Coroutine boundary) - at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$testSelectJoin$1.invokeSuspend(StackTraceRecoverySelectTest.kt:20) + at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$testSelectJoin$1.invokeSuspend(StackTraceRecoverySelectTest.kt) Caused by: kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$$inlined$select$lambda$1.invokeSuspend(StackTraceRecoverySelectTest.kt:33) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
\ No newline at end of file + at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$2$1.invokeSuspend(StackTraceRecoverySelectTest.kt) + at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt) diff --git a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt new file mode 100644 index 00000000..89bbbfd7 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.coroutines + +import org.jetbrains.kotlinx.lincheck.* +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* +import org.jetbrains.kotlinx.lincheck.strategy.stress.* +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* + +abstract class AbstractLincheckTest : VerifierState() { + open fun <O: Options<O, *>> O.customize(isStressTest: Boolean): O = this + open fun ModelCheckingOptions.customize(isStressTest: Boolean): ModelCheckingOptions = this + open fun StressOptions.customize(isStressTest: Boolean): StressOptions = this + + @Test + fun modelCheckingTest() = ModelCheckingOptions() + .iterations(if (isStressTest) 100 else 20) + .invocationsPerIteration(if (isStressTest) 10_000 else 1_000) + .commonConfiguration() + .customize(isStressTest) + .check(this::class) + + @Test + fun stressTest() = StressOptions() + .iterations(if (isStressTest) 100 else 20) + .invocationsPerIteration(if (isStressTest) 10_000 else 1_000) + .commonConfiguration() + .customize(isStressTest) + .check(this::class) + + private fun <O : Options<O, *>> O.commonConfiguration(): O = this + .actorsBefore(if (isStressTest) 3 else 1) + .threads(3) + .actorsPerThread(if (isStressTest) 4 else 2) + .actorsAfter(if (isStressTest) 3 else 0) + .customize(isStressTest) + + override fun extractState(): Any = error("Not implemented") +} diff --git a/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt b/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt index 02675208..59ff76a5 100644 --- a/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt @@ -8,7 +8,10 @@ import kotlin.test.* class AsyncJvmTest : TestBase() { - // This must be a common test but it fails on JS because of KT-21961 + // We have the same test in common module, but the maintainer uses this particular file + // and semi-automatically types cmd+N + AsyncJvm in order to duck-tape any JVM samples/repros, + // please do not remove this test + @Test fun testAsyncWithFinally() = runTest { expect(1) diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt index 55c05c55..c7c2c04e 100644 --- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() { private fun keepMe(a: ByteArray) { // does nothing, makes sure the variable is kept in state-machine } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt new file mode 100644 index 00000000..dbe9cb37 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import kotlin.test.* + +class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { + + private var callsToSchedule = 0 + + private inner class STPE : ScheduledThreadPoolExecutor(1) { + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + if (delay != 0L) ++callsToSchedule + return super.schedule(command, delay, unit) + } + } + + private inner class SES : ScheduledExecutorService by STPE() + + @Test + fun testScheduledThreadPool() = runTest { + val executor = STPE() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } + + @Test + fun testScheduledExecutorService() = runTest { + val executor = SES() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt index 15cb83ce..8a7878c9 100644 --- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt @@ -70,8 +70,18 @@ class FailFastOnStartTest : TestBase() { val actor = actor<Int>(Dispatchers.Main, start = CoroutineStart.LAZY) { fail() } actor.send(1) } - + private fun mainException(e: Throwable): Boolean { return e is IllegalStateException && e.message?.contains("Module with the Main dispatcher is missing") ?: false } + + @Test + fun testProduceNonChild() = runTest(expected = ::mainException) { + produce<Int>(Job() + Dispatchers.Main) { fail() } + } + + @Test + fun testAsyncNonChild() = runTest(expected = ::mainException) { + async<Int>(Job() + Dispatchers.Main) { fail() } + } } diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt index c9f722a5..04b0ba54 100644 --- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt @@ -33,7 +33,7 @@ class FailingCoroutinesMachineryTest( private var caught: Throwable? = null private val latch = CountDownLatch(1) - private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t;latch.countDown() } + private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t; latch.countDown() } private val lazyOuterDispatcher = lazy { newFixedThreadPoolContext(1, "") } private object FailingUpdate : ThreadContextElement<Unit> { @@ -115,14 +115,20 @@ class FailingCoroutinesMachineryTest( @Test fun testElement() = runTest { - launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} + // Top-level throwing dispatcher may rethrow an exception right here + runCatching { + launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} + } checkException() } @Test fun testNestedElement() = runTest { - launch(NonCancellable + dispatcher.value + exceptionHandler) { - launch(element) { } + // Top-level throwing dispatcher may rethrow an exception right here + runCatching { + launch(NonCancellable + dispatcher.value + exceptionHandler) { + launch(element) { } + } } checkException() } diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt index e8079ebd..179b2e5e 100644 --- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -11,7 +11,6 @@ import java.util.* import java.util.Collections.* import java.util.concurrent.atomic.* import java.util.concurrent.locks.* -import kotlin.collections.ArrayList import kotlin.test.* object FieldWalker { @@ -56,7 +55,7 @@ object FieldWalker { * Reflectively starts to walk through object graph and map to all the reached object to their path * in from root. Use [showPath] do display a path if needed. */ - private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> { + private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> { val visited = IdentityHashMap<Any, Ref>() if (root == null) return visited visited[root] = Ref.RootRef @@ -90,6 +89,7 @@ object FieldWalker { cur = ref.parent path += "[${ref.index}]" } + else -> error("Should not be reached") } } path.reverse() @@ -154,8 +154,9 @@ object FieldWalker { while (true) { val fields = type.declaredFields.filter { !it.type.isPrimitive - && (statics || !Modifier.isStatic(it.modifiers)) - && !(it.type.isArray && it.type.componentType.isPrimitive) + && (statics || !Modifier.isStatic(it.modifiers)) + && !(it.type.isArray && it.type.componentType.isPrimitive) + && it.name != "previousOut" // System.out from TestBase that we store in a field to restore later } fields.forEach { it.isAccessible = true } // make them all accessible result.addAll(fields) diff --git a/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt new file mode 100644 index 00000000..6bbfdd1b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class IntellijIdeaDebuggerEvaluatorCompatibilityTest { + + /* + * This test verifies that our CoroutineScope is accessible to IDEA debugger. + * + * Consider the following scenario: + * ``` + * runBlocking<Unit> { // this: CoroutineScope + * println("runBlocking") + * } + * ``` + * user puts breakpoint to `println` line, opens "Evaluate" window + * and executes `launch { println("launch") }`. They (obviously) expect it to work, but + * it won't: `{}` in `runBlocking` is `SuspendLambda` and `this` is an unused implicit receiver + * that is removed by the compiler (because it's unused). + * + * But we still want to provide consistent user experience for functions with `CoroutineScope` receiver, + * for that IDEA debugger tries to retrieve the scope via `kotlin.coroutines.coroutineContext[Job] as? CoroutineScope` + * and with this test we're fixing this behaviour. + * + * Note that this behaviour is not carved in stone: IDEA fallbacks to `kotlin.coroutines.coroutineContext` for the context if necessary. + */ + + @Test + fun testScopeIsAccessible() = runBlocking<Unit> { + verify() + + withContext(Job()) { + verify() + } + + coroutineScope { + verify() + } + + supervisorScope { + verify() + } + + } + + private suspend fun verify() { + val ctx = coroutineContext + assertTrue { ctx.job is CoroutineScope } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt index 5090e7c0..6d474185 100644 --- a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt b/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt deleted file mode 100644 index 62ded9f9..00000000 --- a/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -package kotlinx.coroutines - -import org.jetbrains.kotlinx.lincheck.* -import org.jetbrains.kotlinx.lincheck.strategy.stress.* -import kotlin.reflect.* - -class LCStressOptionsDefault : StressOptions() { - init { - iterations(100 * stressTestMultiplierCbrt) - invocationsPerIteration(1000 * stressTestMultiplierCbrt) - actorsBefore(if (isStressTest) 3 else 0) - threads(3) - actorsPerThread(if (isStressTest) 3 else 2) - } -} - -fun Options<*,*>.check(testClass: KClass<*>) = LinChecker.check(testClass.java, this)
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt new file mode 100644 index 00000000..8a20e084 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ReusableCancellableContinuationLeakStressTest : TestBase() { + + @Suppress("UnnecessaryVariable") + private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T { + val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in + return r + } + + private val iterations = 100_000 * stressTestMultiplier + + class Leak(val i: Int) + + @Test // Simplified version of #2564 + fun testReusableContinuationLeak() = runTest { + val channel = produce(capacity = 1) { // from the main thread + (0 until iterations).forEach { + send(Leak(it)) + } + } + + launch(Dispatchers.Default) { + repeat (iterations) { + val value = channel.receiveBatch() + assertEquals(it, value.i) + } + (channel as Job).join() + + FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 56f1e283..06839f4a 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() { repeat(iterations) { suspender { - assertTrue(channel.offer(it)) + assertTrue(channel.trySend(it).isSuccess) } } channel.close() diff --git a/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt new file mode 100644 index 00000000..a256815d --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.flow.* +import org.junit.* + +class ReusableContinuationStressTest : TestBase() { + + private val iterations = 1000 * stressTestMultiplierSqrt + + @Test // Originally reported by @denis-bezrukov in #2736 + fun testDebounceWithStateFlow() = runBlocking<Unit> { + withContext(Dispatchers.Default) { + repeat(iterations) { + launch { // <- load the dispatcher and OS scheduler + runStressTestOnce(1, 1) + } + } + } + } + + private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope { + val stateFlow = MutableStateFlow(0) + val emitter = launch { + repeat(1000) { i -> + stateFlow.emit(i) + delay(delay.toLong()) + } + } + var last = 0 + stateFlow.debounce(debounce.toLong()).take(100).collect { i -> + if (i - last > 100) { + last = i + } + } + emitter.cancel() + } +} diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt index e20362ff..de38df6b 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() { } rb.hashCode() // unused } + + @Test + fun testCancelledParent() { + val job = Job() + job.cancel() + assertFailsWith<CancellationException> { + runBlocking(job) { + expectUnreached() + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt index e755b17d..49c93c7f 100644 --- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt @@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() { val job = launch { runInterruptible(Dispatchers.IO) { expect(2) - latch.offer(Unit) + latch.trySend(Unit) try { Thread.sleep(10_000L) expectUnreached() diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 17238e87..61a2c8b8 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -7,11 +7,10 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlinx.coroutines.scheduling.* import org.junit.* -import java.lang.Math.* +import java.io.* import java.util.* import java.util.concurrent.atomic.* import kotlin.coroutines.* -import kotlin.math.* import kotlin.test.* private val VERBOSE = systemProp("test.verbose", false) @@ -23,12 +22,15 @@ public actual val isStressTest = System.getProperty("stressTest")?.toBoolean() ? public val stressTestMultiplierSqrt = if (isStressTest) 5 else 1 +private const val SHUTDOWN_TIMEOUT = 1_000L // 1s at most to wait per thread + /** * Multiply various constants in stress tests by this factor, so that they run longer during nightly stress test. */ public actual val stressTestMultiplier = stressTestMultiplierSqrt * stressTestMultiplierSqrt -public val stressTestMultiplierCbrt = cbrt(stressTestMultiplier.toDouble()).roundToInt() +@Suppress("ACTUAL_WITHOUT_EXPECT") +public actual typealias TestResult = Unit /** * Base class for tests, so that tests for predictable scheduling of actions in multiple coroutines sharing a single @@ -49,7 +51,11 @@ public val stressTestMultiplierCbrt = cbrt(stressTestMultiplier.toDouble()).roun * } * ``` */ -public actual open class TestBase actual constructor() { +public actual open class TestBase(private var disableOutCheck: Boolean) { + + actual constructor(): this(false) + + public actual val isBoundByJsTestTimeout = false private var actionIndex = AtomicInteger() private var finished = AtomicBoolean() private var error = AtomicReference<Throwable>() @@ -58,9 +64,15 @@ public actual open class TestBase actual constructor() { private lateinit var threadsBefore: Set<Thread> private val uncaughtExceptions = Collections.synchronizedList(ArrayList<Throwable>()) private var originalUncaughtExceptionHandler: Thread.UncaughtExceptionHandler? = null - private val SHUTDOWN_TIMEOUT = 1_000L // 1s at most to wait per thread + /* + * System.out that we redefine in order to catch any debugging/diagnostics + * 'println' from main source set. + * NB: We do rely on the name 'previousOut' in the FieldWalker in order to skip its + * processing + */ + private lateinit var previousOut: PrintStream - /** + /** * Throws [IllegalStateException] like `error` in stdlib, but also ensures that the test will not * complete successfully even if this exception is consumed somewhere in the test. */ @@ -113,7 +125,7 @@ public actual open class TestBase actual constructor() { } /** - * Asserts that this it the last action in the test. It must be invoked by any test that used [expect]. + * Asserts that this is the last action in the test. It must be invoked by any test that used [expect]. */ public actual fun finish(index: Int) { expect(index) @@ -133,6 +145,16 @@ public actual open class TestBase actual constructor() { finished.set(false) } + private object TestOutputStream : PrintStream(object : OutputStream() { + override fun write(b: Int) { + error("Detected unexpected call to 'println' from source code") + } + }) + + fun println(message: Any?) { + previousOut.println(message) + } + @Before fun before() { initPoolsBeforeTest() @@ -143,6 +165,10 @@ public actual open class TestBase actual constructor() { e.printStackTrace() uncaughtExceptions.add(e) } + if (!disableOutCheck) { + previousOut = System.out + System.setOut(TestOutputStream) + } } @After @@ -154,7 +180,7 @@ public actual open class TestBase actual constructor() { } // Shutdown all thread pools shutdownPoolsAfterTest() - // Check that that are now leftover threads + // Check that are now leftover threads runCatching { checkTestThreads(threadsBefore) }.onFailure { @@ -162,6 +188,9 @@ public actual open class TestBase actual constructor() { } // Restore original uncaught exception handler Thread.setDefaultUncaughtExceptionHandler(originalUncaughtExceptionHandler) + if (!disableOutCheck) { + System.setOut(previousOut) + } if (uncaughtExceptions.isNotEmpty()) { makeError("Expected no uncaught exceptions, but got $uncaughtExceptions") } @@ -187,7 +216,7 @@ public actual open class TestBase actual constructor() { expected: ((Throwable) -> Boolean)? = null, unhandled: List<(Throwable) -> Boolean> = emptyList(), block: suspend CoroutineScope.() -> Unit - ) { + ): TestResult { var exCount = 0 var ex: Throwable? = null try { diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt new file mode 100644 index 00000000..e2ab4d72 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt @@ -0,0 +1,198 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class ThreadContextElementRestoreTest : TestBase() { + private val tl = ThreadLocal<String?>() + + // Checks that ThreadLocal context is properly restored after executing the given block inside + // withContext(tl.asContextElement("OK")) code running in different outer contexts + private inline fun check(crossinline block: suspend () -> Unit) = runTest { + val mainDispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher + // Scenario #1: withContext(ThreadLocal) direct from runTest + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + // Scenario #2: withContext(ThreadLocal) from coroutineScope + coroutineScope { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #3: withContext(ThreadLocal) from undispatched withContext + withContext(CoroutineName("NAME")) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #4: withContext(ThreadLocal) from dispatched withContext + withContext(wrapperDispatcher()) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #5: withContext(ThreadLocal) from withContext(ThreadLocal) + withContext(tl.asContextElement(null)) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #6: withContext(ThreadLocal) from withTimeout + withTimeout(1000) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #7: withContext(ThreadLocal) from withContext(Unconfined) + withContext(Dispatchers.Unconfined) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #8: withContext(ThreadLocal) from withContext(Default) + withContext(Dispatchers.Default) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + // Scenario #9: withContext(ThreadLocal) from withContext(mainDispatcher) + withContext(mainDispatcher) { + withContext(tl.asContextElement("OK")) { + block() + assertEquals("OK", tl.get()) + } + assertEquals(null, tl.get()) + } + } + + @Test + fun testSimpleNoSuspend() = + check {} + + @Test + fun testSimpleDelay() = check { + delay(1) + } + + @Test + fun testSimpleYield() = check { + yield() + } + + private suspend fun deepDelay() { + deepDelay2(); deepDelay2() + } + + private suspend fun deepDelay2() { + delay(1); delay(1) + } + + @Test + fun testDeepDelay() = check { + deepDelay() + } + + private suspend fun deepYield() { + deepYield2(); deepYield2() + } + + private suspend fun deepYield2() { + yield(); yield() + } + + @Test + fun testDeepYield() = check { + deepYield() + } + + @Test + fun testCoroutineScopeDelay() = check { + coroutineScope { + delay(1) + } + } + + @Test + fun testCoroutineScopeYield() = check { + coroutineScope { + yield() + } + } + + @Test + fun testWithContextUndispatchedDelay() = check { + withContext(CoroutineName("INNER")) { + delay(1) + } + } + + @Test + fun testWithContextUndispatchedYield() = check { + withContext(CoroutineName("INNER")) { + yield() + } + } + + @Test + fun testWithContextDispatchedDelay() = check { + withContext(wrapperDispatcher()) { + delay(1) + } + } + + @Test + fun testWithContextDispatchedYield() = check { + withContext(wrapperDispatcher()) { + yield() + } + } + + @Test + fun testWithTimeoutDelay() = check { + withTimeout(1000) { + delay(1) + } + } + + @Test + fun testWithTimeoutYield() = check { + withTimeout(1000) { + yield() + } + } + + @Test + fun testWithUnconfinedContextDelay() = check { + withContext(Dispatchers.Unconfined) { + delay(1) + } + } + @Test + fun testWithUnconfinedContextYield() = check { + withContext(Dispatchers.Unconfined) { + yield() + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt new file mode 100644 index 00000000..49f4a12e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt @@ -0,0 +1,65 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.internal.* +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class ThreadContextOrderTest : TestBase() { + /* + * The test verifies that two thread context elements are correctly nested: + * The restoration order is the reverse of update order. + */ + private val transactionalContext = ThreadLocal<String>() + private val loggingContext = ThreadLocal<String>() + + private val transactionalElement = object : ThreadContextElement<String> { + override val key = ThreadLocalKey(transactionalContext) + + override fun updateThreadContext(context: CoroutineContext): String { + assertEquals("test", loggingContext.get()) + val previous = transactionalContext.get() + transactionalContext.set("tr coroutine") + return previous + } + + override fun restoreThreadContext(context: CoroutineContext, oldState: String) { + assertEquals("test", loggingContext.get()) + assertEquals("tr coroutine", transactionalContext.get()) + transactionalContext.set(oldState) + } + } + + private val loggingElement = object : ThreadContextElement<String> { + override val key = ThreadLocalKey(loggingContext) + + override fun updateThreadContext(context: CoroutineContext): String { + val previous = loggingContext.get() + loggingContext.set("log coroutine") + return previous + } + + override fun restoreThreadContext(context: CoroutineContext, oldState: String) { + assertEquals("log coroutine", loggingContext.get()) + assertEquals("tr coroutine", transactionalContext.get()) + loggingContext.set(oldState) + } + } + + @Test + fun testCorrectOrder() = runTest { + transactionalContext.set("test") + loggingContext.set("test") + launch(transactionalElement + loggingElement) { + assertEquals("log coroutine", loggingContext.get()) + assertEquals("tr coroutine", transactionalContext.get()) + } + assertEquals("test", loggingContext.get()) + assertEquals("test", transactionalContext.get()) + + } +} diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index ca399f53..bd9a185f 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -42,7 +42,7 @@ private const val REAL_PARK_NANOS = 10_000_000L // 10 ms -- park for a little to @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") internal class VirtualTimeSource( private val log: PrintStream? -) : TimeSource { +) : AbstractTimeSource() { private val mainThread: Thread = Thread.currentThread() private var checkpointNanos: Long = System.nanoTime() @@ -142,7 +142,7 @@ internal class VirtualTimeSource( } private fun minParkedTill(): Long = - threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED + threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED @Synchronized fun shutdown() { diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt index ae95e694..d3b2ff12 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -78,4 +78,14 @@ class ActorLazyTest : TestBase() { job.join() finish(5) } -}
\ No newline at end of file + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor<Int>(start = CoroutineStart.LAZY) { + expectUnreached() + } + finish(2) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt index bdca5039..5a2778d5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -69,11 +69,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithoutCause() = runTest { val actor = actor<Int>(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) assertEquals(42, element) - val next = channel.receiveOrNull() - assertNull(next) + val next = channel.receiveCatching() + assertNull(next.exceptionOrNull()) expect(3) } @@ -88,11 +88,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithCause() = runTest { val actor = actor<Int>(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) - require(element!! == 42) + require(element == 42) try { - channel.receiveOrNull() + channel.receive() } catch (e: IOException) { expect(3) } @@ -111,7 +111,7 @@ class ActorTest(private val capacity: Int) : TestBase() { val job = async { actor<Int>(capacity = capacity) { expect(1) - channel.receiveOrNull() + channel.receive() expectUnreached() } } @@ -173,11 +173,24 @@ class ActorTest(private val capacity: Int) : TestBase() { fun testCloseFreshActor() = runTest { for (start in CoroutineStart.values()) { val job = launch { - val actor = actor<Int>(start = start) { for (i in channel) {} } + val actor = actor<Int>(start = start) { + for (i in channel) { + } + } actor.close() } job.join() } } + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor<Int> { + expectUnreached() + } + finish(2) + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt index 2e73b243..8c9777b4 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -67,10 +67,10 @@ class BroadcastChannelMultiReceiveStressTest( val channel = broadcast.openSubscription() when (receiverIndex % 5) { 0 -> doReceive(channel, receiverIndex) - 1 -> doReceiveOrNull(channel, receiverIndex) + 1 -> doReceiveCatching(channel, receiverIndex) 2 -> doIterator(channel, receiverIndex) 3 -> doReceiveSelect(channel, receiverIndex) - 4 -> doReceiveSelectOrNull(channel, receiverIndex) + 4 -> doReceiveCatchingSelect(channel, receiverIndex) } channel.cancel() } @@ -124,9 +124,9 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) { + private suspend fun doReceiveCatching(channel: ReceiveChannel<Long>, receiverIndex: Int) { while (true) { - val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break) + val stop = doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) if (stop) break } } @@ -148,11 +148,11 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) { while (true) { - val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break + val event = select<Long?> { channel.onReceiveCatching { it.getOrNull() } } ?: break val stop = doReceived(receiverIndex, event) if (stop) break } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt index 76713aa1..86adfee0 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { // total counters private var sendCnt = 0 - private var offerFailedCnt = 0 + private var trySendFailedCnt = 0 private var receivedCnt = 0 private var undeliveredCnt = 0 @@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private var lastReceived = 0 private var dSendCnt = 0 private var dSendExceptionCnt = 0 - private var dOfferFailedCnt = 0 + private var dTrySendFailedCnt = 0 private var dReceivedCnt = 0 private val dUndeliveredCnt = AtomicInteger() @@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { joinAll(j1, j2) // All elements must be either received or undelivered (IN every run) - if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { + if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { println(" Send: $dSendCnt") - println("Send Exception: $dSendExceptionCnt") - println(" Offer failed: $dOfferFailedCnt") + println("Send exception: $dSendExceptionCnt") + println("trySend failed: $dTrySendFailedCnt") println(" Received: $dReceivedCnt") println(" Undelivered: ${dUndeliveredCnt.get()}") error("Failed") } - offerFailedCnt += dOfferFailedCnt + trySendFailedCnt += dTrySendFailedCnt receivedCnt += dReceivedCnt undeliveredCnt += dUndeliveredCnt.get() // clear for next run dSendCnt = 0 dSendExceptionCnt = 0 - dOfferFailedCnt = 0 + dTrySendFailedCnt = 0 dReceivedCnt = 0 dUndeliveredCnt.set(0) } // Stats - println(" Send: $sendCnt") - println(" Offer failed: $offerFailedCnt") - println(" Received: $receivedCnt") - println(" Undelivered: $undeliveredCnt") - assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt) + println(" Send: $sendCnt") + println("trySend failed: $trySendFailedCnt") + println(" Received: $receivedCnt") + println(" Undelivered: $undeliveredCnt") + assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt) } private suspend fun sendOne(channel: Channel<Int>) { @@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { try { when (Random.nextInt(2)) { 0 -> channel.send(i) - 1 -> if (!channel.offer(i)) { - dOfferFailedCnt++ + 1 -> if (!channel.trySend(i).isSuccess) { + dTrySendFailedCnt++ } } - } catch(e: Throwable) { + } catch (e: Throwable) { assertTrue(e is CancellationException) // the only exception possible in this test dSendExceptionCnt++ throw e @@ -89,7 +89,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private suspend fun receiveOne(channel: Channel<Int>) { val received = when (Random.nextInt(3)) { 0 -> channel.receive() - 1 -> channel.receiveOrNull() ?: error("Cannot be closed yet") + 1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") } 2 -> select { channel.onReceive { it } } @@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { dReceivedCnt++ lastReceived = received } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt deleted file mode 100644 index 256ef621..00000000 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.channels - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicLongArray -import kotlin.math.* -import kotlin.test.* - -/** - * Tests lock-freedom of send and receive operations on rendezvous and conflated channels. - * There is a single channel with two sender and two receiver threads. - * When one sender or receiver gets suspended at most one other operation is allowed to cease having progress - * (`allowSuspendedThreads = 1`). - * - * **Note**: In the current implementation buffered channels are not lock-free, so this test would fail - * if channel is created with a buffer. - */ -class ChannelLFStressTest : TestBase() { - private val nSeconds = 5 * stressTestMultiplier - private val env = LockFreedomTestEnvironment("ChannelLFStressTest", allowSuspendedThreads = 1) - private lateinit var channel: Channel<Long> - - private val sendIndex = AtomicLong() - private val receiveCount = AtomicLong() - private val duplicateCount = AtomicLong() - - private val nCheckedSize = 10_000_000 - private val nChecked = (nCheckedSize * Long.SIZE_BITS).toLong() - private val receivedBits = AtomicLongArray(nCheckedSize) // bit set of received values - - @Test - fun testRendezvousLockFreedom() { - channel = Channel() - performLockFreedomTest() - // ensure that all sent were received - checkAllReceived() - } - - private fun performLockFreedomTest() { - env.onCompletion { - // We must cancel the channel to abort both senders & receivers - channel.cancel(TestCompleted()) - } - repeat(2) { env.testThread("sender-$it") { sender() } } - repeat(2) { env.testThread("receiver-$it") { receiver() } } - env.performTest(nSeconds) { - println("Sent: $sendIndex, Received: $receiveCount, dups: $duplicateCount") - } - // ensure no duplicates - assertEquals(0L, duplicateCount.get()) - } - - private fun checkAllReceived() { - for (i in 0 until min(sendIndex.get(), nChecked)) { - assertTrue(isReceived(i)) - } - } - - private suspend fun sender() { - val value = sendIndex.getAndIncrement() - try { - channel.send(value) - } catch (e: TestCompleted) { - check(env.isCompleted) // expected when test was completed - markReceived(value) // fake received (actually failed to send) - } - } - - private suspend fun receiver() { - val value = try { - channel.receive() - } catch (e: TestCompleted) { - check(env.isCompleted) // expected when test was completed - return - } - receiveCount.incrementAndGet() - markReceived(value) - } - - private fun markReceived(value: Long) { - if (value >= nChecked) return // too big - val index = (value / Long.SIZE_BITS).toInt() - val mask = 1L shl (value % Long.SIZE_BITS).toInt() - while (true) { - val bits = receivedBits.get(index) - if (bits and mask != 0L) { - duplicateCount.incrementAndGet() - break - } - if (receivedBits.compareAndSet(index, bits, bits or mask)) break - } - } - - private fun isReceived(value: Long): Boolean { - val index = (value / Long.SIZE_BITS).toInt() - val mask = 1L shl (value % Long.SIZE_BITS).toInt() - val bits = receivedBits.get(index) - return bits and mask != 0L - } - - private class TestCompleted : CancellationException() -} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt index f414c333..a6345cc5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -60,10 +60,10 @@ class ChannelSendReceiveStressTest( launch(pool + CoroutineName("receiver$receiverIndex")) { when (receiverIndex % 5) { 0 -> doReceive(receiverIndex) - 1 -> doReceiveOrNull(receiverIndex) + 1 -> doReceiveCatching(receiverIndex) 2 -> doIterator(receiverIndex) 3 -> doReceiveSelect(receiverIndex) - 4 -> doReceiveSelectOrNull(receiverIndex) + 4 -> doReceiveCatchingSelect(receiverIndex) } receiversCompleted.incrementAndGet() } @@ -152,9 +152,9 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatching(receiverIndex: Int) { while (true) { - doReceived(receiverIndex, channel.receiveOrNull() ?: break) + doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) } } @@ -173,10 +173,10 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(receiverIndex: Int) { while (true) { - val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break + val event = select<Int?> { channel.onReceiveCatching { it.getOrNull() } } ?: break doReceived(receiverIndex, event) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt index 1188329a..12334326 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T try { block() } finally { - if (!done.offer(true)) + if (!done.trySend(true).isSuccess) error(IllegalStateException("failed to offer to done channel")) } } @@ -188,9 +188,9 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T val receivedData = when (receiveMode) { 1 -> channel.receive() 2 -> select { channel.onReceive { it } } - 3 -> channel.receiveOrNull() ?: error("Should not be closed") - 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } } - 5 -> channel.receiveOrClosed().value + 3 -> channel.receiveCatching().getOrElse { error("Should not be closed") } + 4 -> select { channel.onReceiveCatching { it.getOrElse { error("Should not be closed") } } } + 5 -> channel.receiveCatching().getOrThrow() 6 -> { val iterator = channel.iterator() check(iterator.hasNext()) { "Should not be closed" } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt index da20f0c5..8512aebc 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt @@ -11,7 +11,7 @@ import kotlin.test.* class ChannelsJvmTest : TestBase() { @Test - fun testBlocking() { + fun testTrySendBlocking() { val ch = Channel<Int>() val sum = GlobalScope.async { var sum = 0 @@ -19,9 +19,36 @@ class ChannelsJvmTest : TestBase() { sum } repeat(10) { - ch.sendBlocking(it) + assertTrue(ch.trySendBlocking(it).isSuccess) } ch.close() assertEquals(45, runBlocking { sum.await() }) } + + @Test + fun testTrySendBlockingClosedChannel() { + run { + val channel = Channel<Unit>().also { it.close() } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is ClosedSendChannelException) } + .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel<Unit>().also { it.close(TestException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestException) } + .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestCancellationException) } + .also { assertTrue { it.isClosed } } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt index eb7be575..2b3c05bc 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt @@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { launch(Dispatchers.Default + CoroutineName("Sender$senderId")) { repeat(nEvents) { i -> if (i % nSenders == senderId) { - broadcast.offer(i) + broadcast.trySend(i) sentTotal.incrementAndGet() yield() } @@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { try { withTimeout(timeLimit) { senders.forEach { it.join() } - broadcast.offer(nEvents) // last event to signal receivers termination + broadcast.trySend(nEvents) // last event to signal receivers termination receivers.forEach { it.join() } } } catch (e: CancellationException) { @@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { cancel() value } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt index 316b3785..793d7e44 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt @@ -1,15 +1,12 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels import kotlinx.coroutines.* -import org.junit.After -import org.junit.Test -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.* +import org.junit.* +import java.util.concurrent.atomic.* class ConflatedChannelCloseStressTest : TestBase() { @@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() { var x = senderId try { while (isActive) { - try { - curChannel.get().offer(x) + curChannel.get().trySend(x).onSuccess { x += nSenders sent.incrementAndGet() - } catch (e: ClosedSendChannelException) { - // ignore } } } finally { @@ -64,7 +58,9 @@ class ConflatedChannelCloseStressTest : TestBase() { } val receiver = async(pool + NonCancellable) { while (isActive) { - curChannel.get().receiveOrNull() + curChannel.get().receiveCatching().getOrElse { + it?.let { throw it } + } received.incrementAndGet() } } @@ -110,4 +106,4 @@ class ConflatedChannelCloseStressTest : TestBase() { } class StopException : Exception() -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 51789078..fbc28a18 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delayChannel.cancel() delay(5100) - assertFailsWith<CancellationException> { delayChannel.poll() } + assertFailsWith<CancellationException> { delayChannel.tryReceive().getOrThrow() } } } @@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() var sum = 0 var n = 0 whileSelect { - this@averageInTimeWindow.onReceiveOrClosed { + this@averageInTimeWindow.onReceiveCatching { if (it.isClosed) { // Send leftovers and bail out if (n != 0) send(sum / n.toDouble()) false } else { - sum += it.value + sum += it.getOrThrow() ++n true } @@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() } } -fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll()) +fun ReceiveChannel<Unit>.checkEmpty() = assertNull(tryReceive().getOrNull()) fun ReceiveChannel<Unit>.checkNotEmpty() { - assertNotNull(poll()) - assertNull(poll()) + assertNotNull(tryReceive().getOrNull()) + assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt index f52f8b5b..2d8c0ebc 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -38,13 +38,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } @Test - fun testReceiveOrNullFromClosedChannel() = runTest { - val channel = Channel<Int>() - channel.close(RecoverableTestException()) - channelReceiveOrNull(channel) - } - - @Test fun testSendToClosedChannel() = runTest { val channel = Channel<Int>() channel.close(RecoverableTestException()) @@ -67,7 +60,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } private suspend fun channelReceive(channel: Channel<Int>) = channelOp { channel.receive() } - private suspend fun channelReceiveOrNull(channel: Channel<Int>) = channelOp { channel.receiveOrNull() } private suspend inline fun channelOp(block: () -> Unit) { try { @@ -145,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { deferred.await() } - // See https://github.com/Kotlin/kotlinx.coroutines/issues/950 - @Test - fun testCancelledOffer() = runTest { - expect(1) - val job = Job() - val actor = actor<Int>(job, Channel.UNLIMITED) { - consumeEach { - expectUnreached() // is cancelled before offer - } - } - job.cancel() - try { - actor.offer(1) - } catch (e: Exception) { - verifyStackTrace("channels/${name.methodName}", e) - finish(2) - } - } - private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) { sendInChannel() yield() // TCE @@ -177,4 +150,4 @@ class StackTraceRecoveryChannelsTest : TestBase() { private suspend fun Channel<Int>.sendFromScope() = coroutineScope { sendWithContext(wrapperDispatcher(coroutineContext)) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt index 70336659..dba738a8 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.exceptions import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test import kotlin.test.* @@ -71,4 +72,56 @@ class StackTraceRecoveryCustomExceptionsTest : TestBase() { assertEquals("custom", cause.message) } } + + class WrongMessageException(token: String) : RuntimeException("Token $token") + + @Test + fun testWrongMessageException() = runTest { + val result = runCatching { + coroutineScope<Unit> { + throw WrongMessageException("OK") + } + } + val ex = result.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is WrongMessageException) + assertEquals("Token OK", ex.message) + } + + @Test + fun testWrongMessageExceptionInChannel() = runTest { + val result = produce<Unit>(SupervisorJob() + Dispatchers.Unconfined) { + throw WrongMessageException("OK") + } + val ex = runCatching { + @Suppress("ControlFlowWithEmptyBody") + for (unit in result) { + // Iterator has a special code path + } + }.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is WrongMessageException) + assertEquals("Token OK", ex.message) + } + + class CopyableWithCustomMessage( + message: String?, + cause: Throwable? = null + ) : RuntimeException(message, cause), + CopyableThrowable<CopyableWithCustomMessage> { + + override fun createCopy(): CopyableWithCustomMessage { + return CopyableWithCustomMessage("Recovered: [$message]", cause) + } + } + + @Test + fun testCustomCopyableMessage() = runTest { + val result = runCatching { + coroutineScope<Unit> { + throw CopyableWithCustomMessage("OK") + } + } + val ex = result.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is CopyableWithCustomMessage) + assertEquals("Recovered: [OK]", ex.message) + } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt index bea18a43..a85bb7a2 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt @@ -86,7 +86,6 @@ class StackTraceRecoveryNestedScopesTest : TestBase() { "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callWithTimeout\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:37)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callCoroutineScope\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:43)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:68)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest.verifyAwait(StackTraceRecoveryNestedScopesTest.kt:76)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:71)\n" + "Caused by: kotlinx.coroutines.RecoverableTestException\n" + diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt index 5073b7fd..02607c03 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt @@ -58,7 +58,7 @@ class StackTraceRecoveryNestedTest : TestBase() { try { rootAsync.awaitRootLevel() } catch (e: RecoverableTestException) { - e.verifyException("await\$suspendImpl", "awaitRootLevel") + e.verifyException("awaitRootLevel") finish(8) } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt index 290420e4..0d7648c5 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt @@ -45,9 +45,9 @@ class StackTraceRecoverySelectTest : TestBase() { private suspend fun doSelectAwait(deferred: Deferred<Unit>): Int { return select { deferred.onAwait { - yield() // Hide the stackstrace + yield() // Hide the frame 42 } } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt index dbbd77c4..0a8b6530 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt @@ -34,14 +34,13 @@ class StackTraceRecoveryTest : TestBase() { val deferred = createDeferred(3) val traces = listOf( "java.util.concurrent.ExecutionException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:49)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:44)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:17)\n", "Caused by: java.util.concurrent.ExecutionException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" + "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n" ) nestedMethod(deferred, *traces.toTypedArray()) @@ -59,7 +58,6 @@ class StackTraceRecoveryTest : TestBase() { "java.util.concurrent.ExecutionException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:44)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:81)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:75)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:71)", @@ -94,7 +92,6 @@ class StackTraceRecoveryTest : TestBase() { "kotlinx.coroutines.RecoverableTestException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testWithContext\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.outerMethod(StackTraceRecoveryTest.kt:150)\n" + @@ -132,7 +129,6 @@ class StackTraceRecoveryTest : TestBase() { "kotlinx.coroutines.RecoverableTestException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCoroutineScope\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2\$1.invokeSuspend(StackTraceRecoveryTest.kt:193)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" + @@ -228,13 +224,13 @@ class StackTraceRecoveryTest : TestBase() { val e = exception assertNotNull(e) verifyStackTrace(e, "kotlinx.coroutines.RecoverableTestException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" + - "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" + - "Caused by: kotlinx.coroutines.RecoverableTestException") + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.access\$throws(StackTraceRecoveryTest.kt:20)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" + + "\t(Coroutine boundary)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" + + "Caused by: kotlinx.coroutines.RecoverableTestException") } private class Callback(val cont: CancellableContinuation<*>) @@ -261,22 +257,8 @@ class StackTraceRecoveryTest : TestBase() { private suspend fun awaitCallback(channel: Channel<Callback>) { suspendCancellableCoroutine<Unit> { cont -> - channel.offer(Callback(cont)) + channel.trySend(Callback(cont)) } yield() // nop to make sure it is not a tail call } - - @Test - fun testWrongMessageException() = runTest { - val result = runCatching { - coroutineScope<Unit> { - throw WrongMessageException("OK") - } - } - val ex = result.exceptionOrNull() ?: error("Expected to fail") - assertTrue(ex is WrongMessageException) - assertEquals("Token OK", ex.message) - } - - public class WrongMessageException(token: String) : RuntimeException("Token $token") } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt index 6034fccb..edce175d 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -15,8 +15,8 @@ class SuppressionTests : TestBase() { @Test fun testNotificationsWithException() = runTest { expect(1) - val coroutineContext = kotlin.coroutines.coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) { + val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984 + val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) { override fun onStart() { expect(3) } @@ -82,4 +82,4 @@ class SuppressionTests : TestBase() { assertTrue(e.cause!!.suppressed.isEmpty()) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index e3db2626..f1be284c 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() { fun testThrowingConsumer() = runTest { var i = 0 val api = CallbackApi { - runCatching { it.offer(++i) } + it.trySend(++i) } val flow = callbackFlow<Int> { @@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() { var i = 0 val api = CallbackApi { if (i < 5) { - it.offer(++i) + it.trySend(++i) } else { it.close(RuntimeException()) } } - val flow = callbackFlow<Int>() { + val flow = callbackFlow<Int> { api.start(channel) awaitClose { api.stop() diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt new file mode 100644 index 00000000..98240fc9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import org.junit.* + +/** + * Tests that shared flows keep strong reference to their source flows. + * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557 + */ +@OptIn(DelicateCoroutinesApi::class) +class SharingReferenceTest : TestBase() { + private val token = object {} + + /* + * Single-threaded executor that we are using to ensure that the flow being sharing actually + * suspended (spilled its locals, attached to parent), so we can verify reachability. + * Without that, it's possible to have a situation where target flow is still + * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails. + */ + @get:Rule + val executor = ExecutorRule(1) + + private val weakEmitter = flow { + emit(null) + // suspend forever without keeping a strong reference to continuation -- this is a model of + // a callback API that does not keep a strong reference it is listeners, but works + suspendCancellableCoroutine<Unit> { } + // using the token here to make it easily traceable + emit(token) + } + + @Test + fun testShareInReference() { + val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInReference() { + val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInSuspendingReference() = runTest { + val flow = weakEmitter.stateIn(GlobalScope) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + private fun linearize() { + runBlocking(executor) { } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt index dc3cd43c..e55eaad1 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() { for (second in 1..nSeconds) { delay(1000) val cs = collected.map { it.sum() } - println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}") } emitters.cancelAndJoin() collectors.cancelAndJoin() @@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() { @Test fun testTenEmittersAndCollectors() = stress(10, 10) -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt new file mode 100644 index 00000000..660ed0aa --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.test.* +import kotlin.test.Test + +class StateFlowUpdateStressTest : TestBase() { + private val iterations = 1_000_000 * stressTestMultiplier + + @get:Rule + public val executor = ExecutorRule(2) + + @Test + fun testUpdate() = doTest { update { it + 1 } } + + @Test + fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } } + + @Test + fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } } + + private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest { + val flow = MutableStateFlow(0) + val j1 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + val j2 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + joinAll(j1, j2) + assertEquals(iterations, flow.value) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt index 15e12614..529f8817 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt @@ -2,16 +2,15 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.exampleBasic01 import kotlinx.coroutines.* -fun main() { - GlobalScope.launch { // launch a new coroutine in background and continue +fun main() = runBlocking { // this: CoroutineScope + launch { // launch a new coroutine and continue delay(1000L) // non-blocking delay for 1 second (default time unit is ms) println("World!") // print after delay } - println("Hello,") // main thread continues while coroutine is delayed - Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive + println("Hello") // main coroutine continues while a previous one is delayed } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt index 4f178ca6..6bf2af4c 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt @@ -2,18 +2,18 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.exampleBasic02 import kotlinx.coroutines.* -fun main() { - GlobalScope.launch { // launch a new coroutine in background and continue - delay(1000L) - println("World!") - } - println("Hello,") // main thread continues here immediately - runBlocking { // but this expression blocks the main thread - delay(2000L) // ... while we delay for 2 seconds to keep JVM alive - } +fun main() = runBlocking { // this: CoroutineScope + launch { doWorld() } + println("Hello") +} + +// this is your first suspending function +suspend fun doWorld() { + delay(1000L) + println("World!") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt index f80113c5..67b6894a 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt @@ -2,16 +2,19 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.exampleBasic03 import kotlinx.coroutines.* -fun main() = runBlocking<Unit> { // start main coroutine - GlobalScope.launch { // launch a new coroutine in background and continue +fun main() = runBlocking { + doWorld() +} + +suspend fun doWorld() = coroutineScope { // this: CoroutineScope + launch { delay(1000L) println("World!") } - println("Hello,") // main coroutine continues here immediately - delay(2000L) // delaying for 2 seconds to keep JVM alive + println("Hello") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt index 33c928a0..efac7085 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt @@ -2,16 +2,26 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.exampleBasic04 import kotlinx.coroutines.* +// Sequentially executes doWorld followed by "Done" fun main() = runBlocking { - val job = GlobalScope.launch { // launch a new coroutine and keep a reference to its Job + doWorld() + println("Done") +} + +// Concurrently executes both sections +suspend fun doWorld() = coroutineScope { // this: CoroutineScope + launch { + delay(2000L) + println("World 2") + } + launch { delay(1000L) - println("World!") + println("World 1") } - println("Hello,") - job.join() // wait until child coroutine completes + println("Hello") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt index 52b490d2..193f2cc3 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt @@ -2,15 +2,17 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.exampleBasic05 import kotlinx.coroutines.* -fun main() = runBlocking { // this: CoroutineScope - launch { // launch a new coroutine in the scope of runBlocking +fun main() = runBlocking { + val job = launch { // launch a new coroutine and keep a reference to its Job delay(1000L) println("World!") } - println("Hello,") + println("Hello") + job.join() // wait until child coroutine completes + println("Done") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt index a1b5bc5f..24b890a0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt @@ -2,26 +2,16 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.exampleBasic06 import kotlinx.coroutines.* -fun main() = runBlocking { // this: CoroutineScope - launch { - delay(200L) - println("Task from runBlocking") - } - - coroutineScope { // Creates a coroutine scope +fun main() = runBlocking { + repeat(100_000) { // launch a lot of coroutines launch { - delay(500L) - println("Task from nested launch") + delay(5000L) + print(".") } - - delay(100L) - println("Task from coroutine scope") // This line will be printed before the nested launch } - - println("Coroutine scope is over") // This line is not printed until the nested launch completes } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt deleted file mode 100644 index 32c02b86..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic07 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - launch { doWorld() } - println("Hello,") -} - -// this is your first suspending function -suspend fun doWorld() { - delay(1000L) - println("World!") -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt deleted file mode 100644 index bb7786f2..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic08 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - repeat(100_000) { // launch a lot of coroutines - launch { - delay(5000L) - print(".") - } - } -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt deleted file mode 100644 index 9f998b52..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic09 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - GlobalScope.launch { - repeat(1000) { i -> - println("I'm sleeping $i ...") - delay(500L) - } - } - delay(1300L) // just quit after delay -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt index 312dc72b..35536a7d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt @@ -23,10 +23,12 @@ fun main() { println("Completed in $time ms") } +@OptIn(DelicateCoroutinesApi::class) fun somethingUsefulOneAsync() = GlobalScope.async { doSomethingUsefulOne() } +@OptIn(DelicateCoroutinesApi::class) fun somethingUsefulTwoAsync() = GlobalScope.async { doSomethingUsefulTwo() } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt index e23eaf25..c6ad4516 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt @@ -10,9 +10,9 @@ import kotlinx.coroutines.* fun main() = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { - // it spawns two other jobs, one with GlobalScope - GlobalScope.launch { - println("job1: I run in GlobalScope and execute independently!") + // it spawns two other jobs + launch(Job()) { + println("job1: I run in my own Job and execute independently!") delay(1000) println("job1: I am not affected by cancellation of the request") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt index e08ddd08..24cbabe0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions01 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val job = GlobalScope.launch { // root coroutine with launch println("Throwing exception from launch") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt index 67fdaa71..c3ab68a5 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions02 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt index 9c9b43d2..b966c1ea 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions04 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt index 04f9385f..5f1f3d89 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.exceptions.* import kotlinx.coroutines.* import java.io.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt index 5a5b276b..bc9f77b9 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt @@ -8,6 +8,7 @@ package kotlinx.coroutines.guide.exampleExceptions06 import kotlinx.coroutines.* import java.io.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt index 57fe6382..22380d3a 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt @@ -11,17 +11,21 @@ import kotlinx.coroutines.selects.* suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> { - a.onReceiveOrNull { value -> - if (value == null) - "Channel 'a' is closed" - else + a.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "a -> '$value'" + } else { + "Channel 'a' is closed" + } } - b.onReceiveOrNull { value -> - if (value == null) - "Channel 'b' is closed" - else + b.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "b -> '$value'" + } else { + "Channel 'b' is closed" + } } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt index 464e9b20..68b44564 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt @@ -13,12 +13,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select<Deferred<String>?> { // return next deferred value from this select or null - input.onReceiveOrNull { update -> - update // replaces next value to wait + input.onReceiveCatching { update -> + update.getOrNull() } - current.onAwait { value -> + current.onAwait { value -> send(value) // send value that current deferred has produced - input.receiveOrNull() // and use the next deferred from the input channel + input.receiveCatching().getOrNull() // and use the next deferred from the input channel } } if (next == null) { diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt index ea5003b0..7e54fb1d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt @@ -2,7 +2,7 @@ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -// This file was automatically generated from basics.md by Knit tool. Do not edit. +// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. package kotlinx.coroutines.guide.test import kotlinx.coroutines.knit.* @@ -12,7 +12,7 @@ class BasicsGuideTest { @Test fun testExampleBasic01() { test("ExampleBasic01") { kotlinx.coroutines.guide.exampleBasic01.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -20,7 +20,7 @@ class BasicsGuideTest { @Test fun testExampleBasic02() { test("ExampleBasic02") { kotlinx.coroutines.guide.exampleBasic02.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -28,7 +28,7 @@ class BasicsGuideTest { @Test fun testExampleBasic03() { test("ExampleBasic03") { kotlinx.coroutines.guide.exampleBasic03.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -36,50 +36,26 @@ class BasicsGuideTest { @Test fun testExampleBasic04() { test("ExampleBasic04") { kotlinx.coroutines.guide.exampleBasic04.main() }.verifyLines( - "Hello,", - "World!" + "Hello", + "World 1", + "World 2", + "Done" ) } @Test fun testExampleBasic05() { test("ExampleBasic05") { kotlinx.coroutines.guide.exampleBasic05.main() }.verifyLines( - "Hello,", - "World!" + "Hello", + "World!", + "Done" ) } @Test fun testExampleBasic06() { - test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.verifyLines( - "Task from coroutine scope", - "Task from runBlocking", - "Task from nested launch", - "Coroutine scope is over" - ) - } - - @Test - fun testExampleBasic07() { - test("ExampleBasic07") { kotlinx.coroutines.guide.exampleBasic07.main() }.verifyLines( - "Hello,", - "World!" - ) - } - - @Test - fun testExampleBasic08() { - test("ExampleBasic08") { kotlinx.coroutines.guide.exampleBasic08.main() }.also { lines -> + test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.also { lines -> check(lines.size == 1 && lines[0] == ".".repeat(100_000)) } } - - @Test - fun testExampleBasic09() { - test("ExampleBasic09") { kotlinx.coroutines.guide.exampleBasic09.main() }.verifyLines( - "I'm sleeping 0 ...", - "I'm sleeping 1 ...", - "I'm sleeping 2 ..." - ) - } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index d6f1c21d..1a84fb94 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -57,7 +57,7 @@ class DispatcherGuideTest { @Test fun testExampleContext06() { test("ExampleContext06") { kotlinx.coroutines.guide.exampleContext06.main() }.verifyLines( - "job1: I run in GlobalScope and execute independently!", + "job1: I run in my own Job and execute independently!", "job2: I am a child of the request coroutine", "job1: I am not affected by cancellation of the request", "main: Who has survived request cancellation?" diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt deleted file mode 100644 index 225b8481..00000000 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.LockFreedomTestEnvironment -import kotlinx.coroutines.stressTestMultiplier -import org.junit.Test -import java.util.* -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicReference -import kotlin.test.* - -/** - * This stress test has 4 threads adding randomly to the list and them immediately undoing - * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically). - */ -class LockFreeLinkedListAtomicLFStressTest { - private val env = LockFreedomTestEnvironment("LockFreeLinkedListAtomicLFStressTest") - - private data class Node(val i: Long) : LockFreeLinkedListNode() - - private val nSeconds = 5 * stressTestMultiplier - - private val nLists = 4 - private val nAdderThreads = 4 - private val nRemoverThreads = 4 - - private val lists = Array(nLists) { LockFreeLinkedListHead() } - - private val undone = AtomicLong() - private val missed = AtomicLong() - private val removed = AtomicLong() - private val error = AtomicReference<Throwable>() - private val index = AtomicLong() - - @Test - fun testStress() { - repeat(nAdderThreads) { threadId -> - val rnd = Random() - env.testThread(name = "adder-$threadId") { - when (rnd.nextInt(4)) { - 0 -> { - val list = lists[rnd.nextInt(nLists)] - val node = Node(index.incrementAndGet()) - addLastOp(list, node) - randomSpinWaitIntermission() - tryRemoveOp(node) - } - 1 -> { - // just to test conditional add - val list = lists[rnd.nextInt(nLists)] - val node = Node(index.incrementAndGet()) - addLastIfTrueOp(list, node) - randomSpinWaitIntermission() - tryRemoveOp(node) - } - 2 -> { - // just to test failed conditional add and burn some time - val list = lists[rnd.nextInt(nLists)] - val node = Node(index.incrementAndGet()) - addLastIfFalseOp(list, node) - } - 3 -> { - // add two atomically - val idx1 = rnd.nextInt(nLists - 1) - val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1) - check(idx1 < idx2) // that is our global order - val list1 = lists[idx1] - val list2 = lists[idx2] - val node1 = Node(index.incrementAndGet()) - val node2 = Node(index.incrementAndGet()) - addTwoOp(list1, node1, list2, node2) - randomSpinWaitIntermission() - tryRemoveOp(node1) - randomSpinWaitIntermission() - tryRemoveOp(node2) - } - else -> error("Cannot happen") - } - } - } - repeat(nRemoverThreads) { threadId -> - val rnd = Random() - env.testThread(name = "remover-$threadId") { - val idx1 = rnd.nextInt(nLists - 1) - val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1) - check(idx1 < idx2) // that is our global order - val list1 = lists[idx1] - val list2 = lists[idx2] - removeTwoOp(list1, list2) - } - } - env.performTest(nSeconds) { - val undone = undone.get() - val missed = missed.get() - val removed = removed.get() - println(" Adders undone $undone node additions") - println(" Adders missed $missed nodes") - println("Remover removed $removed nodes") - } - error.get()?.let { throw it } - assertEquals(missed.get(), removed.get()) - assertTrue(undone.get() > 0) - assertTrue(missed.get() > 0) - lists.forEach { it.validate() } - } - - private fun addLastOp(list: LockFreeLinkedListHead, node: Node) { - list.addLast(node) - } - - private fun addLastIfTrueOp(list: LockFreeLinkedListHead, node: Node) { - assertTrue(list.addLastIf(node) { true }) - } - - private fun addLastIfFalseOp(list: LockFreeLinkedListHead, node: Node) { - assertFalse(list.addLastIf(node) { false }) - } - - private fun addTwoOp(list1: LockFreeLinkedListHead, node1: Node, list2: LockFreeLinkedListHead, node2: Node) { - val add1 = list1.describeAddLast(node1) - val add2 = list2.describeAddLast(node2) - val op = object : AtomicOp<Any?>() { - init { - add1.atomicOp = this - add2.atomicOp = this - } - override fun prepare(affected: Any?): Any? = - add1.prepare(this) ?: - add2.prepare(this) - - override fun complete(affected: Any?, failure: Any?) { - add1.complete(this, failure) - add2.complete(this, failure) - } - } - assertTrue(op.perform(null) == null) - } - - private fun tryRemoveOp(node: Node) { - if (node.remove()) - undone.incrementAndGet() - else - missed.incrementAndGet() - } - - private fun removeTwoOp(list1: LockFreeLinkedListHead, list2: LockFreeLinkedListHead) { - val remove1 = list1.describeRemoveFirst() - val remove2 = list2.describeRemoveFirst() - val op = object : AtomicOp<Any?>() { - init { - remove1.atomicOp = this - remove2.atomicOp = this - } - override fun prepare(affected: Any?): Any? = - remove1.prepare(this) ?: - remove2.prepare(this) - - override fun complete(affected: Any?, failure: Any?) { - remove1.complete(this, failure) - remove2.complete(this, failure) - } - } - val success = op.perform(null) == null - if (success) removed.addAndGet(2) - } -}
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt new file mode 100644 index 00000000..30fbfee2 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly + +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name) + +internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher = + ClosedAfterGuideTestDispatcher(nThreads, name) + +internal class PoolThread( + @JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests + target: Runnable, name: String +) : Thread(target, name) { + init { + isDaemon = true + } +} + +private class ClosedAfterGuideTestDispatcher( + private val nThreads: Int, + private val name: String +) : ExecutorCoroutineDispatcher() { + private val threadNo = AtomicInteger() + + override val executor: Executor = + Executors.newScheduledThreadPool(nThreads, object : ThreadFactory { + override fun newThread(target: java.lang.Runnable): Thread { + return PoolThread( + this@ClosedAfterGuideTestDispatcher, + target, + if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet() + ) + } + }) + + override fun dispatch(context: CoroutineContext, block: Runnable) { + executor.execute(wrapTask(block)) + } + + override fun close() { + (executor as ExecutorService).shutdown() + } + + override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" +} diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 7eda9043..2e61ec6b 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.knit @@ -11,8 +11,6 @@ import kotlinx.knit.test.* import java.util.concurrent.* import kotlin.test.* -fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block) - // helper function to dump exception to stdout for ease of debugging failed tests private inline fun <T> outputException(name: String, block: () -> T): T = try { block() } @@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) { } throw t } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt index 8836fdc7..74cc1783 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt @@ -3,7 +3,7 @@ */ @file:Suppress("unused") -package kotlinx.coroutines.linearizability +package kotlinx.coroutines.lincheck import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -11,37 +11,37 @@ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.selects.* +import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.paramgen.* import org.jetbrains.kotlinx.lincheck.verifier.* -import org.junit.* -class RendezvousChannelLCStressTest : ChannelLCStressTestBase( +class RendezvousChannelLincheckTest : ChannelLincheckTestBase( c = Channel(RENDEZVOUS), sequentialSpecification = SequentialRendezvousChannel::class.java ) class SequentialRendezvousChannel : SequentialIntChannelBase(RENDEZVOUS) -class Array1ChannelLCStressTest : ChannelLCStressTestBase( +class Array1ChannelLincheckTest : ChannelLincheckTestBase( c = Channel(1), sequentialSpecification = SequentialArray1RendezvousChannel::class.java ) class SequentialArray1RendezvousChannel : SequentialIntChannelBase(1) -class Array2ChannelLCStressTest : ChannelLCStressTestBase( +class Array2ChannelLincheckTest : ChannelLincheckTestBase( c = Channel(2), sequentialSpecification = SequentialArray2RendezvousChannel::class.java ) class SequentialArray2RendezvousChannel : SequentialIntChannelBase(2) -class UnlimitedChannelLCStressTest : ChannelLCStressTestBase( +class UnlimitedChannelLincheckTest : ChannelLincheckTestBase( c = Channel(UNLIMITED), sequentialSpecification = SequentialUnlimitedChannel::class.java ) class SequentialUnlimitedChannel : SequentialIntChannelBase(UNLIMITED) -class ConflatedChannelLCStressTest : ChannelLCStressTestBase( +class ConflatedChannelLincheckTest : ChannelLincheckTestBase( c = Channel(CONFLATED), sequentialSpecification = SequentialConflatedChannel::class.java ) @@ -51,8 +51,11 @@ class SequentialConflatedChannel : SequentialIntChannelBase(CONFLATED) Param(name = "value", gen = IntGen::class, conf = "1:5"), Param(name = "closeToken", gen = IntGen::class, conf = "1:3") ) -abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val sequentialSpecification: Class<*>) { - @Operation +abstract class ChannelLincheckTestBase( + private val c: Channel<Int>, + private val sequentialSpecification: Class<*> +) : AbstractLincheckTest() { + @Operation(promptCancellation = true) suspend fun send(@Param(name = "value") value: Int): Any = try { c.send(value) } catch (e: NumberedCancellationException) { @@ -60,11 +63,12 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val } @Operation - fun offer(@Param(name = "value") value: Int): Any = try { - c.offer(value) - } catch (e: NumberedCancellationException) { - e.testResult - } + fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value) + .onSuccess { return true } + .onFailure { + return if (it is NumberedCancellationException) it.testResult + else false + } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -74,7 +78,7 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val e.testResult } - @Operation + @Operation(promptCancellation = true) suspend fun receive(): Any = try { c.receive() } catch (e: NumberedCancellationException) { @@ -82,11 +86,10 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val } @Operation - fun poll(): Any? = try { - c.poll() - } catch (e: NumberedCancellationException) { - e.testResult - } + fun tryReceive(): Any? = + c.tryReceive() + .onSuccess { return it } + .onFailure { return if (it is NumberedCancellationException) it.testResult else null } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -96,7 +99,7 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val e.testResult } - @Operation + @Operation(causesBlocking = true) fun close(@Param(name = "closeToken") token: Int): Boolean = c.close(NumberedCancellationException(token)) // TODO: this operation should be (and can be!) linearizable, but is not @@ -113,11 +116,8 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val // @Operation fun isEmpty() = c.isEmpty - @Test - fun test() = LCStressOptionsDefault() - .actorsBefore(0) - .sequentialSpecification(sequentialSpecification) - .check(this::class) + override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O = + actorsBefore(0).sequentialSpecification(sequentialSpecification) } private class NumberedCancellationException(number: Int) : CancellationException() { @@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta private val buffer = ArrayList<Int>() private var closedMessage: String? = null - suspend fun send(x: Int): Any = when (val offerRes = offer(x)) { + suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) { true -> Unit false -> suspendCancellableCoroutine { cont -> senders.add(cont to x) @@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta else -> offerRes } - fun offer(element: Int): Any { + fun trySend(element: Int): Any { if (closedMessage !== null) return closedMessage!! if (capacity == CONFLATED) { if (resumeFirstReceiver(element)) return true @@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta return false } - suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont -> + suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont -> receivers.add(cont) } - fun poll(): Any? { + fun tryReceive(): Any? { if (buffer.isNotEmpty()) { val el = buffer.removeAt(0) resumeFirstSender().also { @@ -221,4 +221,4 @@ private fun <T> CancellableContinuation<T>.resume(res: T): Boolean { val token = tryResume(res) ?: return false completeResume(token) return true -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt index 5f91c640..4f1bb6ad 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt @@ -3,18 +3,17 @@ */ @file:Suppress("unused") -package kotlinx.coroutines.linearizability +package kotlinx.coroutines.lincheck import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import org.jetbrains.kotlinx.lincheck.annotations.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.paramgen.* -import org.jetbrains.kotlinx.lincheck.verifier.* -import kotlin.test.* +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* @Param(name = "value", gen = IntGen::class, conf = "1:5") -class LockFreeListLCStressTest : VerifierState() { +class LockFreeListLincheckTest : AbstractLincheckTest() { class Node(val value: Int): LockFreeLinkedListNode() private val q: LockFreeLinkedListHead = LockFreeLinkedListHead() @@ -43,12 +42,12 @@ class LockFreeListLCStressTest : VerifierState() { private fun Any.isSame(value: Int) = this is Node && this.value == value - @Test - fun testAddRemoveLinearizability() = LCStressOptionsDefault().check(this::class) - override fun extractState(): Any { val elements = ArrayList<Int>() q.forEach<Node> { elements.add(it.value) } return elements } + + override fun ModelCheckingOptions.customize(isStressTest: Boolean) = + checkObstructionFreedom() }
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeTaskQueueLincheckTest.kt index de494cc1..2a9164e1 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeTaskQueueLincheckTest.kt @@ -3,19 +3,21 @@ */ @file:Suppress("unused") -package kotlinx.coroutines.linearizability +package kotlinx.coroutines.lincheck import kotlinx.coroutines.* import kotlinx.coroutines.internal.* +import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.paramgen.* -import org.jetbrains.kotlinx.lincheck.verifier.* +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* import org.jetbrains.kotlinx.lincheck.verifier.quiescent.* -import kotlin.test.* @Param(name = "value", gen = IntGen::class, conf = "1:3") -internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLCStressTest protected constructor(singleConsumer: Boolean) : VerifierState() { +internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLincheckTest( + val singleConsumer: Boolean +) : AbstractLincheckTest() { @JvmField protected val q = LockFreeTaskQueue<Int>(singleConsumer = singleConsumer) @@ -25,20 +27,24 @@ internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLCStressTest prote @Operation fun addLast(@Param(name = "value") value: Int) = q.addLast(value) - @QuiescentConsistent - @Operation(group = "consumer") - fun removeFirstOrNull() = q.removeFirstOrNull() + override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O = + verifier(QuiescentConsistencyVerifier::class.java) override fun extractState() = q.map { it } to q.isClosed() - @Test - fun testWithRemoveForQuiescentConsistency() = LCStressOptionsDefault() - .verifier(QuiescentConsistencyVerifier::class.java) - .check(this::class) + override fun ModelCheckingOptions.customize(isStressTest: Boolean) = + checkObstructionFreedom() } -@OpGroupConfig(name = "consumer", nonParallel = false) -internal class MCLockFreeTaskQueueWithRemoveLCStressTest : AbstractLockFreeTaskQueueWithoutRemoveLCStressTest(singleConsumer = false) +internal class MCLockFreeTaskQueueWithRemoveLincheckTest : AbstractLockFreeTaskQueueWithoutRemoveLincheckTest(singleConsumer = false) { + @QuiescentConsistent + @Operation(blocking = true) + fun removeFirstOrNull() = q.removeFirstOrNull() +} @OpGroupConfig(name = "consumer", nonParallel = true) -internal class SCLockFreeTaskQueueWithRemoveLCStressTest : AbstractLockFreeTaskQueueWithoutRemoveLCStressTest(singleConsumer = true)
\ No newline at end of file +internal class SCLockFreeTaskQueueWithRemoveLincheckTest : AbstractLockFreeTaskQueueWithoutRemoveLincheckTest(singleConsumer = true) { + @QuiescentConsistent + @Operation(group = "consumer") + fun removeFirstOrNull() = q.removeFirstOrNull() +}
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt new file mode 100644 index 00000000..a278985f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("unused") +package kotlinx.coroutines.lincheck + +import kotlinx.coroutines.* +import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* + +class MutexLincheckTest : AbstractLincheckTest() { + private val mutex = Mutex() + + @Operation + fun tryLock() = mutex.tryLock() + + @Operation(promptCancellation = true) + suspend fun lock() = mutex.lock() + + @Operation(handleExceptionsAsResult = [IllegalStateException::class]) + fun unlock() = mutex.unlock() + + override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O = + actorsBefore(0) + + override fun ModelCheckingOptions.customize(isStressTest: Boolean) = + checkObstructionFreedom() + + override fun extractState() = mutex.isLocked +} diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentListRemoveLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentListRemoveLincheckTest.kt index 5daed998..5a8d7b47 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentListRemoveLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentListRemoveLincheckTest.kt @@ -4,18 +4,16 @@ @file:Suppress("unused") -package kotlinx.coroutines.linearizability +package kotlinx.coroutines.lincheck import kotlinx.coroutines.* import kotlinx.coroutines.internal.* +import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.* -import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.paramgen.* -import org.jetbrains.kotlinx.lincheck.verifier.* -import org.junit.* +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* - -class SegmentListRemoveLCStressTest : VerifierState() { +class SegmentListRemoveLincheckTest : AbstractLincheckTest() { private val q = SegmentBasedQueue<Int>() private val segments: Array<OneElementSegment<Int>> @@ -29,6 +27,9 @@ class SegmentListRemoveLCStressTest : VerifierState() { segments[index].removeSegment() } + override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O = this + .actorsBefore(0).actorsAfter(0) + override fun extractState() = segments.map { it.logicallyRemoved } @Validate @@ -37,9 +38,6 @@ class SegmentListRemoveLCStressTest : VerifierState() { q.checkAllSegmentsAreNotLogicallyRemoved() } - @Test - fun test() = LCStressOptionsDefault() - .actorsBefore(0) - .actorsAfter(0) - .check(this::class) + override fun ModelCheckingOptions.customize(isStressTest: Boolean) = + checkObstructionFreedom() }
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueLincheckTest.kt index 89bf8dfa..76a59e39 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueLincheckTest.kt @@ -3,18 +3,17 @@ */ @file:Suppress("unused") -package kotlinx.coroutines.linearizability +package kotlinx.coroutines.lincheck import kotlinx.coroutines.* import kotlinx.coroutines.internal.SegmentBasedQueue import org.jetbrains.kotlinx.lincheck.annotations.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.paramgen.* -import org.jetbrains.kotlinx.lincheck.verifier.* -import org.junit.* +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* @Param(name = "value", gen = IntGen::class, conf = "1:5") -class SegmentQueueLCStressTest : VerifierState() { +class SegmentQueueLincheckTest : AbstractLincheckTest() { private val q = SegmentBasedQueue<Int>() @Operation @@ -40,6 +39,6 @@ class SegmentQueueLCStressTest : VerifierState() { return elements to closed } - @Test - fun test() = LCStressOptionsDefault().check(this::class) + override fun ModelCheckingOptions.customize(isStressTest: Boolean) = + checkObstructionFreedom() }
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt new file mode 100644 index 00000000..2b471d7f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("unused") +package kotlinx.coroutines.lincheck + +import kotlinx.coroutines.* +import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* + +abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest() { + private val semaphore = Semaphore(permits) + + @Operation + fun tryAcquire() = semaphore.tryAcquire() + + @Operation(promptCancellation = true, allowExtraSuspension = true) + suspend fun acquire() = semaphore.acquire() + + @Operation(handleExceptionsAsResult = [IllegalStateException::class]) + fun release() = semaphore.release() + + override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O = + actorsBefore(0) + + override fun extractState() = semaphore.availablePermits + + override fun ModelCheckingOptions.customize(isStressTest: Boolean) = + checkObstructionFreedom() +} + +class Semaphore1LincheckTest : SemaphoreLincheckTestBase(1) +class Semaphore2LincheckTest : SemaphoreLincheckTestBase(2)
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt deleted file mode 100644 index 9542b5d8..00000000 --- a/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -@file:Suppress("unused") -package kotlinx.coroutines.linearizability - -import kotlinx.coroutines.* -import kotlinx.coroutines.sync.* -import org.jetbrains.kotlinx.lincheck.annotations.Operation -import org.jetbrains.kotlinx.lincheck.verifier.* -import org.junit.* - -class MutexLCStressTest : VerifierState() { - private val mutex = Mutex() - - @Operation - fun tryLock() = mutex.tryLock() - - @Operation - suspend fun lock() = mutex.lock() - - @Operation(handleExceptionsAsResult = [IllegalStateException::class]) - fun unlock() = mutex.unlock() - - @Test - fun test() = LCStressOptionsDefault() - .actorsBefore(0) - .check(this::class) - - override fun extractState() = mutex.isLocked -}
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt deleted file mode 100644 index 52902f49..00000000 --- a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -@file:Suppress("unused") -package kotlinx.coroutines.linearizability - -import kotlinx.coroutines.* -import kotlinx.coroutines.sync.* -import org.jetbrains.kotlinx.lincheck.annotations.Operation -import org.jetbrains.kotlinx.lincheck.verifier.* -import org.junit.* - -abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() { - private val semaphore = Semaphore(permits) - - @Operation - fun tryAcquire() = semaphore.tryAcquire() - - @Operation - suspend fun acquire() = semaphore.acquire() - - @Operation(handleExceptionsAsResult = [IllegalStateException::class]) - fun release() = semaphore.release() - - @Test - fun test() = LCStressOptionsDefault() - .actorsBefore(0) - .check(this::class) - - override fun extractState() = semaphore.availablePermits -} - -class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1) -class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2)
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt index 1fe0d838..3a55f8c4 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt @@ -77,4 +77,4 @@ class BlockingCoroutineDispatcherMixedStealingStressTest : SchedulerTestBase() { cpuBlocker.await() } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index f31752c8..fe09440f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -101,7 +101,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { firstBarrier.await() secondBarrier.await() blockingTasks.joinAll() - checkPoolThreadsCreated(21..22) + checkPoolThreadsCreated(21 /* blocking tasks + 1 for CPU */..20 + CORES_COUNT) } @Test @@ -122,7 +122,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { barrier.await() blockingTasks.joinAll() // There may be race when multiple CPU threads are trying to lazily created one more - checkPoolThreadsCreated(104..120) + checkPoolThreadsCreated(101..100 + CORES_COUNT) } @Test diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index bfabf5b2..dd969bdd 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -39,17 +39,9 @@ abstract class SchedulerTestBase : TestBase() { ) } - /** - * Asserts that any number of pool worker threads in [range] exists at the time of method invocation - */ - fun checkPoolThreadsExist(range: IntRange) { - val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count() - assertTrue(threads in range, "Expected threads in $range interval, but has $threads") - } - private fun maxSequenceNumber(): Int? { return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.max() + .map { sequenceNumber(it.name) }.maxOrNull() } private fun sequenceNumber(threadName: String): Int { @@ -105,4 +97,4 @@ abstract class SchedulerTestBase : TestBase() { } } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt b/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt index a5c83d32..233e4420 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt @@ -1,11 +1,11 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.scheduling -internal class TestTimeSource(var time: Long) : TimeSource() { +internal class TestTimeSource(var time: Long) : SchedulerTimeSource() { override fun nanoTime() = time diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt deleted file mode 100644 index 4497bec5..00000000 --- a/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.selects - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import org.junit.* -import org.junit.Ignore -import org.junit.Test -import kotlin.math.* -import kotlin.test.* - -/** - * A stress-test on lock-freedom of select sending/receiving into opposite channels. - */ -class SelectDeadlockLFStressTest : TestBase() { - private val env = LockFreedomTestEnvironment("SelectDeadlockLFStressTest", allowSuspendedThreads = 1) - private val nSeconds = 5 * stressTestMultiplier - - private val c1 = Channel<Long>() - private val c2 = Channel<Long>() - - @Test - fun testLockFreedom() = testScenarios( - "s1r2", - "s2r1", - "r1s2", - "r2s1" - ) - - private fun testScenarios(vararg scenarios: String) { - env.onCompletion { - c1.cancel(TestCompleted()) - c2.cancel(TestCompleted()) - } - val t = scenarios.mapIndexed { i, scenario -> - val idx = i + 1L - TestDef(idx, "$idx [$scenario]", scenario) - } - t.forEach { it.test() } - env.performTest(nSeconds) { - t.forEach { println(it) } - } - } - - private inner class TestDef( - var sendIndex: Long = 0L, - val name: String, - scenario: String - ) { - var receiveIndex = 0L - - val clauses: List<SelectBuilder<Unit>.() -> Unit> = ArrayList<SelectBuilder<Unit>.() -> Unit>().apply { - require(scenario.length % 2 == 0) - for (i in scenario.indices step 2) { - val ch = when (val c = scenario[i + 1]) { - '1' -> c1 - '2' -> c2 - else -> error("Channel '$c'") - } - val clause = when (val op = scenario[i]) { - 's' -> fun SelectBuilder<Unit>.() { sendClause(ch) } - 'r' -> fun SelectBuilder<Unit>.() { receiveClause(ch) } - else -> error("Operation '$op'") - } - add(clause) - } - } - - fun test() = env.testThread(name) { - doSendReceive() - } - - private suspend fun doSendReceive() { - try { - select<Unit> { - for (clause in clauses) clause() - } - } catch (e: TestCompleted) { - assertTrue(env.isCompleted) - } - } - - private fun SelectBuilder<Unit>.sendClause(c: Channel<Long>) = - c.onSend(sendIndex) { - sendIndex += 4L - } - - private fun SelectBuilder<Unit>.receiveClause(c: Channel<Long>) = - c.onReceive { i -> - receiveIndex = max(i, receiveIndex) - } - - override fun toString(): String = "$name: send=$sendIndex, received=$receiveIndex" - } - - private class TestCompleted : CancellationException() -}
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt index bb713b25..027f3c51 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() { } } } -}
\ No newline at end of file + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val mutex = Mutex() + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + mutex.lock() + mutex.unlock() + } + mutex.withLock { + job.cancel() + } + job.join() + assertFalse { mutex.isLocked } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt index 374a1e3d..2ceed64b 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync import kotlinx.coroutines.* import org.junit.Test -import kotlin.test.assertEquals +import kotlin.test.* class SemaphoreStressTest : TestBase() { @Test @@ -90,4 +90,21 @@ class SemaphoreStressTest : TestBase() { } } } + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val semaphore = Semaphore(1) + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + semaphore.acquire() + semaphore.release() + } + semaphore.withPermit { + job.cancel() + } + job.join() + assertTrue { semaphore.availablePermits == 1 } + } + } } |