aboutsummaryrefslogtreecommitdiff
path: root/kotlinx-coroutines-core/jvm
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/jvm')
-rw-r--r--kotlinx-coroutines-core/jvm/resources/DebugProbesKt.binbin1728 -> 1714 bytes
-rw-r--r--kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro4
-rw-r--r--kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt (renamed from kotlinx-coroutines-core/jvm/src/TimeSource.kt)24
-rw-r--r--kotlinx-coroutines-core/jvm/src/Builders.kt7
-rw-r--r--kotlinx-coroutines-core/jvm/src/CommonPool.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/src/CompletionHandler.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/CoroutineContext.kt100
-rw-r--r--kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/Debug.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/DebugStrings.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/Dispatchers.kt10
-rw-r--r--kotlinx-coroutines-core/jvm/src/EventLoop.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/Exceptions.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/Executors.kt82
-rw-r--r--kotlinx-coroutines-core/jvm/src/Future.kt16
-rw-r--r--kotlinx-coroutines-core/jvm/src/Interruptible.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/Runnable.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/SchedulerTask.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt45
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/Actor.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/Channels.kt81
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt10
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt22
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt68
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt9
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt45
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt30
-rw-r--r--kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt9
-rw-r--r--kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt10
-rw-r--r--kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt8
-rw-r--r--kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt41
-rw-r--r--kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt5
-rw-r--r--kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt44
-rw-r--r--kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/test/FieldWalker.kt11
-rw-r--r--kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt56
-rw-r--r--kotlinx-coroutines-core/jvm/test/JoinStressTest.kt (renamed from kotlinx-coroutines-core/jvm/test/JoinStrTest.kt)2
-rw-r--r--kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt41
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt41
-rw-r--r--kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt13
-rw-r--r--kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/TestBase.kt47
-rw-r--r--kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt198
-rw-r--r--kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt65
-rw-r--r--kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt16
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt36
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt107
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt16
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt10
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt53
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt38
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt61
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt44
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt9
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt13
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt10
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt17
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt18
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt48
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt169
-rw-r--r--kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt51
-rw-r--r--kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt (renamed from kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt)62
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt (renamed from kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt)13
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/LockFreeTaskQueueLincheckTest.kt (renamed from kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt)34
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt32
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/SegmentListRemoveLincheckTest.kt (renamed from kotlinx-coroutines-core/jvm/test/linearizability/SegmentListRemoveLCStressTest.kt)20
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueLincheckTest.kt (renamed from kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt)11
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt35
-rw-r--r--kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt34
-rw-r--r--kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt101
-rw-r--r--kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt19
134 files changed, 1654 insertions, 1128 deletions
diff --git a/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin b/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin
index 76ee4115..397aaf67 100644
--- a/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin
+++ b/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin
Binary files differ
diff --git a/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro b/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro
index 60c8d612..1a9ae1c7 100644
--- a/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro
+++ b/kotlinx-coroutines-core/jvm/resources/META-INF/proguard/coroutines.pro
@@ -3,12 +3,12 @@
-keepnames class kotlinx.coroutines.CoroutineExceptionHandler {}
# Most of volatile fields are updated with AFU and should not be mangled
--keepclassmembernames class kotlinx.** {
+-keepclassmembers class kotlinx.coroutines.** {
volatile <fields>;
}
# Same story for the standard library's SafeContinuation that also uses AtomicReferenceFieldUpdater
--keepclassmembernames class kotlin.coroutines.SafeContinuation {
+-keepclassmembers class kotlin.coroutines.SafeContinuation {
volatile <fields>;
}
diff --git a/kotlinx-coroutines-core/jvm/src/TimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt
index 4b6fd991..3f7ac675 100644
--- a/kotlinx-coroutines-core/jvm/src/TimeSource.kt
+++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
// Need InlineOnly for efficient bytecode on Android
@@ -10,21 +10,21 @@ package kotlinx.coroutines
import java.util.concurrent.locks.*
import kotlin.internal.InlineOnly
-internal interface TimeSource {
- fun currentTimeMillis(): Long
- fun nanoTime(): Long
- fun wrapTask(block: Runnable): Runnable
- fun trackTask()
- fun unTrackTask()
- fun registerTimeLoopThread()
- fun unregisterTimeLoopThread()
- fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0
- fun unpark(thread: Thread)
+internal abstract class AbstractTimeSource {
+ abstract fun currentTimeMillis(): Long
+ abstract fun nanoTime(): Long
+ abstract fun wrapTask(block: Runnable): Runnable
+ abstract fun trackTask()
+ abstract fun unTrackTask()
+ abstract fun registerTimeLoopThread()
+ abstract fun unregisterTimeLoopThread()
+ abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0
+ abstract fun unpark(thread: Thread)
}
// For tests only
// @JvmField: Don't use JvmField here to enable R8 optimizations via "assumenosideeffects"
-internal var timeSource: TimeSource? = null
+internal var timeSource: AbstractTimeSource? = null
@InlineOnly
internal inline fun currentTimeMillis(): Long =
diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt
index e4504ccd..edb43031 100644
--- a/kotlinx-coroutines-core/jvm/src/Builders.kt
+++ b/kotlinx-coroutines-core/jvm/src/Builders.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@@ -63,13 +63,14 @@ private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, true, true) {
+
override val isScopedCoroutine: Boolean get() = true
override fun afterCompletion(state: Any?) {
// wake up blocked thread
if (Thread.currentThread() != blockedThread)
- LockSupport.unpark(blockedThread)
+ unpark(blockedThread)
}
@Suppress("UNCHECKED_CAST")
diff --git a/kotlinx-coroutines-core/jvm/src/CommonPool.kt b/kotlinx-coroutines-core/jvm/src/CommonPool.kt
index 22033131..502630b0 100644
--- a/kotlinx-coroutines-core/jvm/src/CommonPool.kt
+++ b/kotlinx-coroutines-core/jvm/src/CommonPool.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -28,7 +28,7 @@ internal object CommonPool : ExecutorCoroutineDispatcher() {
* Note that until Java 10, if an application is run within a container,
* `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return the real number of cores.
*/
- public const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
+ private const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
override val executor: Executor
get() = pool ?: getOrCreatePoolSync()
@@ -62,7 +62,7 @@ internal object CommonPool : ExecutorCoroutineDispatcher() {
?: return createPlainPool() // Fallback to plain thread pool
// Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode
if (!usePrivatePool && requestedParallelism < 0) {
- Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
+ Try { fjpClass.getMethod("commonPool").invoke(null) as? ExecutorService }
?.takeIf { isGoodCommonPool(fjpClass, it) }
?.let { return it }
}
diff --git a/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt b/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt
index 706f6c49..4835f796 100644
--- a/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt
+++ b/kotlinx-coroutines-core/jvm/src/CompletionHandler.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
index 5a69d48a..e91bb9fd 100644
--- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
+++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
@@ -1,13 +1,13 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.scheduling.*
-import java.util.concurrent.atomic.*
import kotlin.coroutines.*
+import kotlin.coroutines.jvm.internal.CoroutineStackFrame
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
@@ -48,6 +48,102 @@ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, c
}
}
+/**
+ * Executes a block using a context of a given continuation.
+ */
+internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
+ val context = continuation.context
+ val oldValue = updateThreadContext(context, countOrElement)
+ val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
+ // Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
+ continuation.updateUndispatchedCompletion(context, oldValue)
+ } else {
+ null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
+ }
+ try {
+ return block()
+ } finally {
+ if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext()) {
+ restoreThreadContext(context, oldValue)
+ }
+ }
+}
+
+internal fun Continuation<*>.updateUndispatchedCompletion(context: CoroutineContext, oldValue: Any?): UndispatchedCoroutine<*>? {
+ if (this !is CoroutineStackFrame) return null
+ /*
+ * Fast-path to detect whether we have unispatched coroutine at all in our stack.
+ *
+ * Implementation note.
+ * If we ever find that stackwalking for thread-locals is way too slow, here is another idea:
+ * 1) Store undispatched coroutine right in the `UndispatchedMarker` instance
+ * 2) To avoid issues with cross-dispatch boundary, remove `UndispatchedMarker`
+ * from the context when creating dispatched coroutine in `withContext`.
+ * Another option is to "unmark it" instead of removing to save an allocation.
+ * Both options should work, but it requires more careful studying of the performance
+ * and, mostly, maintainability impact.
+ */
+ val potentiallyHasUndispatchedCorotuine = context[UndispatchedMarker] !== null
+ if (!potentiallyHasUndispatchedCorotuine) return null
+ val completion = undispatchedCompletion()
+ completion?.saveThreadContext(context, oldValue)
+ return completion
+}
+
+internal tailrec fun CoroutineStackFrame.undispatchedCompletion(): UndispatchedCoroutine<*>? {
+ // Find direct completion of this continuation
+ val completion: CoroutineStackFrame = when (this) {
+ is DispatchedCoroutine<*> -> return null
+ else -> callerFrame ?: return null // something else -- not supported
+ }
+ if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
+ return completion.undispatchedCompletion() // walk up the call stack with tail call
+}
+
+/**
+ * Marker indicating that [UndispatchedCoroutine] exists somewhere up in the stack.
+ * Used as a performance optimization to avoid stack walking where it is not nesessary.
+ */
+private object UndispatchedMarker: CoroutineContext.Element, CoroutineContext.Key<UndispatchedMarker> {
+ override val key: CoroutineContext.Key<*>
+ get() = this
+}
+
+// Used by withContext when context changes, but dispatcher stays the same
+internal actual class UndispatchedCoroutine<in T>actual constructor (
+ context: CoroutineContext,
+ uCont: Continuation<T>
+) : ScopeCoroutine<T>(if (context[UndispatchedMarker] == null) context + UndispatchedMarker else context, uCont) {
+
+ private var savedContext: CoroutineContext? = null
+ private var savedOldValue: Any? = null
+
+ fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
+ savedContext = context
+ savedOldValue = oldValue
+ }
+
+ fun clearThreadContext(): Boolean {
+ if (savedContext == null) return false
+ savedContext = null
+ savedOldValue = null
+ return true
+ }
+
+ override fun afterResume(state: Any?) {
+ savedContext?.let { context ->
+ restoreThreadContext(context, savedOldValue)
+ savedContext = null
+ savedOldValue = null
+ }
+ // resume undispatched -- update context but stay on the same dispatcher
+ val result = recoverResult(state, uCont)
+ withContinuationContext(uCont, null) {
+ uCont.resumeWith(result)
+ }
+ }
+}
+
internal actual val CoroutineContext.coroutineName: String? get() {
if (!DEBUG) return null
val coroutineId = this[CoroutineId] ?: return null
diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt b/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt
index af37e73c..6d069692 100644
--- a/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt
+++ b/kotlinx-coroutines-core/jvm/src/CoroutineExceptionHandlerImpl.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/Debug.kt b/kotlinx-coroutines-core/jvm/src/Debug.kt
index 8108d235..911914a0 100644
--- a/kotlinx-coroutines-core/jvm/src/Debug.kt
+++ b/kotlinx-coroutines-core/jvm/src/Debug.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
// Need InlineOnly for efficient bytecode on Android
diff --git a/kotlinx-coroutines-core/jvm/src/DebugStrings.kt b/kotlinx-coroutines-core/jvm/src/DebugStrings.kt
index 2ccfebc6..32bd07a7 100644
--- a/kotlinx-coroutines-core/jvm/src/DebugStrings.kt
+++ b/kotlinx-coroutines-core/jvm/src/DebugStrings.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
index 787cbf9c..fe020276 100644
--- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
+++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
index 8033fb38..d82598ea 100644
--- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
+++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
@@ -8,7 +8,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.scheduling.*
-import java.util.*
import kotlin.coroutines.*
/**
@@ -108,9 +107,10 @@ public actual object Dispatchers {
*
* ### Implementation note
*
- * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
- * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread &mdash;
- * typically execution continues in the same thread.
+ * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
+ * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
+ * dispatcher does not lead to an actual switching to another thread &mdash; typically execution
+ * continues in the same thread.
* As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
* during operations over IO dispatcher.
*/
diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt
index d86f632a..e49c7dc7 100644
--- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt
+++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/Exceptions.kt b/kotlinx-coroutines-core/jvm/src/Exceptions.kt
index 0684ce23..007a0c98 100644
--- a/kotlinx-coroutines-core/jvm/src/Exceptions.kt
+++ b/kotlinx-coroutines-core/jvm/src/Exceptions.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("FunctionName")
diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt
index 8ffc22d8..7ea3cc68 100644
--- a/kotlinx-coroutines-core/jvm/src/Executors.kt
+++ b/kotlinx-coroutines-core/jvm/src/Executors.kt
@@ -1,9 +1,10 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
@@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
+ * ## Interaction with [delay] and time-based coroutines.
+ *
+ * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
+ * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
+ * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
+ * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
+ *
+ * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
+ * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
+ * to reduce the memory pressure of cancelled coroutines.
+ *
+ * If the executor service is neither of this types, the separate internal thread will be used to
+ * _track_ the delay and time-related executions, but the coroutine itself will still be executed
+ * on top of the given executor.
+ *
+ * ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
@@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
+ * ## Interaction with [delay] and time-based coroutines.
+ *
+ * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
+ * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
+ * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
+ * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
+ *
+ * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
+ * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
+ * to reduce the memory pressure of cancelled coroutines.
+ *
+ * If the executor is neither of this types, the separate internal thread will be used to
+ * _track_ the delay and time-related executions, but the coroutine itself will still be executed
+ * on top of the given executor.
+ *
+ * ## Rejected execution
+ *
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
@@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}
-private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
- init {
- initFutureCancellation()
- }
-}
+internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
-internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
-
- private var removesFutureOnCancellation: Boolean = false
-
- internal fun initFutureCancellation() {
- removesFutureOnCancellation = removeFutureOnCancel(executor)
+ /*
+ * Attempts to reflectively (to be Java 6 compatible) invoke
+ * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
+ * internal scheduler queue on cancellation.
+ */
+ init {
+ removeFutureOnCancel(executor)
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
@@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
}
- /*
- * removesFutureOnCancellation is required to avoid memory leak.
- * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
- * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
- */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- val future = if (removesFutureOnCancellation) {
- scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
- } else {
- null
- }
+ val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
+ ResumeUndispatchedRunnable(this, continuation),
+ continuation.context,
+ timeMillis
+ )
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
@@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
- val future = if (removesFutureOnCancellation) {
- scheduleBlock(block, context, timeMillis)
- } else {
- null
- }
+ val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}
- private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
+ private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
- (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
+ schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
@@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
override fun toString(): String = executor.toString()
- override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
+ override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt
index bd16f49a..b27a9708 100644
--- a/kotlinx-coroutines-core/jvm/src/Future.kt
+++ b/kotlinx-coroutines-core/jvm/src/Future.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@@ -13,42 +13,40 @@ import java.util.concurrent.*
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
- * invokeOnCompletion { future.cancel(false) }
+ * invokeOnCompletion { if (it != null) future.cancel(false) }
* ```
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
- invokeOnCompletion(handler = CancelFutureOnCompletion(this, future)) // TODO make it work only on cancellation as well?
+ invokeOnCompletion(handler = CancelFutureOnCompletion(future))
/**
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
- * invokeOnCancellation { future.cancel(false) }
+ * invokeOnCancellation { if (it != null) future.cancel(false) }
* ```
*/
public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit =
invokeOnCancellation(handler = CancelFutureOnCancel(future))
private class CancelFutureOnCompletion(
- job: Job,
private val future: Future<*>
-) : JobNode<Job>(job) {
+) : JobNode() {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
- future.cancel(false)
+ if (cause != null) future.cancel(false)
}
- override fun toString() = "CancelFutureOnCompletion[$future]"
}
private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
- future.cancel(false)
+ if (cause != null) future.cancel(false)
}
override fun toString() = "CancelFutureOnCancel[$future]"
}
diff --git a/kotlinx-coroutines-core/jvm/src/Interruptible.kt b/kotlinx-coroutines-core/jvm/src/Interruptible.kt
index 070aa624..b873eadf 100644
--- a/kotlinx-coroutines-core/jvm/src/Interruptible.kt
+++ b/kotlinx-coroutines-core/jvm/src/Interruptible.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/Runnable.kt b/kotlinx-coroutines-core/jvm/src/Runnable.kt
index 14d01105..844f9fca 100644
--- a/kotlinx-coroutines-core/jvm/src/Runnable.kt
+++ b/kotlinx-coroutines-core/jvm/src/Runnable.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt b/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt
index 478df822..6a00f45f 100644
--- a/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt
+++ b/kotlinx-coroutines-core/jvm/src/SchedulerTask.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt b/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
index 1fd85110..37fd70a2 100644
--- a/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
+++ b/kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
index aa18cd38..99e3b46c 100644
--- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
+++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
@@ -1,13 +1,11 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
-import kotlinx.coroutines.internal.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
-import kotlin.coroutines.*
/**
* Creates a coroutine execution context using a single thread with built-in [yield] support.
@@ -61,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
@ObsoleteCoroutinesApi
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
- return ThreadPoolDispatcher(nThreads, name)
-}
-
-internal class PoolThread(
- @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
- target: Runnable, name: String
-) : Thread(target, name) {
- init { isDaemon = true }
-}
-
-/**
- * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
- * created with [newSingleThreadContext] and [newFixedThreadPoolContext].
- */
-internal class ThreadPoolDispatcher internal constructor(
- private val nThreads: Int,
- private val name: String
-) : ExecutorCoroutineDispatcherBase() {
- private val threadNo = AtomicInteger()
-
- override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
- PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
- }
-
- init {
- initFutureCancellation()
+ val threadNo = AtomicInteger()
+ val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
+ val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
+ t.isDaemon = true
+ t
}
-
- /**
- * Closes this dispatcher -- shuts down all threads in this pool and releases resources.
- */
- public override fun close() {
- (executor as ExecutorService).shutdown()
- }
-
- override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
+ return executor.asCoroutineDispatcher()
}
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
index a9054265..4657bc7d 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -127,7 +127,11 @@ private open class ActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
active: Boolean
-) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
+) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {
+
+ init {
+ initParentJob(parentContext[Job])
+ }
override fun onCancelling(cause: Throwable?) {
_channel.cancel(cause?.let {
@@ -159,11 +163,17 @@ private class LazyActorCoroutine<E>(
return super.send(element)
}
+ @Suppress("DEPRECATION")
override fun offer(element: E): Boolean {
start()
return super.offer(element)
}
+ override fun trySend(element: E): ChannelResult<Unit> {
+ start()
+ return super.trySend(element)
+ }
+
override fun close(cause: Throwable?): Boolean {
// close the channel _first_
val closed = super.close(cause)
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
index 2c949959..0df8278b 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmMultifileClass
@@ -10,18 +10,87 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*
/**
- * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
- * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
+ * **Deprecated** blocking variant of send.
+ * This method is deprecated in the favour of [trySendBlocking].
*
- * This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
- * so this function should not be used from coroutine.
+ * `sendBlocking` is a dangerous primitive &mdash; it throws an exception
+ * if the channel was closed or, more commonly, cancelled.
+ * Cancellation exceptions in non-blocking code are unexpected and frequently
+ * trigger internal failures.
+ *
+ * These bugs are hard-to-spot during code review and they forced users to write
+ * their own wrappers around `sendBlocking`.
+ * So this function is deprecated and replaced with a more explicit primitive.
+ *
+ * The real-world example of broken usage with Firebase:
+ *
+ * ```kotlin
+ * callbackFlow {
+ * val listener = object : ValueEventListener {
+ * override fun onDataChange(snapshot: DataSnapshot) {
+ * // This line may fail and crash the app when the downstream flow is cancelled
+ * sendBlocking(DataSnapshot(snapshot))
+ * }
+ *
+ * override fun onCancelled(error: DatabaseError) {
+ * close(error.toException())
+ * }
+ * }
+ *
+ * firebaseQuery.addValueEventListener(listener)
+ * awaitClose { firebaseQuery.removeEventListener(listener) }
+ * }
+ * ```
*/
+@Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "Deprecated in the favour of 'trySendBlocking'. " +
+ "Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary",
+ replaceWith = ReplaceWith("trySendBlocking(element)")
+)
public fun <E> SendChannel<E>.sendBlocking(element: E) {
// fast path
- if (offer(element))
+ if (trySend(element).isSuccess)
return
// slow path
runBlocking {
send(element)
}
}
+
+/**
+ * Adds [element] into to this channel, **blocking** the caller while this channel is full,
+ * and returning either [successful][ChannelResult.isSuccess] result when the element was added, or
+ * failed result representing closed channel with a corresponding exception.
+ *
+ * This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching,
+ * so this function should not be used from coroutine.
+ *
+ * Example of usage:
+ *
+ * ```
+ * // From callback API
+ * channel.trySendBlocking(element)
+ * .onSuccess { /* request next element or debug log */ }
+ * .onFailure { t: Throwable? -> /* throw or log */ }
+ * ```
+ *
+ * For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it.
+ *
+ * @throws [InterruptedException] if the current thread is interrupted during the blocking send operation.
+ */
+@Throws(InterruptedException::class)
+public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
+ /*
+ * Sent successfully -- bail out.
+ * But failure may indicate either that the channel it full or that
+ * it is close. Go to slow path on failure to simplify the successful path and
+ * to materialize default exception.
+ */
+ trySend(element).onSuccess { return ChannelResult.success(Unit) }
+ return runBlocking {
+ val r = runCatching { send(element) }
+ if (r.isSuccess) ChannelResult.success(Unit)
+ else ChannelResult.closed(r.exceptionOrNull())
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
index 1e6797ac..6c23982e 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -21,13 +21,13 @@ public enum class TickerMode {
* ```
* val channel = ticker(delay = 100)
* delay(350) // 250 ms late
- * println(channel.poll()) // prints Unit
- * println(channel.poll()) // prints null
+ * println(channel.tryReceive().getOrNull()) // prints Unit
+ * println(channel.tryReceive().getOrNull()) // prints null
*
* delay(50)
- * println(channel.poll()) // prints Unit, delay was adjusted
+ * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted
* delay(50)
- * println(channel.poll()) // prints null, we'are not late relatively to previous element
+ * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element
* ```
*/
FIXED_PERIOD,
diff --git a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt
index 5a1a1ed1..8ef0c182 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/AgentPremain.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.debug
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt
index 79f024cc..ffb9c2da 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/ConcurrentWeakMap.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.debug.internal
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt
index 9d9fa3fb..6c353929 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfo.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.debug.internal
@@ -27,4 +27,4 @@ internal class DebugCoroutineInfo(
public val lastObservedFrame: CoroutineStackFrame? = source.lastObservedFrame // field is used as of 1.4-M3
@get:JvmName("lastObservedStackTrace") // method with this name is used as of 1.4-M3
public val lastObservedStackTrace: List<StackTraceElement> = source.lastObservedStackTrace()
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt
index cf007bb9..07b9419f 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugCoroutineInfoImpl.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.debug.internal
@@ -72,7 +72,7 @@ internal class DebugCoroutineInfoImpl(
private fun creationStackTrace(): List<StackTraceElement> {
val bottom = creationStackBottom ?: return emptyList()
// Skip "Coroutine creation stacktrace" frame
- return sequence<StackTraceElement> { yieldFrames(bottom.callerFrame) }.toList()
+ return sequence { yieldFrames(bottom.callerFrame) }.toList()
}
private tailrec suspend fun SequenceScope<StackTraceElement>.yieldFrames(frame: CoroutineStackFrame?) {
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt
new file mode 100644
index 00000000..8dc5b7c2
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("unused")
+
+package kotlinx.coroutines.debug.internal
+
+import kotlin.coroutines.*
+
+/*
+ * This class is used by ByteBuddy from kotlinx-coroutines-debug as kotlin.coroutines.jvm.internal.DebugProbesKt replacement.
+ * In theory, it should belong to kotlinx-coroutines-debug, but placing it here significantly simplifies the
+ * Android AS debugger that does on-load DEX transformation
+ */
+
+// Stubs which are injected as coroutine probes. Require direct match of signatures
+internal fun probeCoroutineResumed(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineResumed(frame)
+
+internal fun probeCoroutineSuspended(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineSuspended(frame)
+internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> =
+ DebugProbesImpl.probeCoroutineCreated(completion)
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
index 83bc02c6..05befc1a 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.debug.internal
@@ -282,7 +282,7 @@ internal object DebugProbesImpl {
it.fileName == "ContinuationImpl.kt"
}
- val (continuationStartFrame, frameSkipped) = findContinuationStartIndex(
+ val (continuationStartFrame, delta) = findContinuationStartIndex(
indexOfResumeWith,
actualTrace,
coroutineTrace
@@ -290,7 +290,6 @@ internal object DebugProbesImpl {
if (continuationStartFrame == -1) return coroutineTrace
- val delta = if (frameSkipped) 1 else 0
val expectedSize = indexOfResumeWith + coroutineTrace.size - continuationStartFrame - 1 - delta
val result = ArrayList<StackTraceElement>(expectedSize)
for (index in 0 until indexOfResumeWith - delta) {
@@ -312,16 +311,22 @@ internal object DebugProbesImpl {
* If method above `resumeWith` has no line number (thus it is `stateMachine.invokeSuspend`),
* it's skipped and attempt to match next one is made because state machine could have been missing in the original coroutine stacktrace.
*
- * Returns index of such frame (or -1) and flag indicating whether frame with state machine was skipped
+ * Returns index of such frame (or -1) and number of skipped frames (up to 2, for state machine and for access$).
*/
private fun findContinuationStartIndex(
indexOfResumeWith: Int,
actualTrace: Array<StackTraceElement>,
coroutineTrace: List<StackTraceElement>
- ): Pair<Int, Boolean> {
- val result = findIndexOfFrame(indexOfResumeWith - 1, actualTrace, coroutineTrace)
- if (result == -1) return findIndexOfFrame(indexOfResumeWith - 2, actualTrace, coroutineTrace) to true
- return result to false
+ ): Pair<Int, Int> {
+ /*
+ * Since Kotlin 1.5.0 we have these access$ methods that we have to skip.
+ * So we have to test next frame for invokeSuspend, for $access and for actual suspending call.
+ */
+ repeat(3) {
+ val result = findIndexOfFrame(indexOfResumeWith - 1 - it, actualTrace, coroutineTrace)
+ if (result != -1) return result to it
+ }
+ return -1 to 0
}
private fun findIndexOfFrame(
@@ -477,33 +482,40 @@ internal object DebugProbesImpl {
/*
* Trim intervals of internal methods from the stacktrace (bounds are excluded from trimming)
- * E.g. for sequence [e, i1, i2, i3, e, i4, e, i5, i6, e7]
+ * E.g. for sequence [e, i1, i2, i3, e, i4, e, i5, i6, i7]
* output will be [e, i1, i3, e, i4, e, i5, i7]
+ *
+ * If an interval of internal methods ends in a synthetic method, the outermost non-synthetic method in that
+ * interval will also be included.
*/
val result = ArrayList<StackTraceElement>(size - probeIndex + 1)
result += createArtificialFrame(ARTIFICIAL_FRAME_MESSAGE)
- var includeInternalFrame = true
- for (i in (probeIndex + 1) until size - 1) {
- val element = stackTrace[i]
- if (!element.isInternalMethod) {
- includeInternalFrame = true
- result += element
- continue
- }
-
- if (includeInternalFrame) {
- result += element
- includeInternalFrame = false
- } else if (stackTrace[i + 1].isInternalMethod) {
- continue
+ var i = probeIndex + 1
+ while (i < size) {
+ if (stackTrace[i].isInternalMethod) {
+ result += stackTrace[i] // we include the boundary of the span in any case
+ // first index past the end of the span of internal methods that starts from `i`
+ var j = i + 1
+ while (j < size && stackTrace[j].isInternalMethod) {
+ ++j
+ }
+ // index of the last non-synthetic internal methods in this span, or `i` if there are no such methods
+ var k = j - 1
+ while (k > i && stackTrace[k].fileName == null) {
+ k -= 1
+ }
+ if (k > i && k < j - 1) {
+ /* there are synthetic internal methods at the end of this span, but there is a non-synthetic method
+ after `i`, so we include it. */
+ result += stackTrace[k]
+ }
+ result += stackTrace[j - 1] // we include the other boundary of this span in any case, too
+ i = j
} else {
- result += element
- includeInternalFrame = true
+ result += stackTrace[i]
+ ++i
}
-
}
-
- result += stackTrace[size - 1]
return result
}
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt
index 3e9533b0..0b63cd23 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebuggerInfo.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("UNUSED")
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt
index 37c60eeb..40101192 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/StackTraceFrame.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.debug.internal
@@ -14,4 +14,4 @@ internal class StackTraceFrame(
private val stackTraceElement: StackTraceElement
) : CoroutineStackFrame {
override fun getStackTraceElement(): StackTraceElement = stackTraceElement
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
index 4fb958ac..d178060d 100644
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/FlowExceptions.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow.internal
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
index ab42b634..ea973287 100644
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow.internal
@@ -36,7 +36,7 @@ internal actual class SafeCollector<T> actual constructor(
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
- override fun invokeSuspend(result: Result<Any?>): Any? {
+ override fun invokeSuspend(result: Result<Any?>): Any {
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
diff --git a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt
index 75b668a3..050b9747 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -9,7 +9,7 @@ import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.withLock as withLockJvm
-internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayList<E>()
+internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayList()
@Suppress("ACTUAL_WITHOUT_EXPECT")
internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock
diff --git a/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt b/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt
index a03163db..60328ebd 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/ExceptionsConstuctor.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
diff --git a/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt b/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt
index 30cd09ef..e93de2aa 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
diff --git a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt
index f508749e..9dda30b5 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/LocalAtomics.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
index 97f99781..9bbc6dc9 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NO_EXPLICIT_VISIBILITY_IN_API_MODE")
@@ -11,13 +11,13 @@ import kotlinx.coroutines.*
private typealias Node = LockFreeLinkedListNode
@PublishedApi
-internal const val UNDECIDED = 0
+internal const val UNDECIDED: Int = 0
@PublishedApi
-internal const val SUCCESS = 1
+internal const val SUCCESS: Int = 1
@PublishedApi
-internal const val FAILURE = 2
+internal const val FAILURE: Int = 2
@PublishedApi
internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE")
@@ -259,7 +259,7 @@ public actual open class LockFreeLinkedListNode {
// Helps with removal of this node
public actual fun helpRemove() {
// Note: this node must be already removed
- (next as Removed).ref.correctPrev(null)
+ (next as Removed).ref.helpRemovePrev()
}
// Helps with removal of nodes that are previous to this
@@ -322,7 +322,7 @@ public actual open class LockFreeLinkedListNode {
private val _affectedNode = atomic<Node?>(null)
final override val affectedNode: Node? get() = _affectedNode.value
- final override val originalNext: Node? get() = queue
+ final override val originalNext: Node get() = queue
override fun retry(affected: Node, next: Any): Boolean = next !== queue
diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
index 5b2b9ff6..2d447413 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -35,7 +35,7 @@ internal object MainDispatcherLoader {
).iterator().asSequence().toList()
}
@Suppress("ConstantConditionIf")
- factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
+ factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
@@ -67,7 +67,10 @@ public fun MainCoroutineDispatcher.isMissing(): Boolean = this is MissingMainCor
@Suppress("MayBeConstant")
private val SUPPORT_MISSING = true
-@Suppress("ConstantConditionIf")
+@Suppress(
+ "ConstantConditionIf",
+ "IMPLICIT_NOTHING_TYPE_ARGUMENT_AGAINST_NOT_NOTHING_EXPECTED_TYPE" // KT-47626
+)
private fun createMissingDispatcher(cause: Throwable? = null, errorHint: String? = null) =
if (SUPPORT_MISSING) MissingMainCoroutineDispatcher(cause, errorHint) else
cause?.let { throw it } ?: throwMissingMainDispatcherException()
diff --git a/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt b/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt
index 2f4d1e05..48e01fbe 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/ProbesSupport.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NOTHING_TO_INLINE", "INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
index 208d3f2e..6153862e 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
@@ -1,8 +1,8 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("UNCHECKED_CAST", "NO_EXPLICIT_VISIBILITY_IN_API_MODE")
+@file:Suppress("UNCHECKED_CAST")
package kotlinx.coroutines.internal
@@ -29,7 +29,7 @@ private val stackTraceRecoveryClassName = runCatching {
internal actual fun <E : Throwable> recoverStackTrace(exception: E): E {
if (!RECOVER_STACK_TRACES) return exception
// No unwrapping on continuation-less path: exception is not reported multiple times via slow paths
- val copy = tryCopyException(exception) ?: return exception
+ val copy = tryCopyAndVerify(exception) ?: return exception
return copy.sanitizeStackTrace()
}
@@ -66,9 +66,7 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co
val (cause, recoveredStacktrace) = exception.causeAndStacktrace()
// Try to create an exception of the same type and get stacktrace from continuation
- val newException = tryCopyException(cause) ?: return exception
- // Verify that the new exception has the same message as the original one (bail out if not, see #1631)
- if (newException.message != cause.message) return exception
+ val newException = tryCopyAndVerify(cause) ?: return exception
// Update stacktrace
val stacktrace = createStackTrace(continuation)
if (stacktrace.isEmpty()) return exception
@@ -80,6 +78,14 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co
return createFinalException(cause, newException, stacktrace)
}
+private fun <E : Throwable> tryCopyAndVerify(exception: E): E? {
+ val newException = tryCopyException(exception) ?: return null
+ // Verify that the new exception has the same message as the original one (bail out if not, see #1631)
+ // CopyableThrowable has control over its message and thus can modify it the way it wants
+ if (exception !is CopyableThrowable<*> && newException.message != exception.message) return null
+ return newException
+}
+
/*
* Here we partially copy original exception stackTrace to make current one much prettier.
* E.g. for
@@ -210,6 +216,7 @@ internal actual typealias CoroutineStackFrame = kotlin.coroutines.jvm.internal.C
@Suppress("ACTUAL_WITHOUT_EXPECT")
internal actual typealias StackTraceElement = java.lang.StackTraceElement
+@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal actual fun Throwable.initCause(cause: Throwable) {
// Resolved to member, verified by test
initCause(cause)
diff --git a/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt b/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt
index 2b57b26c..6284f3a0 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/Synchronized.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
diff --git a/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt b/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt
index bf34c1a9..73853720 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/SystemProps.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:JvmName("SystemPropsKt")
diff --git a/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt b/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt
index 9d9d30e4..8536cef6 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -7,17 +7,26 @@ package kotlinx.coroutines.internal
import kotlinx.coroutines.*
import kotlin.coroutines.*
-
-private val ZERO = Symbol("ZERO")
+@JvmField
+internal val NO_THREAD_ELEMENTS = Symbol("NO_THREAD_ELEMENTS")
// Used when there are >= 2 active elements in the context
-private class ThreadState(val context: CoroutineContext, n: Int) {
- private var a = arrayOfNulls<Any>(n)
+@Suppress("UNCHECKED_CAST")
+private class ThreadState(@JvmField val context: CoroutineContext, n: Int) {
+ private val values = arrayOfNulls<Any>(n)
+ private val elements = arrayOfNulls<ThreadContextElement<Any?>>(n)
private var i = 0
- fun append(value: Any?) { a[i++] = value }
- fun take() = a[i++]
- fun start() { i = 0 }
+ fun append(element: ThreadContextElement<*>, value: Any?) {
+ values[i] = value
+ elements[i++] = element as ThreadContextElement<Any?>
+ }
+
+ fun restore(context: CoroutineContext) {
+ for (i in elements.indices.reversed()) {
+ elements[i]!!.restoreThreadContext(context, values[i])
+ }
+ }
}
// Counts ThreadContextElements in the context
@@ -42,17 +51,7 @@ private val findOne =
private val updateState =
fun (state: ThreadState, element: CoroutineContext.Element): ThreadState {
if (element is ThreadContextElement<*>) {
- state.append(element.updateThreadContext(state.context))
- }
- return state
- }
-
-// Restores state for all ThreadContextElements in the context from the given ThreadState
-private val restoreState =
- fun (state: ThreadState, element: CoroutineContext.Element): ThreadState {
- @Suppress("UNCHECKED_CAST")
- if (element is ThreadContextElement<*>) {
- (element as ThreadContextElement<Any?>).restoreThreadContext(state.context, state.take())
+ state.append(element, element.updateThreadContext(state.context))
}
return state
}
@@ -60,12 +59,13 @@ private val restoreState =
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!
// countOrElement is pre-cached in dispatched continuation
+// returns NO_THREAD_ELEMENTS if the contest does not have any ThreadContextElements
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
@Suppress("NAME_SHADOWING")
val countOrElement = countOrElement ?: threadContextElements(context)
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
return when {
- countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
+ countOrElement === 0 -> NO_THREAD_ELEMENTS // very fast path when there are no active ThreadContextElements
// ^^^ identity comparison for speed, we know zero always has the same identity
countOrElement is Int -> {
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
@@ -82,11 +82,10 @@ internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?
internal fun restoreThreadContext(context: CoroutineContext, oldState: Any?) {
when {
- oldState === ZERO -> return // very fast path when there are no ThreadContextElements
+ oldState === NO_THREAD_ELEMENTS -> return // very fast path when there are no ThreadContextElements
oldState is ThreadState -> {
// slow path with multiple stored ThreadContextElements
- oldState.start()
- context.fold(oldState, restoreState)
+ oldState.restore(context)
}
else -> {
// fast path for one ThreadContextElement, but need to find it
diff --git a/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt b/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt
index ff0970a2..0207334a 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/ThreadLocal.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
index 62cf80f7..84d9d9f8 100644
--- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
+++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
@@ -146,7 +146,7 @@ internal class CoroutineScheduler(
*
* Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
*/
- internal fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
+ fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
parkedWorkersStack.loop { top ->
val index = (top and PARKED_INDEX_MASK).toInt()
val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
@@ -169,12 +169,12 @@ internal class CoroutineScheduler(
* It does nothing is this worker is already physically linked to the stack.
* This method is invoked only from the worker thread itself.
* This invocation always precedes [LockSupport.parkNanos].
- * See [Worker.doPark].
+ * See [Worker.tryPark].
*
* Returns `true` if worker was added to the stack by this invocation, `false` if it was already
* registered in the stack.
*/
- internal fun parkedWorkersStackPush(worker: Worker): Boolean {
+ fun parkedWorkersStackPush(worker: Worker): Boolean {
if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
/*
* The below loop can be entered only if this worker was not in the stack and, since no other thread
@@ -403,7 +403,7 @@ internal class CoroutineScheduler(
}
}
- internal fun createTask(block: Runnable, taskContext: TaskContext): Task {
+ fun createTask(block: Runnable, taskContext: TaskContext): Task {
val nanoTime = schedulerTimeSource.nanoTime()
if (block is Task) {
block.submissionTime = nanoTime
@@ -422,7 +422,7 @@ internal class CoroutineScheduler(
tryUnpark() // Try unpark again in case there was race between permit release and parking
}
- internal fun signalCpuWork() {
+ fun signalCpuWork() {
if (tryUnpark()) return
if (tryCreateWorker()) return
tryUnpark()
@@ -654,7 +654,7 @@ internal class CoroutineScheduler(
* Releases CPU token if worker has any and changes state to [newState].
* Returns `true` if CPU permit was returned to the pool
*/
- internal fun tryReleaseCpu(newState: WorkerState): Boolean {
+ fun tryReleaseCpu(newState: WorkerState): Boolean {
val previousState = state
val hadCpu = previousState == WorkerState.CPU_ACQUIRED
if (hadCpu) releaseCpuPermit()
@@ -721,7 +721,19 @@ internal class CoroutineScheduler(
}
assert { localQueue.size == 0 }
workerCtl.value = PARKED // Update value once
- while (inStack()) { // Prevent spurious wakeups
+ /*
+ * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
+ * prevents the following race:
+ *
+ * - T2 scans the queue, adds itself to the stack, goes to rescan
+ * - T2 suspends in 'workerCtl.value = PARKED' line
+ * - T1 pops T2 from the stack, claims workerCtl, suspends
+ * - T2 fails 'while (inStack())' check, goes to full rescan
+ * - T2 adds itself to the stack, parks
+ * - T1 unparks T2, bails out with success
+ * - T2 unparks and loops in 'while (inStack())'
+ */
+ while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
if (isTerminated || state == WorkerState.TERMINATED) break
tryReleaseCpu(WorkerState.PARKING)
interrupted() // Cleanup interruptions
@@ -762,7 +774,7 @@ internal class CoroutineScheduler(
* Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
* ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
*/
- internal fun nextInt(upperBound: Int): Int {
+ fun nextInt(upperBound: Int): Int {
var r = rngState
r = r xor (r shl 13)
r = r xor (r shr 17)
diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
index 202c6e1d..7227b07c 100644
--- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
+++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
index a317b975..da867c98 100644
--- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
+++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
@@ -52,7 +52,7 @@ internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
)
@JvmField
-internal var schedulerTimeSource: TimeSource = NanoTimeSource
+internal var schedulerTimeSource: SchedulerTimeSource = NanoTimeSource
/**
* Marker indicating that task is CPU-bound and will not block
@@ -108,10 +108,11 @@ internal class TaskImpl(
// Open for tests
internal class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false)
-internal abstract class TimeSource {
+// Was previously TimeSource, renamed due to KT-42625 and KT-23727
+internal abstract class SchedulerTimeSource {
abstract fun nanoTime(): Long
}
-internal object NanoTimeSource : TimeSource() {
+internal object NanoTimeSource : SchedulerTimeSource() {
override fun nanoTime() = System.nanoTime()
}
diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
index 354b3a1b..6a9a8a5a 100644
--- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
+++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
diff --git a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt
index 649c9537..8526ca21 100644
--- a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt
+++ b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.test
diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt
deleted file mode 100644
index cfed5af4..00000000
--- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350
- (Coroutine boundary)
- at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:170)
- at kotlinx.coroutines.channels.ChannelCoroutine.offer(ChannelCoroutine.kt)
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:153)
-Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350
- at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:599)
- at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:164)
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:151)
- at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt
deleted file mode 100644
index ac8f5f4e..00000000
--- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt
+++ /dev/null
@@ -1,8 +0,0 @@
-kotlinx.coroutines.RecoverableTestException
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43)
- (Coroutine boundary)
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest.channelReceiveOrNull(StackTraceRecoveryChannelsTest.kt:70)
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:44)
-Caused by: kotlinx.coroutines.RecoverableTestException
- at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43)
- at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt
index 2d480861..3bfd08e5 100644
--- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt
+++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt
@@ -1,7 +1,7 @@
kotlinx.coroutines.RecoverableTestException
- at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$$inlined$select$lambda$1.invokeSuspend(StackTraceRecoverySelectTest.kt:33)
+ at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$2$1.invokeSuspend(StackTraceRecoverySelectTest.kt)
(Coroutine boundary)
- at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$testSelectJoin$1.invokeSuspend(StackTraceRecoverySelectTest.kt:20)
+ at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$testSelectJoin$1.invokeSuspend(StackTraceRecoverySelectTest.kt)
Caused by: kotlinx.coroutines.RecoverableTestException
- at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$$inlined$select$lambda$1.invokeSuspend(StackTraceRecoverySelectTest.kt:33)
- at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) \ No newline at end of file
+ at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$2$1.invokeSuspend(StackTraceRecoverySelectTest.kt)
+ at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt)
diff --git a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt
new file mode 100644
index 00000000..89bbbfd7
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+package kotlinx.coroutines
+
+import org.jetbrains.kotlinx.lincheck.*
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+import org.jetbrains.kotlinx.lincheck.strategy.stress.*
+import org.jetbrains.kotlinx.lincheck.verifier.*
+import org.junit.*
+
+abstract class AbstractLincheckTest : VerifierState() {
+ open fun <O: Options<O, *>> O.customize(isStressTest: Boolean): O = this
+ open fun ModelCheckingOptions.customize(isStressTest: Boolean): ModelCheckingOptions = this
+ open fun StressOptions.customize(isStressTest: Boolean): StressOptions = this
+
+ @Test
+ fun modelCheckingTest() = ModelCheckingOptions()
+ .iterations(if (isStressTest) 100 else 20)
+ .invocationsPerIteration(if (isStressTest) 10_000 else 1_000)
+ .commonConfiguration()
+ .customize(isStressTest)
+ .check(this::class)
+
+ @Test
+ fun stressTest() = StressOptions()
+ .iterations(if (isStressTest) 100 else 20)
+ .invocationsPerIteration(if (isStressTest) 10_000 else 1_000)
+ .commonConfiguration()
+ .customize(isStressTest)
+ .check(this::class)
+
+ private fun <O : Options<O, *>> O.commonConfiguration(): O = this
+ .actorsBefore(if (isStressTest) 3 else 1)
+ .threads(3)
+ .actorsPerThread(if (isStressTest) 4 else 2)
+ .actorsAfter(if (isStressTest) 3 else 0)
+ .customize(isStressTest)
+
+ override fun extractState(): Any = error("Not implemented")
+}
diff --git a/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt b/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt
index 02675208..59ff76a5 100644
--- a/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/AsyncJvmTest.kt
@@ -8,7 +8,10 @@ import kotlin.test.*
class AsyncJvmTest : TestBase() {
- // This must be a common test but it fails on JS because of KT-21961
+ // We have the same test in common module, but the maintainer uses this particular file
+ // and semi-automatically types cmd+N + AsyncJvm in order to duck-tape any JVM samples/repros,
+ // please do not remove this test
+
@Test
fun testAsyncWithFinally() = runTest {
expect(1)
diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
index 55c05c55..c7c2c04e 100644
--- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
private fun keepMe(a: ByteArray) {
// does nothing, makes sure the variable is kept in state-machine
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt
new file mode 100644
index 00000000..dbe9cb37
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import org.junit.Test
+import java.lang.Runnable
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ExecutorAsCoroutineDispatcherDelayTest : TestBase() {
+
+ private var callsToSchedule = 0
+
+ private inner class STPE : ScheduledThreadPoolExecutor(1) {
+ override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
+ if (delay != 0L) ++callsToSchedule
+ return super.schedule(command, delay, unit)
+ }
+ }
+
+ private inner class SES : ScheduledExecutorService by STPE()
+
+ @Test
+ fun testScheduledThreadPool() = runTest {
+ val executor = STPE()
+ withContext(executor.asCoroutineDispatcher()) {
+ delay(100)
+ }
+ executor.shutdown()
+ assertEquals(1, callsToSchedule)
+ }
+
+ @Test
+ fun testScheduledExecutorService() = runTest {
+ val executor = SES()
+ withContext(executor.asCoroutineDispatcher()) {
+ delay(100)
+ }
+ executor.shutdown()
+ assertEquals(1, callsToSchedule)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
index 15cb83ce..8a7878c9 100644
--- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
@@ -70,8 +70,18 @@ class FailFastOnStartTest : TestBase() {
val actor = actor<Int>(Dispatchers.Main, start = CoroutineStart.LAZY) { fail() }
actor.send(1)
}
-
+
private fun mainException(e: Throwable): Boolean {
return e is IllegalStateException && e.message?.contains("Module with the Main dispatcher is missing") ?: false
}
+
+ @Test
+ fun testProduceNonChild() = runTest(expected = ::mainException) {
+ produce<Int>(Job() + Dispatchers.Main) { fail() }
+ }
+
+ @Test
+ fun testAsyncNonChild() = runTest(expected = ::mainException) {
+ async<Int>(Job() + Dispatchers.Main) { fail() }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
index c9f722a5..04b0ba54 100644
--- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
@@ -33,7 +33,7 @@ class FailingCoroutinesMachineryTest(
private var caught: Throwable? = null
private val latch = CountDownLatch(1)
- private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t;latch.countDown() }
+ private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t; latch.countDown() }
private val lazyOuterDispatcher = lazy { newFixedThreadPoolContext(1, "") }
private object FailingUpdate : ThreadContextElement<Unit> {
@@ -115,14 +115,20 @@ class FailingCoroutinesMachineryTest(
@Test
fun testElement() = runTest {
- launch(NonCancellable + dispatcher.value + exceptionHandler + element) {}
+ // Top-level throwing dispatcher may rethrow an exception right here
+ runCatching {
+ launch(NonCancellable + dispatcher.value + exceptionHandler + element) {}
+ }
checkException()
}
@Test
fun testNestedElement() = runTest {
- launch(NonCancellable + dispatcher.value + exceptionHandler) {
- launch(element) { }
+ // Top-level throwing dispatcher may rethrow an exception right here
+ runCatching {
+ launch(NonCancellable + dispatcher.value + exceptionHandler) {
+ launch(element) { }
+ }
}
checkException()
}
diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
index e8079ebd..179b2e5e 100644
--- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
+++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -11,7 +11,6 @@ import java.util.*
import java.util.Collections.*
import java.util.concurrent.atomic.*
import java.util.concurrent.locks.*
-import kotlin.collections.ArrayList
import kotlin.test.*
object FieldWalker {
@@ -56,7 +55,7 @@ object FieldWalker {
* Reflectively starts to walk through object graph and map to all the reached object to their path
* in from root. Use [showPath] do display a path if needed.
*/
- private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
+ private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
val visited = IdentityHashMap<Any, Ref>()
if (root == null) return visited
visited[root] = Ref.RootRef
@@ -90,6 +89,7 @@ object FieldWalker {
cur = ref.parent
path += "[${ref.index}]"
}
+ else -> error("Should not be reached")
}
}
path.reverse()
@@ -154,8 +154,9 @@ object FieldWalker {
while (true) {
val fields = type.declaredFields.filter {
!it.type.isPrimitive
- && (statics || !Modifier.isStatic(it.modifiers))
- && !(it.type.isArray && it.type.componentType.isPrimitive)
+ && (statics || !Modifier.isStatic(it.modifiers))
+ && !(it.type.isArray && it.type.componentType.isPrimitive)
+ && it.name != "previousOut" // System.out from TestBase that we store in a field to restore later
}
fields.forEach { it.isAccessible = true } // make them all accessible
result.addAll(fields)
diff --git a/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt
new file mode 100644
index 00000000..6bbfdd1b
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class IntellijIdeaDebuggerEvaluatorCompatibilityTest {
+
+ /*
+ * This test verifies that our CoroutineScope is accessible to IDEA debugger.
+ *
+ * Consider the following scenario:
+ * ```
+ * runBlocking<Unit> { // this: CoroutineScope
+ * println("runBlocking")
+ * }
+ * ```
+ * user puts breakpoint to `println` line, opens "Evaluate" window
+ * and executes `launch { println("launch") }`. They (obviously) expect it to work, but
+ * it won't: `{}` in `runBlocking` is `SuspendLambda` and `this` is an unused implicit receiver
+ * that is removed by the compiler (because it's unused).
+ *
+ * But we still want to provide consistent user experience for functions with `CoroutineScope` receiver,
+ * for that IDEA debugger tries to retrieve the scope via `kotlin.coroutines.coroutineContext[Job] as? CoroutineScope`
+ * and with this test we're fixing this behaviour.
+ *
+ * Note that this behaviour is not carved in stone: IDEA fallbacks to `kotlin.coroutines.coroutineContext` for the context if necessary.
+ */
+
+ @Test
+ fun testScopeIsAccessible() = runBlocking<Unit> {
+ verify()
+
+ withContext(Job()) {
+ verify()
+ }
+
+ coroutineScope {
+ verify()
+ }
+
+ supervisorScope {
+ verify()
+ }
+
+ }
+
+ private suspend fun verify() {
+ val ctx = coroutineContext
+ assertTrue { ctx.job is CoroutineScope }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt
index 5090e7c0..6d474185 100644
--- a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt b/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt
deleted file mode 100644
index 62ded9f9..00000000
--- a/kotlinx-coroutines-core/jvm/test/LCStressOptionsDefault.kt
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-package kotlinx.coroutines
-
-import org.jetbrains.kotlinx.lincheck.*
-import org.jetbrains.kotlinx.lincheck.strategy.stress.*
-import kotlin.reflect.*
-
-class LCStressOptionsDefault : StressOptions() {
- init {
- iterations(100 * stressTestMultiplierCbrt)
- invocationsPerIteration(1000 * stressTestMultiplierCbrt)
- actorsBefore(if (isStressTest) 3 else 0)
- threads(3)
- actorsPerThread(if (isStressTest) 3 else 2)
- }
-}
-
-fun Options<*,*>.check(testClass: KClass<*>) = LinChecker.check(testClass.java, this) \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
new file mode 100644
index 00000000..8a20e084
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.channels.*
+import org.junit.Test
+import kotlin.test.*
+
+class ReusableCancellableContinuationLeakStressTest : TestBase() {
+
+ @Suppress("UnnecessaryVariable")
+ private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
+ val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
+ return r
+ }
+
+ private val iterations = 100_000 * stressTestMultiplier
+
+ class Leak(val i: Int)
+
+ @Test // Simplified version of #2564
+ fun testReusableContinuationLeak() = runTest {
+ val channel = produce(capacity = 1) { // from the main thread
+ (0 until iterations).forEach {
+ send(Leak(it))
+ }
+ }
+
+ launch(Dispatchers.Default) {
+ repeat (iterations) {
+ val value = channel.receiveBatch()
+ assertEquals(it, value.i)
+ }
+ (channel as Job).join()
+
+ FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
index 56f1e283..06839f4a 100644
--- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
@@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() {
repeat(iterations) {
suspender {
- assertTrue(channel.offer(it))
+ assertTrue(channel.trySend(it).isSuccess)
}
}
channel.close()
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt
new file mode 100644
index 00000000..a256815d
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.flow.*
+import org.junit.*
+
+class ReusableContinuationStressTest : TestBase() {
+
+ private val iterations = 1000 * stressTestMultiplierSqrt
+
+ @Test // Originally reported by @denis-bezrukov in #2736
+ fun testDebounceWithStateFlow() = runBlocking<Unit> {
+ withContext(Dispatchers.Default) {
+ repeat(iterations) {
+ launch { // <- load the dispatcher and OS scheduler
+ runStressTestOnce(1, 1)
+ }
+ }
+ }
+ }
+
+ private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope {
+ val stateFlow = MutableStateFlow(0)
+ val emitter = launch {
+ repeat(1000) { i ->
+ stateFlow.emit(i)
+ delay(delay.toLong())
+ }
+ }
+ var last = 0
+ stateFlow.debounce(debounce.toLong()).take(100).collect { i ->
+ if (i - last > 100) {
+ last = i
+ }
+ }
+ emitter.cancel()
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
index e20362ff..de38df6b 100644
--- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() {
}
rb.hashCode() // unused
}
+
+ @Test
+ fun testCancelledParent() {
+ val job = Job()
+ job.cancel()
+ assertFailsWith<CancellationException> {
+ runBlocking(job) {
+ expectUnreached()
+ }
+ }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
index e755b17d..49c93c7f 100644
--- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
@@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() {
val job = launch {
runInterruptible(Dispatchers.IO) {
expect(2)
- latch.offer(Unit)
+ latch.trySend(Unit)
try {
Thread.sleep(10_000L)
expectUnreached()
diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt
index 17238e87..61a2c8b8 100644
--- a/kotlinx-coroutines-core/jvm/test/TestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt
@@ -7,11 +7,10 @@ package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.scheduling.*
import org.junit.*
-import java.lang.Math.*
+import java.io.*
import java.util.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
-import kotlin.math.*
import kotlin.test.*
private val VERBOSE = systemProp("test.verbose", false)
@@ -23,12 +22,15 @@ public actual val isStressTest = System.getProperty("stressTest")?.toBoolean() ?
public val stressTestMultiplierSqrt = if (isStressTest) 5 else 1
+private const val SHUTDOWN_TIMEOUT = 1_000L // 1s at most to wait per thread
+
/**
* Multiply various constants in stress tests by this factor, so that they run longer during nightly stress test.
*/
public actual val stressTestMultiplier = stressTestMultiplierSqrt * stressTestMultiplierSqrt
-public val stressTestMultiplierCbrt = cbrt(stressTestMultiplier.toDouble()).roundToInt()
+@Suppress("ACTUAL_WITHOUT_EXPECT")
+public actual typealias TestResult = Unit
/**
* Base class for tests, so that tests for predictable scheduling of actions in multiple coroutines sharing a single
@@ -49,7 +51,11 @@ public val stressTestMultiplierCbrt = cbrt(stressTestMultiplier.toDouble()).roun
* }
* ```
*/
-public actual open class TestBase actual constructor() {
+public actual open class TestBase(private var disableOutCheck: Boolean) {
+
+ actual constructor(): this(false)
+
+ public actual val isBoundByJsTestTimeout = false
private var actionIndex = AtomicInteger()
private var finished = AtomicBoolean()
private var error = AtomicReference<Throwable>()
@@ -58,9 +64,15 @@ public actual open class TestBase actual constructor() {
private lateinit var threadsBefore: Set<Thread>
private val uncaughtExceptions = Collections.synchronizedList(ArrayList<Throwable>())
private var originalUncaughtExceptionHandler: Thread.UncaughtExceptionHandler? = null
- private val SHUTDOWN_TIMEOUT = 1_000L // 1s at most to wait per thread
+ /*
+ * System.out that we redefine in order to catch any debugging/diagnostics
+ * 'println' from main source set.
+ * NB: We do rely on the name 'previousOut' in the FieldWalker in order to skip its
+ * processing
+ */
+ private lateinit var previousOut: PrintStream
- /**
+ /**
* Throws [IllegalStateException] like `error` in stdlib, but also ensures that the test will not
* complete successfully even if this exception is consumed somewhere in the test.
*/
@@ -113,7 +125,7 @@ public actual open class TestBase actual constructor() {
}
/**
- * Asserts that this it the last action in the test. It must be invoked by any test that used [expect].
+ * Asserts that this is the last action in the test. It must be invoked by any test that used [expect].
*/
public actual fun finish(index: Int) {
expect(index)
@@ -133,6 +145,16 @@ public actual open class TestBase actual constructor() {
finished.set(false)
}
+ private object TestOutputStream : PrintStream(object : OutputStream() {
+ override fun write(b: Int) {
+ error("Detected unexpected call to 'println' from source code")
+ }
+ })
+
+ fun println(message: Any?) {
+ previousOut.println(message)
+ }
+
@Before
fun before() {
initPoolsBeforeTest()
@@ -143,6 +165,10 @@ public actual open class TestBase actual constructor() {
e.printStackTrace()
uncaughtExceptions.add(e)
}
+ if (!disableOutCheck) {
+ previousOut = System.out
+ System.setOut(TestOutputStream)
+ }
}
@After
@@ -154,7 +180,7 @@ public actual open class TestBase actual constructor() {
}
// Shutdown all thread pools
shutdownPoolsAfterTest()
- // Check that that are now leftover threads
+ // Check that are now leftover threads
runCatching {
checkTestThreads(threadsBefore)
}.onFailure {
@@ -162,6 +188,9 @@ public actual open class TestBase actual constructor() {
}
// Restore original uncaught exception handler
Thread.setDefaultUncaughtExceptionHandler(originalUncaughtExceptionHandler)
+ if (!disableOutCheck) {
+ System.setOut(previousOut)
+ }
if (uncaughtExceptions.isNotEmpty()) {
makeError("Expected no uncaught exceptions, but got $uncaughtExceptions")
}
@@ -187,7 +216,7 @@ public actual open class TestBase actual constructor() {
expected: ((Throwable) -> Boolean)? = null,
unhandled: List<(Throwable) -> Boolean> = emptyList(),
block: suspend CoroutineScope.() -> Unit
- ) {
+ ): TestResult {
var exCount = 0
var ex: Throwable? = null
try {
diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt
new file mode 100644
index 00000000..e2ab4d72
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ThreadContextElementRestoreTest.kt
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class ThreadContextElementRestoreTest : TestBase() {
+ private val tl = ThreadLocal<String?>()
+
+ // Checks that ThreadLocal context is properly restored after executing the given block inside
+ // withContext(tl.asContextElement("OK")) code running in different outer contexts
+ private inline fun check(crossinline block: suspend () -> Unit) = runTest {
+ val mainDispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
+ // Scenario #1: withContext(ThreadLocal) direct from runTest
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ // Scenario #2: withContext(ThreadLocal) from coroutineScope
+ coroutineScope {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #3: withContext(ThreadLocal) from undispatched withContext
+ withContext(CoroutineName("NAME")) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #4: withContext(ThreadLocal) from dispatched withContext
+ withContext(wrapperDispatcher()) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #5: withContext(ThreadLocal) from withContext(ThreadLocal)
+ withContext(tl.asContextElement(null)) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #6: withContext(ThreadLocal) from withTimeout
+ withTimeout(1000) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #7: withContext(ThreadLocal) from withContext(Unconfined)
+ withContext(Dispatchers.Unconfined) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #8: withContext(ThreadLocal) from withContext(Default)
+ withContext(Dispatchers.Default) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ // Scenario #9: withContext(ThreadLocal) from withContext(mainDispatcher)
+ withContext(mainDispatcher) {
+ withContext(tl.asContextElement("OK")) {
+ block()
+ assertEquals("OK", tl.get())
+ }
+ assertEquals(null, tl.get())
+ }
+ }
+
+ @Test
+ fun testSimpleNoSuspend() =
+ check {}
+
+ @Test
+ fun testSimpleDelay() = check {
+ delay(1)
+ }
+
+ @Test
+ fun testSimpleYield() = check {
+ yield()
+ }
+
+ private suspend fun deepDelay() {
+ deepDelay2(); deepDelay2()
+ }
+
+ private suspend fun deepDelay2() {
+ delay(1); delay(1)
+ }
+
+ @Test
+ fun testDeepDelay() = check {
+ deepDelay()
+ }
+
+ private suspend fun deepYield() {
+ deepYield2(); deepYield2()
+ }
+
+ private suspend fun deepYield2() {
+ yield(); yield()
+ }
+
+ @Test
+ fun testDeepYield() = check {
+ deepYield()
+ }
+
+ @Test
+ fun testCoroutineScopeDelay() = check {
+ coroutineScope {
+ delay(1)
+ }
+ }
+
+ @Test
+ fun testCoroutineScopeYield() = check {
+ coroutineScope {
+ yield()
+ }
+ }
+
+ @Test
+ fun testWithContextUndispatchedDelay() = check {
+ withContext(CoroutineName("INNER")) {
+ delay(1)
+ }
+ }
+
+ @Test
+ fun testWithContextUndispatchedYield() = check {
+ withContext(CoroutineName("INNER")) {
+ yield()
+ }
+ }
+
+ @Test
+ fun testWithContextDispatchedDelay() = check {
+ withContext(wrapperDispatcher()) {
+ delay(1)
+ }
+ }
+
+ @Test
+ fun testWithContextDispatchedYield() = check {
+ withContext(wrapperDispatcher()) {
+ yield()
+ }
+ }
+
+ @Test
+ fun testWithTimeoutDelay() = check {
+ withTimeout(1000) {
+ delay(1)
+ }
+ }
+
+ @Test
+ fun testWithTimeoutYield() = check {
+ withTimeout(1000) {
+ yield()
+ }
+ }
+
+ @Test
+ fun testWithUnconfinedContextDelay() = check {
+ withContext(Dispatchers.Unconfined) {
+ delay(1)
+ }
+ }
+ @Test
+ fun testWithUnconfinedContextYield() = check {
+ withContext(Dispatchers.Unconfined) {
+ yield()
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt
new file mode 100644
index 00000000..49f4a12e
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ThreadContextOrderTest.kt
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.internal.*
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class ThreadContextOrderTest : TestBase() {
+ /*
+ * The test verifies that two thread context elements are correctly nested:
+ * The restoration order is the reverse of update order.
+ */
+ private val transactionalContext = ThreadLocal<String>()
+ private val loggingContext = ThreadLocal<String>()
+
+ private val transactionalElement = object : ThreadContextElement<String> {
+ override val key = ThreadLocalKey(transactionalContext)
+
+ override fun updateThreadContext(context: CoroutineContext): String {
+ assertEquals("test", loggingContext.get())
+ val previous = transactionalContext.get()
+ transactionalContext.set("tr coroutine")
+ return previous
+ }
+
+ override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
+ assertEquals("test", loggingContext.get())
+ assertEquals("tr coroutine", transactionalContext.get())
+ transactionalContext.set(oldState)
+ }
+ }
+
+ private val loggingElement = object : ThreadContextElement<String> {
+ override val key = ThreadLocalKey(loggingContext)
+
+ override fun updateThreadContext(context: CoroutineContext): String {
+ val previous = loggingContext.get()
+ loggingContext.set("log coroutine")
+ return previous
+ }
+
+ override fun restoreThreadContext(context: CoroutineContext, oldState: String) {
+ assertEquals("log coroutine", loggingContext.get())
+ assertEquals("tr coroutine", transactionalContext.get())
+ loggingContext.set(oldState)
+ }
+ }
+
+ @Test
+ fun testCorrectOrder() = runTest {
+ transactionalContext.set("test")
+ loggingContext.set("test")
+ launch(transactionalElement + loggingElement) {
+ assertEquals("log coroutine", loggingContext.get())
+ assertEquals("tr coroutine", transactionalContext.get())
+ }
+ assertEquals("test", loggingContext.get())
+ assertEquals("test", transactionalContext.get())
+
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
index ca399f53..bd9a185f 100644
--- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
+++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -42,7 +42,7 @@ private const val REAL_PARK_NANOS = 10_000_000L // 10 ms -- park for a little to
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal class VirtualTimeSource(
private val log: PrintStream?
-) : TimeSource {
+) : AbstractTimeSource() {
private val mainThread: Thread = Thread.currentThread()
private var checkpointNanos: Long = System.nanoTime()
@@ -142,7 +142,7 @@ internal class VirtualTimeSource(
}
private fun minParkedTill(): Long =
- threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED
+ threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED
@Synchronized
fun shutdown() {
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt
index ae95e694..d3b2ff12 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -78,4 +78,14 @@ class ActorLazyTest : TestBase() {
job.join()
finish(5)
}
-} \ No newline at end of file
+
+ @Test
+ fun testCancelledParent() = runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ actor<Int>(start = CoroutineStart.LAZY) {
+ expectUnreached()
+ }
+ finish(2)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt
index bdca5039..5a2778d5 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -69,11 +69,11 @@ class ActorTest(private val capacity: Int) : TestBase() {
@Test
fun testCloseWithoutCause() = runTest {
val actor = actor<Int>(capacity = capacity) {
- val element = channel.receiveOrNull()
+ val element = channel.receive()
expect(2)
assertEquals(42, element)
- val next = channel.receiveOrNull()
- assertNull(next)
+ val next = channel.receiveCatching()
+ assertNull(next.exceptionOrNull())
expect(3)
}
@@ -88,11 +88,11 @@ class ActorTest(private val capacity: Int) : TestBase() {
@Test
fun testCloseWithCause() = runTest {
val actor = actor<Int>(capacity = capacity) {
- val element = channel.receiveOrNull()
+ val element = channel.receive()
expect(2)
- require(element!! == 42)
+ require(element == 42)
try {
- channel.receiveOrNull()
+ channel.receive()
} catch (e: IOException) {
expect(3)
}
@@ -111,7 +111,7 @@ class ActorTest(private val capacity: Int) : TestBase() {
val job = async {
actor<Int>(capacity = capacity) {
expect(1)
- channel.receiveOrNull()
+ channel.receive()
expectUnreached()
}
}
@@ -173,11 +173,24 @@ class ActorTest(private val capacity: Int) : TestBase() {
fun testCloseFreshActor() = runTest {
for (start in CoroutineStart.values()) {
val job = launch {
- val actor = actor<Int>(start = start) { for (i in channel) {} }
+ val actor = actor<Int>(start = start) {
+ for (i in channel) {
+ }
+ }
actor.close()
}
job.join()
}
}
+
+ @Test
+ fun testCancelledParent() = runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ actor<Int> {
+ expectUnreached()
+ }
+ finish(2)
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
index 2e73b243..8c9777b4 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -67,10 +67,10 @@ class BroadcastChannelMultiReceiveStressTest(
val channel = broadcast.openSubscription()
when (receiverIndex % 5) {
0 -> doReceive(channel, receiverIndex)
- 1 -> doReceiveOrNull(channel, receiverIndex)
+ 1 -> doReceiveCatching(channel, receiverIndex)
2 -> doIterator(channel, receiverIndex)
3 -> doReceiveSelect(channel, receiverIndex)
- 4 -> doReceiveSelectOrNull(channel, receiverIndex)
+ 4 -> doReceiveCatchingSelect(channel, receiverIndex)
}
channel.cancel()
}
@@ -124,9 +124,9 @@ class BroadcastChannelMultiReceiveStressTest(
}
}
- private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ private suspend fun doReceiveCatching(channel: ReceiveChannel<Long>, receiverIndex: Int) {
while (true) {
- val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ val stop = doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break)
if (stop) break
}
}
@@ -148,11 +148,11 @@ class BroadcastChannelMultiReceiveStressTest(
}
}
- private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ private suspend fun doReceiveCatchingSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
while (true) {
- val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break
+ val event = select<Long?> { channel.onReceiveCatching { it.getOrNull() } } ?: break
val stop = doReceived(receiverIndex, event)
if (stop) break
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
index 76713aa1..86adfee0 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
// total counters
private var sendCnt = 0
- private var offerFailedCnt = 0
+ private var trySendFailedCnt = 0
private var receivedCnt = 0
private var undeliveredCnt = 0
@@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private var lastReceived = 0
private var dSendCnt = 0
private var dSendExceptionCnt = 0
- private var dOfferFailedCnt = 0
+ private var dTrySendFailedCnt = 0
private var dReceivedCnt = 0
private val dUndeliveredCnt = AtomicInteger()
@@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
joinAll(j1, j2)
// All elements must be either received or undelivered (IN every run)
- if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
+ if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
println(" Send: $dSendCnt")
- println("Send Exception: $dSendExceptionCnt")
- println(" Offer failed: $dOfferFailedCnt")
+ println("Send exception: $dSendExceptionCnt")
+ println("trySend failed: $dTrySendFailedCnt")
println(" Received: $dReceivedCnt")
println(" Undelivered: ${dUndeliveredCnt.get()}")
error("Failed")
}
- offerFailedCnt += dOfferFailedCnt
+ trySendFailedCnt += dTrySendFailedCnt
receivedCnt += dReceivedCnt
undeliveredCnt += dUndeliveredCnt.get()
// clear for next run
dSendCnt = 0
dSendExceptionCnt = 0
- dOfferFailedCnt = 0
+ dTrySendFailedCnt = 0
dReceivedCnt = 0
dUndeliveredCnt.set(0)
}
// Stats
- println(" Send: $sendCnt")
- println(" Offer failed: $offerFailedCnt")
- println(" Received: $receivedCnt")
- println(" Undelivered: $undeliveredCnt")
- assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt)
+ println(" Send: $sendCnt")
+ println("trySend failed: $trySendFailedCnt")
+ println(" Received: $receivedCnt")
+ println(" Undelivered: $undeliveredCnt")
+ assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt)
}
private suspend fun sendOne(channel: Channel<Int>) {
@@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
try {
when (Random.nextInt(2)) {
0 -> channel.send(i)
- 1 -> if (!channel.offer(i)) {
- dOfferFailedCnt++
+ 1 -> if (!channel.trySend(i).isSuccess) {
+ dTrySendFailedCnt++
}
}
- } catch(e: Throwable) {
+ } catch (e: Throwable) {
assertTrue(e is CancellationException) // the only exception possible in this test
dSendExceptionCnt++
throw e
@@ -89,7 +89,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private suspend fun receiveOne(channel: Channel<Int>) {
val received = when (Random.nextInt(3)) {
0 -> channel.receive()
- 1 -> channel.receiveOrNull() ?: error("Cannot be closed yet")
+ 1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") }
2 -> select {
channel.onReceive { it }
}
@@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
dReceivedCnt++
lastReceived = received
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
deleted file mode 100644
index 256ef621..00000000
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelLFStressTest.kt
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.channels
-
-import kotlinx.atomicfu.*
-import kotlinx.coroutines.*
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.atomic.AtomicLongArray
-import kotlin.math.*
-import kotlin.test.*
-
-/**
- * Tests lock-freedom of send and receive operations on rendezvous and conflated channels.
- * There is a single channel with two sender and two receiver threads.
- * When one sender or receiver gets suspended at most one other operation is allowed to cease having progress
- * (`allowSuspendedThreads = 1`).
- *
- * **Note**: In the current implementation buffered channels are not lock-free, so this test would fail
- * if channel is created with a buffer.
- */
-class ChannelLFStressTest : TestBase() {
- private val nSeconds = 5 * stressTestMultiplier
- private val env = LockFreedomTestEnvironment("ChannelLFStressTest", allowSuspendedThreads = 1)
- private lateinit var channel: Channel<Long>
-
- private val sendIndex = AtomicLong()
- private val receiveCount = AtomicLong()
- private val duplicateCount = AtomicLong()
-
- private val nCheckedSize = 10_000_000
- private val nChecked = (nCheckedSize * Long.SIZE_BITS).toLong()
- private val receivedBits = AtomicLongArray(nCheckedSize) // bit set of received values
-
- @Test
- fun testRendezvousLockFreedom() {
- channel = Channel()
- performLockFreedomTest()
- // ensure that all sent were received
- checkAllReceived()
- }
-
- private fun performLockFreedomTest() {
- env.onCompletion {
- // We must cancel the channel to abort both senders & receivers
- channel.cancel(TestCompleted())
- }
- repeat(2) { env.testThread("sender-$it") { sender() } }
- repeat(2) { env.testThread("receiver-$it") { receiver() } }
- env.performTest(nSeconds) {
- println("Sent: $sendIndex, Received: $receiveCount, dups: $duplicateCount")
- }
- // ensure no duplicates
- assertEquals(0L, duplicateCount.get())
- }
-
- private fun checkAllReceived() {
- for (i in 0 until min(sendIndex.get(), nChecked)) {
- assertTrue(isReceived(i))
- }
- }
-
- private suspend fun sender() {
- val value = sendIndex.getAndIncrement()
- try {
- channel.send(value)
- } catch (e: TestCompleted) {
- check(env.isCompleted) // expected when test was completed
- markReceived(value) // fake received (actually failed to send)
- }
- }
-
- private suspend fun receiver() {
- val value = try {
- channel.receive()
- } catch (e: TestCompleted) {
- check(env.isCompleted) // expected when test was completed
- return
- }
- receiveCount.incrementAndGet()
- markReceived(value)
- }
-
- private fun markReceived(value: Long) {
- if (value >= nChecked) return // too big
- val index = (value / Long.SIZE_BITS).toInt()
- val mask = 1L shl (value % Long.SIZE_BITS).toInt()
- while (true) {
- val bits = receivedBits.get(index)
- if (bits and mask != 0L) {
- duplicateCount.incrementAndGet()
- break
- }
- if (receivedBits.compareAndSet(index, bits, bits or mask)) break
- }
- }
-
- private fun isReceived(value: Long): Boolean {
- val index = (value / Long.SIZE_BITS).toInt()
- val mask = 1L shl (value % Long.SIZE_BITS).toInt()
- val bits = receivedBits.get(index)
- return bits and mask != 0L
- }
-
- private class TestCompleted : CancellationException()
-}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt
index f414c333..a6345cc5 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -60,10 +60,10 @@ class ChannelSendReceiveStressTest(
launch(pool + CoroutineName("receiver$receiverIndex")) {
when (receiverIndex % 5) {
0 -> doReceive(receiverIndex)
- 1 -> doReceiveOrNull(receiverIndex)
+ 1 -> doReceiveCatching(receiverIndex)
2 -> doIterator(receiverIndex)
3 -> doReceiveSelect(receiverIndex)
- 4 -> doReceiveSelectOrNull(receiverIndex)
+ 4 -> doReceiveCatchingSelect(receiverIndex)
}
receiversCompleted.incrementAndGet()
}
@@ -152,9 +152,9 @@ class ChannelSendReceiveStressTest(
}
}
- private suspend fun doReceiveOrNull(receiverIndex: Int) {
+ private suspend fun doReceiveCatching(receiverIndex: Int) {
while (true) {
- doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break)
}
}
@@ -173,10 +173,10 @@ class ChannelSendReceiveStressTest(
}
}
- private suspend fun doReceiveSelectOrNull(receiverIndex: Int) {
+ private suspend fun doReceiveCatchingSelect(receiverIndex: Int) {
while (true) {
- val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break
+ val event = select<Int?> { channel.onReceiveCatching { it.getOrNull() } } ?: break
doReceived(receiverIndex, event)
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
index 1188329a..12334326 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T
try {
block()
} finally {
- if (!done.offer(true))
+ if (!done.trySend(true).isSuccess)
error(IllegalStateException("failed to offer to done channel"))
}
}
@@ -188,9 +188,9 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T
val receivedData = when (receiveMode) {
1 -> channel.receive()
2 -> select { channel.onReceive { it } }
- 3 -> channel.receiveOrNull() ?: error("Should not be closed")
- 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } }
- 5 -> channel.receiveOrClosed().value
+ 3 -> channel.receiveCatching().getOrElse { error("Should not be closed") }
+ 4 -> select { channel.onReceiveCatching { it.getOrElse { error("Should not be closed") } } }
+ 5 -> channel.receiveCatching().getOrThrow()
6 -> {
val iterator = channel.iterator()
check(iterator.hasNext()) { "Should not be closed" }
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
index da20f0c5..8512aebc 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
@@ -11,7 +11,7 @@ import kotlin.test.*
class ChannelsJvmTest : TestBase() {
@Test
- fun testBlocking() {
+ fun testTrySendBlocking() {
val ch = Channel<Int>()
val sum = GlobalScope.async {
var sum = 0
@@ -19,9 +19,36 @@ class ChannelsJvmTest : TestBase() {
sum
}
repeat(10) {
- ch.sendBlocking(it)
+ assertTrue(ch.trySendBlocking(it).isSuccess)
}
ch.close()
assertEquals(45, runBlocking { sum.await() })
}
+
+ @Test
+ fun testTrySendBlockingClosedChannel() {
+ run {
+ val channel = Channel<Unit>().also { it.close() }
+ channel.trySendBlocking(Unit)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertTrue(it is ClosedSendChannelException) }
+ .also { assertTrue { it.isClosed } }
+ }
+
+ run {
+ val channel = Channel<Unit>().also { it.close(TestException()) }
+ channel.trySendBlocking(Unit)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertTrue(it is TestException) }
+ .also { assertTrue { it.isClosed } }
+ }
+
+ run {
+ val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) }
+ channel.trySendBlocking(Unit)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertTrue(it is TestCancellationException) }
+ .also { assertTrue { it.isClosed } }
+ }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
index eb7be575..2b3c05bc 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
@@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
launch(Dispatchers.Default + CoroutineName("Sender$senderId")) {
repeat(nEvents) { i ->
if (i % nSenders == senderId) {
- broadcast.offer(i)
+ broadcast.trySend(i)
sentTotal.incrementAndGet()
yield()
}
@@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
try {
withTimeout(timeLimit) {
senders.forEach { it.join() }
- broadcast.offer(nEvents) // last event to signal receivers termination
+ broadcast.trySend(nEvents) // last event to signal receivers termination
receivers.forEach { it.join() }
}
} catch (e: CancellationException) {
@@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
cancel()
value
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
index 316b3785..793d7e44 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
@@ -1,15 +1,12 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
-import org.junit.After
-import org.junit.Test
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.atomic.AtomicReference
-import kotlin.coroutines.*
+import org.junit.*
+import java.util.concurrent.atomic.*
class ConflatedChannelCloseStressTest : TestBase() {
@@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() {
var x = senderId
try {
while (isActive) {
- try {
- curChannel.get().offer(x)
+ curChannel.get().trySend(x).onSuccess {
x += nSenders
sent.incrementAndGet()
- } catch (e: ClosedSendChannelException) {
- // ignore
}
}
} finally {
@@ -64,7 +58,9 @@ class ConflatedChannelCloseStressTest : TestBase() {
}
val receiver = async(pool + NonCancellable) {
while (isActive) {
- curChannel.get().receiveOrNull()
+ curChannel.get().receiveCatching().getOrElse {
+ it?.let { throw it }
+ }
received.incrementAndGet()
}
}
@@ -110,4 +106,4 @@ class ConflatedChannelCloseStressTest : TestBase() {
}
class StopException : Exception()
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
index 51789078..fbc28a18 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
delayChannel.cancel()
delay(5100)
- assertFailsWith<CancellationException> { delayChannel.poll() }
+ assertFailsWith<CancellationException> { delayChannel.tryReceive().getOrThrow() }
}
}
@@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
var sum = 0
var n = 0
whileSelect {
- this@averageInTimeWindow.onReceiveOrClosed {
+ this@averageInTimeWindow.onReceiveCatching {
if (it.isClosed) {
// Send leftovers and bail out
if (n != 0) send(sum / n.toDouble())
false
} else {
- sum += it.value
+ sum += it.getOrThrow()
++n
true
}
@@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
}
}
-fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
+fun ReceiveChannel<Unit>.checkEmpty() = assertNull(tryReceive().getOrNull())
fun ReceiveChannel<Unit>.checkNotEmpty() {
- assertNotNull(poll())
- assertNull(poll())
+ assertNotNull(tryReceive().getOrNull())
+ assertNull(tryReceive().getOrNull())
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
index f52f8b5b..2d8c0ebc 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.exceptions
@@ -38,13 +38,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
}
@Test
- fun testReceiveOrNullFromClosedChannel() = runTest {
- val channel = Channel<Int>()
- channel.close(RecoverableTestException())
- channelReceiveOrNull(channel)
- }
-
- @Test
fun testSendToClosedChannel() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
@@ -67,7 +60,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
}
private suspend fun channelReceive(channel: Channel<Int>) = channelOp { channel.receive() }
- private suspend fun channelReceiveOrNull(channel: Channel<Int>) = channelOp { channel.receiveOrNull() }
private suspend inline fun channelOp(block: () -> Unit) {
try {
@@ -145,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
deferred.await()
}
- // See https://github.com/Kotlin/kotlinx.coroutines/issues/950
- @Test
- fun testCancelledOffer() = runTest {
- expect(1)
- val job = Job()
- val actor = actor<Int>(job, Channel.UNLIMITED) {
- consumeEach {
- expectUnreached() // is cancelled before offer
- }
- }
- job.cancel()
- try {
- actor.offer(1)
- } catch (e: Exception) {
- verifyStackTrace("channels/${name.methodName}", e)
- finish(2)
- }
- }
-
private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) {
sendInChannel()
yield() // TCE
@@ -177,4 +150,4 @@ class StackTraceRecoveryChannelsTest : TestBase() {
private suspend fun Channel<Int>.sendFromScope() = coroutineScope {
sendWithContext(wrapperDispatcher(coroutineContext))
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt
index 70336659..dba738a8 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.exceptions
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*
@@ -71,4 +72,56 @@ class StackTraceRecoveryCustomExceptionsTest : TestBase() {
assertEquals("custom", cause.message)
}
}
+
+ class WrongMessageException(token: String) : RuntimeException("Token $token")
+
+ @Test
+ fun testWrongMessageException() = runTest {
+ val result = runCatching {
+ coroutineScope<Unit> {
+ throw WrongMessageException("OK")
+ }
+ }
+ val ex = result.exceptionOrNull() ?: error("Expected to fail")
+ assertTrue(ex is WrongMessageException)
+ assertEquals("Token OK", ex.message)
+ }
+
+ @Test
+ fun testWrongMessageExceptionInChannel() = runTest {
+ val result = produce<Unit>(SupervisorJob() + Dispatchers.Unconfined) {
+ throw WrongMessageException("OK")
+ }
+ val ex = runCatching {
+ @Suppress("ControlFlowWithEmptyBody")
+ for (unit in result) {
+ // Iterator has a special code path
+ }
+ }.exceptionOrNull() ?: error("Expected to fail")
+ assertTrue(ex is WrongMessageException)
+ assertEquals("Token OK", ex.message)
+ }
+
+ class CopyableWithCustomMessage(
+ message: String?,
+ cause: Throwable? = null
+ ) : RuntimeException(message, cause),
+ CopyableThrowable<CopyableWithCustomMessage> {
+
+ override fun createCopy(): CopyableWithCustomMessage {
+ return CopyableWithCustomMessage("Recovered: [$message]", cause)
+ }
+ }
+
+ @Test
+ fun testCustomCopyableMessage() = runTest {
+ val result = runCatching {
+ coroutineScope<Unit> {
+ throw CopyableWithCustomMessage("OK")
+ }
+ }
+ val ex = result.exceptionOrNull() ?: error("Expected to fail")
+ assertTrue(ex is CopyableWithCustomMessage)
+ assertEquals("Recovered: [OK]", ex.message)
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt
index bea18a43..a85bb7a2 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt
@@ -86,7 +86,6 @@ class StackTraceRecoveryNestedScopesTest : TestBase() {
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callWithTimeout\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:37)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callCoroutineScope\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:43)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:68)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest.verifyAwait(StackTraceRecoveryNestedScopesTest.kt:76)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:71)\n" +
"Caused by: kotlinx.coroutines.RecoverableTestException\n" +
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt
index 5073b7fd..02607c03 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt
@@ -58,7 +58,7 @@ class StackTraceRecoveryNestedTest : TestBase() {
try {
rootAsync.awaitRootLevel()
} catch (e: RecoverableTestException) {
- e.verifyException("await\$suspendImpl", "awaitRootLevel")
+ e.verifyException("awaitRootLevel")
finish(8)
}
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt
index 290420e4..0d7648c5 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt
@@ -45,9 +45,9 @@ class StackTraceRecoverySelectTest : TestBase() {
private suspend fun doSelectAwait(deferred: Deferred<Unit>): Int {
return select {
deferred.onAwait {
- yield() // Hide the stackstrace
+ yield() // Hide the frame
42
}
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
index dbbd77c4..0a8b6530 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
@@ -34,14 +34,13 @@ class StackTraceRecoveryTest : TestBase() {
val deferred = createDeferred(3)
val traces = listOf(
"java.util.concurrent.ExecutionException\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:49)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:44)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:17)\n",
"Caused by: java.util.concurrent.ExecutionException\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" +
"\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n"
)
nestedMethod(deferred, *traces.toTypedArray())
@@ -59,7 +58,6 @@ class StackTraceRecoveryTest : TestBase() {
"java.util.concurrent.ExecutionException\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:44)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:81)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:75)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:71)",
@@ -94,7 +92,6 @@ class StackTraceRecoveryTest : TestBase() {
"kotlinx.coroutines.RecoverableTestException\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testWithContext\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.outerMethod(StackTraceRecoveryTest.kt:150)\n" +
@@ -132,7 +129,6 @@ class StackTraceRecoveryTest : TestBase() {
"kotlinx.coroutines.RecoverableTestException\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCoroutineScope\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2\$1.invokeSuspend(StackTraceRecoveryTest.kt:193)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" +
@@ -228,13 +224,13 @@ class StackTraceRecoveryTest : TestBase() {
val e = exception
assertNotNull(e)
verifyStackTrace(e, "kotlinx.coroutines.RecoverableTestException\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" +
- "\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" +
- "Caused by: kotlinx.coroutines.RecoverableTestException")
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.access\$throws(StackTraceRecoveryTest.kt:20)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" +
+ "\t(Coroutine boundary)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" +
+ "Caused by: kotlinx.coroutines.RecoverableTestException")
}
private class Callback(val cont: CancellableContinuation<*>)
@@ -261,22 +257,8 @@ class StackTraceRecoveryTest : TestBase() {
private suspend fun awaitCallback(channel: Channel<Callback>) {
suspendCancellableCoroutine<Unit> { cont ->
- channel.offer(Callback(cont))
+ channel.trySend(Callback(cont))
}
yield() // nop to make sure it is not a tail call
}
-
- @Test
- fun testWrongMessageException() = runTest {
- val result = runCatching {
- coroutineScope<Unit> {
- throw WrongMessageException("OK")
- }
- }
- val ex = result.exceptionOrNull() ?: error("Expected to fail")
- assertTrue(ex is WrongMessageException)
- assertEquals("Token OK", ex.message)
- }
-
- public class WrongMessageException(token: String) : RuntimeException("Token $token")
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt
index 6034fccb..edce175d 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.exceptions
@@ -15,8 +15,8 @@ class SuppressionTests : TestBase() {
@Test
fun testNotificationsWithException() = runTest {
expect(1)
- val coroutineContext = kotlin.coroutines.coroutineContext // workaround for KT-22984
- val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
+ val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984
+ val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) {
override fun onStart() {
expect(3)
}
@@ -82,4 +82,4 @@ class SuppressionTests : TestBase() {
assertTrue(e.cause!!.suppressed.isEmpty())
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index e3db2626..f1be284c 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() {
fun testThrowingConsumer() = runTest {
var i = 0
val api = CallbackApi {
- runCatching { it.offer(++i) }
+ it.trySend(++i)
}
val flow = callbackFlow<Int> {
@@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() {
var i = 0
val api = CallbackApi {
if (i < 5) {
- it.offer(++i)
+ it.trySend(++i)
} else {
it.close(RuntimeException())
}
}
- val flow = callbackFlow<Int>() {
+ val flow = callbackFlow<Int> {
api.start(channel)
awaitClose {
api.stop()
diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt
new file mode 100644
index 00000000..98240fc9
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import org.junit.*
+
+/**
+ * Tests that shared flows keep strong reference to their source flows.
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557
+ */
+@OptIn(DelicateCoroutinesApi::class)
+class SharingReferenceTest : TestBase() {
+ private val token = object {}
+
+ /*
+ * Single-threaded executor that we are using to ensure that the flow being sharing actually
+ * suspended (spilled its locals, attached to parent), so we can verify reachability.
+ * Without that, it's possible to have a situation where target flow is still
+ * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails.
+ */
+ @get:Rule
+ val executor = ExecutorRule(1)
+
+ private val weakEmitter = flow {
+ emit(null)
+ // suspend forever without keeping a strong reference to continuation -- this is a model of
+ // a callback API that does not keep a strong reference it is listeners, but works
+ suspendCancellableCoroutine<Unit> { }
+ // using the token here to make it easily traceable
+ emit(token)
+ }
+
+ @Test
+ fun testShareInReference() {
+ val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ @Test
+ fun testStateInReference() {
+ val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ @Test
+ fun testStateInSuspendingReference() = runTest {
+ val flow = weakEmitter.stateIn(GlobalScope)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ private fun linearize() {
+ runBlocking(executor) { }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
index dc3cd43c..e55eaad1 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
@@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() {
for (second in 1..nSeconds) {
delay(1000)
val cs = collected.map { it.sum() }
- println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
+ println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
}
emitters.cancelAndJoin()
collectors.cancelAndJoin()
@@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() {
@Test
fun testTenEmittersAndCollectors() = stress(10, 10)
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt
new file mode 100644
index 00000000..660ed0aa
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.*
+import kotlin.test.*
+import kotlin.test.Test
+
+class StateFlowUpdateStressTest : TestBase() {
+ private val iterations = 1_000_000 * stressTestMultiplier
+
+ @get:Rule
+ public val executor = ExecutorRule(2)
+
+ @Test
+ fun testUpdate() = doTest { update { it + 1 } }
+
+ @Test
+ fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } }
+
+ @Test
+ fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } }
+
+ private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest {
+ val flow = MutableStateFlow(0)
+ val j1 = launch(Dispatchers.Default) {
+ repeat(iterations / 2) {
+ flow.increment()
+ }
+ }
+
+ val j2 = launch(Dispatchers.Default) {
+ repeat(iterations / 2) {
+ flow.increment()
+ }
+ }
+
+ joinAll(j1, j2)
+ assertEquals(iterations, flow.value)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt
index 15e12614..529f8817 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt
@@ -2,16 +2,15 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleBasic01
import kotlinx.coroutines.*
-fun main() {
- GlobalScope.launch { // launch a new coroutine in background and continue
+fun main() = runBlocking { // this: CoroutineScope
+ launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
- println("Hello,") // main thread continues while coroutine is delayed
- Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
+ println("Hello") // main coroutine continues while a previous one is delayed
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt
index 4f178ca6..6bf2af4c 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt
@@ -2,18 +2,18 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleBasic02
import kotlinx.coroutines.*
-fun main() {
- GlobalScope.launch { // launch a new coroutine in background and continue
- delay(1000L)
- println("World!")
- }
- println("Hello,") // main thread continues here immediately
- runBlocking { // but this expression blocks the main thread
- delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
- }
+fun main() = runBlocking { // this: CoroutineScope
+ launch { doWorld() }
+ println("Hello")
+}
+
+// this is your first suspending function
+suspend fun doWorld() {
+ delay(1000L)
+ println("World!")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
index f80113c5..67b6894a 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
@@ -2,16 +2,19 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleBasic03
import kotlinx.coroutines.*
-fun main() = runBlocking<Unit> { // start main coroutine
- GlobalScope.launch { // launch a new coroutine in background and continue
+fun main() = runBlocking {
+ doWorld()
+}
+
+suspend fun doWorld() = coroutineScope { // this: CoroutineScope
+ launch {
delay(1000L)
println("World!")
}
- println("Hello,") // main coroutine continues here immediately
- delay(2000L) // delaying for 2 seconds to keep JVM alive
+ println("Hello")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt
index 33c928a0..efac7085 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt
@@ -2,16 +2,26 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleBasic04
import kotlinx.coroutines.*
+// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
- val job = GlobalScope.launch { // launch a new coroutine and keep a reference to its Job
+ doWorld()
+ println("Done")
+}
+
+// Concurrently executes both sections
+suspend fun doWorld() = coroutineScope { // this: CoroutineScope
+ launch {
+ delay(2000L)
+ println("World 2")
+ }
+ launch {
delay(1000L)
- println("World!")
+ println("World 1")
}
- println("Hello,")
- job.join() // wait until child coroutine completes
+ println("Hello")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt
index 52b490d2..193f2cc3 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt
@@ -2,15 +2,17 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleBasic05
import kotlinx.coroutines.*
-fun main() = runBlocking { // this: CoroutineScope
- launch { // launch a new coroutine in the scope of runBlocking
+fun main() = runBlocking {
+ val job = launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
- println("Hello,")
+ println("Hello")
+ job.join() // wait until child coroutine completes
+ println("Done")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt
index a1b5bc5f..24b890a0 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt
@@ -2,26 +2,16 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleBasic06
import kotlinx.coroutines.*
-fun main() = runBlocking { // this: CoroutineScope
- launch {
- delay(200L)
- println("Task from runBlocking")
- }
-
- coroutineScope { // Creates a coroutine scope
+fun main() = runBlocking {
+ repeat(100_000) { // launch a lot of coroutines
launch {
- delay(500L)
- println("Task from nested launch")
+ delay(5000L)
+ print(".")
}
-
- delay(100L)
- println("Task from coroutine scope") // This line will be printed before the nested launch
}
-
- println("Coroutine scope is over") // This line is not printed until the nested launch completes
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
deleted file mode 100644
index 32c02b86..00000000
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
-package kotlinx.coroutines.guide.exampleBasic07
-
-import kotlinx.coroutines.*
-
-fun main() = runBlocking {
- launch { doWorld() }
- println("Hello,")
-}
-
-// this is your first suspending function
-suspend fun doWorld() {
- delay(1000L)
- println("World!")
-}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt
deleted file mode 100644
index bb7786f2..00000000
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
-package kotlinx.coroutines.guide.exampleBasic08
-
-import kotlinx.coroutines.*
-
-fun main() = runBlocking {
- repeat(100_000) { // launch a lot of coroutines
- launch {
- delay(5000L)
- print(".")
- }
- }
-}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt
deleted file mode 100644
index 9f998b52..00000000
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
-package kotlinx.coroutines.guide.exampleBasic09
-
-import kotlinx.coroutines.*
-
-fun main() = runBlocking {
- GlobalScope.launch {
- repeat(1000) { i ->
- println("I'm sleeping $i ...")
- delay(500L)
- }
- }
- delay(1300L) // just quit after delay
-}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
index 312dc72b..35536a7d 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
@@ -23,10 +23,12 @@ fun main() {
println("Completed in $time ms")
}
+@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne()
}
+@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
doSomethingUsefulTwo()
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
index e23eaf25..c6ad4516 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
@@ -10,9 +10,9 @@ import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
- // it spawns two other jobs, one with GlobalScope
- GlobalScope.launch {
- println("job1: I run in GlobalScope and execute independently!")
+ // it spawns two other jobs
+ launch(Job()) {
+ println("job1: I run in my own Job and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt
index e08ddd08..24cbabe0 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions01
import kotlinx.coroutines.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val job = GlobalScope.launch { // root coroutine with launch
println("Throwing exception from launch")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
index 67fdaa71..c3ab68a5 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions02
import kotlinx.coroutines.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
index 9c9b43d2..b966c1ea 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions04
import kotlinx.coroutines.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt
index 04f9385f..5f1f3d89 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt
@@ -10,6 +10,7 @@ import kotlinx.coroutines.exceptions.*
import kotlinx.coroutines.*
import java.io.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
index 5a5b276b..bc9f77b9 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
@@ -8,6 +8,7 @@ package kotlinx.coroutines.guide.exampleExceptions06
import kotlinx.coroutines.*
import java.io.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt
index 57fe6382..22380d3a 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt
@@ -11,17 +11,21 @@ import kotlinx.coroutines.selects.*
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
- a.onReceiveOrNull { value ->
- if (value == null)
- "Channel 'a' is closed"
- else
+ a.onReceiveCatching { it ->
+ val value = it.getOrNull()
+ if (value != null) {
"a -> '$value'"
+ } else {
+ "Channel 'a' is closed"
+ }
}
- b.onReceiveOrNull { value ->
- if (value == null)
- "Channel 'b' is closed"
- else
+ b.onReceiveCatching { it ->
+ val value = it.getOrNull()
+ if (value != null) {
"b -> '$value'"
+ } else {
+ "Channel 'b' is closed"
+ }
}
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt
index 464e9b20..68b44564 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt
@@ -13,12 +13,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) =
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
- input.onReceiveOrNull { update ->
- update // replaces next value to wait
+ input.onReceiveCatching { update ->
+ update.getOrNull()
}
- current.onAwait { value ->
+ current.onAwait { value ->
send(value) // send value that current deferred has produced
- input.receiveOrNull() // and use the next deferred from the input channel
+ input.receiveCatching().getOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt
index ea5003b0..7e54fb1d 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt
@@ -2,7 +2,7 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-// This file was automatically generated from basics.md by Knit tool. Do not edit.
+// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.test
import kotlinx.coroutines.knit.*
@@ -12,7 +12,7 @@ class BasicsGuideTest {
@Test
fun testExampleBasic01() {
test("ExampleBasic01") { kotlinx.coroutines.guide.exampleBasic01.main() }.verifyLines(
- "Hello,",
+ "Hello",
"World!"
)
}
@@ -20,7 +20,7 @@ class BasicsGuideTest {
@Test
fun testExampleBasic02() {
test("ExampleBasic02") { kotlinx.coroutines.guide.exampleBasic02.main() }.verifyLines(
- "Hello,",
+ "Hello",
"World!"
)
}
@@ -28,7 +28,7 @@ class BasicsGuideTest {
@Test
fun testExampleBasic03() {
test("ExampleBasic03") { kotlinx.coroutines.guide.exampleBasic03.main() }.verifyLines(
- "Hello,",
+ "Hello",
"World!"
)
}
@@ -36,50 +36,26 @@ class BasicsGuideTest {
@Test
fun testExampleBasic04() {
test("ExampleBasic04") { kotlinx.coroutines.guide.exampleBasic04.main() }.verifyLines(
- "Hello,",
- "World!"
+ "Hello",
+ "World 1",
+ "World 2",
+ "Done"
)
}
@Test
fun testExampleBasic05() {
test("ExampleBasic05") { kotlinx.coroutines.guide.exampleBasic05.main() }.verifyLines(
- "Hello,",
- "World!"
+ "Hello",
+ "World!",
+ "Done"
)
}
@Test
fun testExampleBasic06() {
- test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.verifyLines(
- "Task from coroutine scope",
- "Task from runBlocking",
- "Task from nested launch",
- "Coroutine scope is over"
- )
- }
-
- @Test
- fun testExampleBasic07() {
- test("ExampleBasic07") { kotlinx.coroutines.guide.exampleBasic07.main() }.verifyLines(
- "Hello,",
- "World!"
- )
- }
-
- @Test
- fun testExampleBasic08() {
- test("ExampleBasic08") { kotlinx.coroutines.guide.exampleBasic08.main() }.also { lines ->
+ test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.also { lines ->
check(lines.size == 1 && lines[0] == ".".repeat(100_000))
}
}
-
- @Test
- fun testExampleBasic09() {
- test("ExampleBasic09") { kotlinx.coroutines.guide.exampleBasic09.main() }.verifyLines(
- "I'm sleeping 0 ...",
- "I'm sleeping 1 ...",
- "I'm sleeping 2 ..."
- )
- }
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt
index d6f1c21d..1a84fb94 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt
@@ -57,7 +57,7 @@ class DispatcherGuideTest {
@Test
fun testExampleContext06() {
test("ExampleContext06") { kotlinx.coroutines.guide.exampleContext06.main() }.verifyLines(
- "job1: I run in GlobalScope and execute independently!",
+ "job1: I run in my own Job and execute independently!",
"job2: I am a child of the request coroutine",
"job1: I am not affected by cancellation of the request",
"main: Who has survived request cancellation?"
diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt
deleted file mode 100644
index 225b8481..00000000
--- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.internal
-
-import kotlinx.atomicfu.LockFreedomTestEnvironment
-import kotlinx.coroutines.stressTestMultiplier
-import org.junit.Test
-import java.util.*
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.atomic.AtomicReference
-import kotlin.test.*
-
-/**
- * This stress test has 4 threads adding randomly to the list and them immediately undoing
- * this addition by remove, and 4 threads trying to remove nodes from two lists simultaneously (atomically).
- */
-class LockFreeLinkedListAtomicLFStressTest {
- private val env = LockFreedomTestEnvironment("LockFreeLinkedListAtomicLFStressTest")
-
- private data class Node(val i: Long) : LockFreeLinkedListNode()
-
- private val nSeconds = 5 * stressTestMultiplier
-
- private val nLists = 4
- private val nAdderThreads = 4
- private val nRemoverThreads = 4
-
- private val lists = Array(nLists) { LockFreeLinkedListHead() }
-
- private val undone = AtomicLong()
- private val missed = AtomicLong()
- private val removed = AtomicLong()
- private val error = AtomicReference<Throwable>()
- private val index = AtomicLong()
-
- @Test
- fun testStress() {
- repeat(nAdderThreads) { threadId ->
- val rnd = Random()
- env.testThread(name = "adder-$threadId") {
- when (rnd.nextInt(4)) {
- 0 -> {
- val list = lists[rnd.nextInt(nLists)]
- val node = Node(index.incrementAndGet())
- addLastOp(list, node)
- randomSpinWaitIntermission()
- tryRemoveOp(node)
- }
- 1 -> {
- // just to test conditional add
- val list = lists[rnd.nextInt(nLists)]
- val node = Node(index.incrementAndGet())
- addLastIfTrueOp(list, node)
- randomSpinWaitIntermission()
- tryRemoveOp(node)
- }
- 2 -> {
- // just to test failed conditional add and burn some time
- val list = lists[rnd.nextInt(nLists)]
- val node = Node(index.incrementAndGet())
- addLastIfFalseOp(list, node)
- }
- 3 -> {
- // add two atomically
- val idx1 = rnd.nextInt(nLists - 1)
- val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
- check(idx1 < idx2) // that is our global order
- val list1 = lists[idx1]
- val list2 = lists[idx2]
- val node1 = Node(index.incrementAndGet())
- val node2 = Node(index.incrementAndGet())
- addTwoOp(list1, node1, list2, node2)
- randomSpinWaitIntermission()
- tryRemoveOp(node1)
- randomSpinWaitIntermission()
- tryRemoveOp(node2)
- }
- else -> error("Cannot happen")
- }
- }
- }
- repeat(nRemoverThreads) { threadId ->
- val rnd = Random()
- env.testThread(name = "remover-$threadId") {
- val idx1 = rnd.nextInt(nLists - 1)
- val idx2 = idx1 + 1 + rnd.nextInt(nLists - idx1 - 1)
- check(idx1 < idx2) // that is our global order
- val list1 = lists[idx1]
- val list2 = lists[idx2]
- removeTwoOp(list1, list2)
- }
- }
- env.performTest(nSeconds) {
- val undone = undone.get()
- val missed = missed.get()
- val removed = removed.get()
- println(" Adders undone $undone node additions")
- println(" Adders missed $missed nodes")
- println("Remover removed $removed nodes")
- }
- error.get()?.let { throw it }
- assertEquals(missed.get(), removed.get())
- assertTrue(undone.get() > 0)
- assertTrue(missed.get() > 0)
- lists.forEach { it.validate() }
- }
-
- private fun addLastOp(list: LockFreeLinkedListHead, node: Node) {
- list.addLast(node)
- }
-
- private fun addLastIfTrueOp(list: LockFreeLinkedListHead, node: Node) {
- assertTrue(list.addLastIf(node) { true })
- }
-
- private fun addLastIfFalseOp(list: LockFreeLinkedListHead, node: Node) {
- assertFalse(list.addLastIf(node) { false })
- }
-
- private fun addTwoOp(list1: LockFreeLinkedListHead, node1: Node, list2: LockFreeLinkedListHead, node2: Node) {
- val add1 = list1.describeAddLast(node1)
- val add2 = list2.describeAddLast(node2)
- val op = object : AtomicOp<Any?>() {
- init {
- add1.atomicOp = this
- add2.atomicOp = this
- }
- override fun prepare(affected: Any?): Any? =
- add1.prepare(this) ?:
- add2.prepare(this)
-
- override fun complete(affected: Any?, failure: Any?) {
- add1.complete(this, failure)
- add2.complete(this, failure)
- }
- }
- assertTrue(op.perform(null) == null)
- }
-
- private fun tryRemoveOp(node: Node) {
- if (node.remove())
- undone.incrementAndGet()
- else
- missed.incrementAndGet()
- }
-
- private fun removeTwoOp(list1: LockFreeLinkedListHead, list2: LockFreeLinkedListHead) {
- val remove1 = list1.describeRemoveFirst()
- val remove2 = list2.describeRemoveFirst()
- val op = object : AtomicOp<Any?>() {
- init {
- remove1.atomicOp = this
- remove2.atomicOp = this
- }
- override fun prepare(affected: Any?): Any? =
- remove1.prepare(this) ?:
- remove2.prepare(this)
-
- override fun complete(affected: Any?, failure: Any?) {
- remove1.complete(this, failure)
- remove2.complete(this, failure)
- }
- }
- val success = op.perform(null) == null
- if (success) removed.addAndGet(2)
- }
-} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt
new file mode 100644
index 00000000..30fbfee2
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly
+
+import java.util.concurrent.*
+import java.util.concurrent.atomic.*
+import kotlin.coroutines.*
+
+internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name)
+
+internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher =
+ ClosedAfterGuideTestDispatcher(nThreads, name)
+
+internal class PoolThread(
+ @JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests
+ target: Runnable, name: String
+) : Thread(target, name) {
+ init {
+ isDaemon = true
+ }
+}
+
+private class ClosedAfterGuideTestDispatcher(
+ private val nThreads: Int,
+ private val name: String
+) : ExecutorCoroutineDispatcher() {
+ private val threadNo = AtomicInteger()
+
+ override val executor: Executor =
+ Executors.newScheduledThreadPool(nThreads, object : ThreadFactory {
+ override fun newThread(target: java.lang.Runnable): Thread {
+ return PoolThread(
+ this@ClosedAfterGuideTestDispatcher,
+ target,
+ if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()
+ )
+ }
+ })
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ executor.execute(wrapTask(block))
+ }
+
+ override fun close() {
+ (executor as ExecutorService).shutdown()
+ }
+
+ override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
+}
diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
index 7eda9043..2e61ec6b 100644
--- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
+++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.knit
@@ -11,8 +11,6 @@ import kotlinx.knit.test.*
import java.util.concurrent.*
import kotlin.test.*
-fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block)
-
// helper function to dump exception to stdout for ease of debugging failed tests
private inline fun <T> outputException(name: String, block: () -> T): T =
try { block() }
@@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) {
}
throw t
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
index 8836fdc7..74cc1783 100644
--- a/kotlinx-coroutines-core/jvm/test/linearizability/ChannelsLCStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
@@ -3,7 +3,7 @@
*/
@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
+package kotlinx.coroutines.lincheck
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
@@ -11,37 +11,37 @@ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.selects.*
+import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.annotations.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.paramgen.*
import org.jetbrains.kotlinx.lincheck.verifier.*
-import org.junit.*
-class RendezvousChannelLCStressTest : ChannelLCStressTestBase(
+class RendezvousChannelLincheckTest : ChannelLincheckTestBase(
c = Channel(RENDEZVOUS),
sequentialSpecification = SequentialRendezvousChannel::class.java
)
class SequentialRendezvousChannel : SequentialIntChannelBase(RENDEZVOUS)
-class Array1ChannelLCStressTest : ChannelLCStressTestBase(
+class Array1ChannelLincheckTest : ChannelLincheckTestBase(
c = Channel(1),
sequentialSpecification = SequentialArray1RendezvousChannel::class.java
)
class SequentialArray1RendezvousChannel : SequentialIntChannelBase(1)
-class Array2ChannelLCStressTest : ChannelLCStressTestBase(
+class Array2ChannelLincheckTest : ChannelLincheckTestBase(
c = Channel(2),
sequentialSpecification = SequentialArray2RendezvousChannel::class.java
)
class SequentialArray2RendezvousChannel : SequentialIntChannelBase(2)
-class UnlimitedChannelLCStressTest : ChannelLCStressTestBase(
+class UnlimitedChannelLincheckTest : ChannelLincheckTestBase(
c = Channel(UNLIMITED),
sequentialSpecification = SequentialUnlimitedChannel::class.java
)
class SequentialUnlimitedChannel : SequentialIntChannelBase(UNLIMITED)
-class ConflatedChannelLCStressTest : ChannelLCStressTestBase(
+class ConflatedChannelLincheckTest : ChannelLincheckTestBase(
c = Channel(CONFLATED),
sequentialSpecification = SequentialConflatedChannel::class.java
)
@@ -51,8 +51,11 @@ class SequentialConflatedChannel : SequentialIntChannelBase(CONFLATED)
Param(name = "value", gen = IntGen::class, conf = "1:5"),
Param(name = "closeToken", gen = IntGen::class, conf = "1:3")
)
-abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val sequentialSpecification: Class<*>) {
- @Operation
+abstract class ChannelLincheckTestBase(
+ private val c: Channel<Int>,
+ private val sequentialSpecification: Class<*>
+) : AbstractLincheckTest() {
+ @Operation(promptCancellation = true)
suspend fun send(@Param(name = "value") value: Int): Any = try {
c.send(value)
} catch (e: NumberedCancellationException) {
@@ -60,11 +63,12 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
}
@Operation
- fun offer(@Param(name = "value") value: Int): Any = try {
- c.offer(value)
- } catch (e: NumberedCancellationException) {
- e.testResult
- }
+ fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value)
+ .onSuccess { return true }
+ .onFailure {
+ return if (it is NumberedCancellationException) it.testResult
+ else false
+ }
// TODO: this operation should be (and can be!) linearizable, but is not
// @Operation
@@ -74,7 +78,7 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
e.testResult
}
- @Operation
+ @Operation(promptCancellation = true)
suspend fun receive(): Any = try {
c.receive()
} catch (e: NumberedCancellationException) {
@@ -82,11 +86,10 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
}
@Operation
- fun poll(): Any? = try {
- c.poll()
- } catch (e: NumberedCancellationException) {
- e.testResult
- }
+ fun tryReceive(): Any? =
+ c.tryReceive()
+ .onSuccess { return it }
+ .onFailure { return if (it is NumberedCancellationException) it.testResult else null }
// TODO: this operation should be (and can be!) linearizable, but is not
// @Operation
@@ -96,7 +99,7 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
e.testResult
}
- @Operation
+ @Operation(causesBlocking = true)
fun close(@Param(name = "closeToken") token: Int): Boolean = c.close(NumberedCancellationException(token))
// TODO: this operation should be (and can be!) linearizable, but is not
@@ -113,11 +116,8 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
// @Operation
fun isEmpty() = c.isEmpty
- @Test
- fun test() = LCStressOptionsDefault()
- .actorsBefore(0)
- .sequentialSpecification(sequentialSpecification)
- .check(this::class)
+ override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O =
+ actorsBefore(0).sequentialSpecification(sequentialSpecification)
}
private class NumberedCancellationException(number: Int) : CancellationException() {
@@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
private val buffer = ArrayList<Int>()
private var closedMessage: String? = null
- suspend fun send(x: Int): Any = when (val offerRes = offer(x)) {
+ suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) {
true -> Unit
false -> suspendCancellableCoroutine { cont ->
senders.add(cont to x)
@@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
else -> offerRes
}
- fun offer(element: Int): Any {
+ fun trySend(element: Int): Any {
if (closedMessage !== null) return closedMessage!!
if (capacity == CONFLATED) {
if (resumeFirstReceiver(element)) return true
@@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
return false
}
- suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont ->
+ suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont ->
receivers.add(cont)
}
- fun poll(): Any? {
+ fun tryReceive(): Any? {
if (buffer.isNotEmpty()) {
val el = buffer.removeAt(0)
resumeFirstSender().also {
@@ -221,4 +221,4 @@ private fun <T> CancellableContinuation<T>.resume(res: T): Boolean {
val token = tryResume(res) ?: return false
completeResume(token)
return true
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt
index 5f91c640..4f1bb6ad 100644
--- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeListLCStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt
@@ -3,18 +3,17 @@
*/
@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
+package kotlinx.coroutines.lincheck
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import org.jetbrains.kotlinx.lincheck.annotations.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.paramgen.*
-import org.jetbrains.kotlinx.lincheck.verifier.*
-import kotlin.test.*
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
@Param(name = "value", gen = IntGen::class, conf = "1:5")
-class LockFreeListLCStressTest : VerifierState() {
+class LockFreeListLincheckTest : AbstractLincheckTest() {
class Node(val value: Int): LockFreeLinkedListNode()
private val q: LockFreeLinkedListHead = LockFreeLinkedListHead()
@@ -43,12 +42,12 @@ class LockFreeListLCStressTest : VerifierState() {
private fun Any.isSame(value: Int) = this is Node && this.value == value
- @Test
- fun testAddRemoveLinearizability() = LCStressOptionsDefault().check(this::class)
-
override fun extractState(): Any {
val elements = ArrayList<Int>()
q.forEach<Node> { elements.add(it.value) }
return elements
}
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeTaskQueueLincheckTest.kt
index de494cc1..2a9164e1 100644
--- a/kotlinx-coroutines-core/jvm/test/linearizability/LockFreeTaskQueueLCStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeTaskQueueLincheckTest.kt
@@ -3,19 +3,21 @@
*/
@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
+package kotlinx.coroutines.lincheck
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
+import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.annotations.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.paramgen.*
-import org.jetbrains.kotlinx.lincheck.verifier.*
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
import org.jetbrains.kotlinx.lincheck.verifier.quiescent.*
-import kotlin.test.*
@Param(name = "value", gen = IntGen::class, conf = "1:3")
-internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLCStressTest protected constructor(singleConsumer: Boolean) : VerifierState() {
+internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLincheckTest(
+ val singleConsumer: Boolean
+) : AbstractLincheckTest() {
@JvmField
protected val q = LockFreeTaskQueue<Int>(singleConsumer = singleConsumer)
@@ -25,20 +27,24 @@ internal abstract class AbstractLockFreeTaskQueueWithoutRemoveLCStressTest prote
@Operation
fun addLast(@Param(name = "value") value: Int) = q.addLast(value)
- @QuiescentConsistent
- @Operation(group = "consumer")
- fun removeFirstOrNull() = q.removeFirstOrNull()
+ override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O =
+ verifier(QuiescentConsistencyVerifier::class.java)
override fun extractState() = q.map { it } to q.isClosed()
- @Test
- fun testWithRemoveForQuiescentConsistency() = LCStressOptionsDefault()
- .verifier(QuiescentConsistencyVerifier::class.java)
- .check(this::class)
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
}
-@OpGroupConfig(name = "consumer", nonParallel = false)
-internal class MCLockFreeTaskQueueWithRemoveLCStressTest : AbstractLockFreeTaskQueueWithoutRemoveLCStressTest(singleConsumer = false)
+internal class MCLockFreeTaskQueueWithRemoveLincheckTest : AbstractLockFreeTaskQueueWithoutRemoveLincheckTest(singleConsumer = false) {
+ @QuiescentConsistent
+ @Operation(blocking = true)
+ fun removeFirstOrNull() = q.removeFirstOrNull()
+}
@OpGroupConfig(name = "consumer", nonParallel = true)
-internal class SCLockFreeTaskQueueWithRemoveLCStressTest : AbstractLockFreeTaskQueueWithoutRemoveLCStressTest(singleConsumer = true) \ No newline at end of file
+internal class SCLockFreeTaskQueueWithRemoveLincheckTest : AbstractLockFreeTaskQueueWithoutRemoveLincheckTest(singleConsumer = true) {
+ @QuiescentConsistent
+ @Operation(group = "consumer")
+ fun removeFirstOrNull() = q.removeFirstOrNull()
+} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt
new file mode 100644
index 00000000..a278985f
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("unused")
+package kotlinx.coroutines.lincheck
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.sync.*
+import org.jetbrains.kotlinx.lincheck.*
+import org.jetbrains.kotlinx.lincheck.annotations.Operation
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+
+class MutexLincheckTest : AbstractLincheckTest() {
+ private val mutex = Mutex()
+
+ @Operation
+ fun tryLock() = mutex.tryLock()
+
+ @Operation(promptCancellation = true)
+ suspend fun lock() = mutex.lock()
+
+ @Operation(handleExceptionsAsResult = [IllegalStateException::class])
+ fun unlock() = mutex.unlock()
+
+ override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O =
+ actorsBefore(0)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+
+ override fun extractState() = mutex.isLocked
+}
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentListRemoveLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentListRemoveLincheckTest.kt
index 5daed998..5a8d7b47 100644
--- a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentListRemoveLCStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentListRemoveLincheckTest.kt
@@ -4,18 +4,16 @@
@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
+package kotlinx.coroutines.lincheck
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
+import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.annotations.*
-import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.paramgen.*
-import org.jetbrains.kotlinx.lincheck.verifier.*
-import org.junit.*
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
-
-class SegmentListRemoveLCStressTest : VerifierState() {
+class SegmentListRemoveLincheckTest : AbstractLincheckTest() {
private val q = SegmentBasedQueue<Int>()
private val segments: Array<OneElementSegment<Int>>
@@ -29,6 +27,9 @@ class SegmentListRemoveLCStressTest : VerifierState() {
segments[index].removeSegment()
}
+ override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O = this
+ .actorsBefore(0).actorsAfter(0)
+
override fun extractState() = segments.map { it.logicallyRemoved }
@Validate
@@ -37,9 +38,6 @@ class SegmentListRemoveLCStressTest : VerifierState() {
q.checkAllSegmentsAreNotLogicallyRemoved()
}
- @Test
- fun test() = LCStressOptionsDefault()
- .actorsBefore(0)
- .actorsAfter(0)
- .check(this::class)
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueLincheckTest.kt
index 89bf8dfa..76a59e39 100644
--- a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueLCStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueLincheckTest.kt
@@ -3,18 +3,17 @@
*/
@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
+package kotlinx.coroutines.lincheck
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.SegmentBasedQueue
import org.jetbrains.kotlinx.lincheck.annotations.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.paramgen.*
-import org.jetbrains.kotlinx.lincheck.verifier.*
-import org.junit.*
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
@Param(name = "value", gen = IntGen::class, conf = "1:5")
-class SegmentQueueLCStressTest : VerifierState() {
+class SegmentQueueLincheckTest : AbstractLincheckTest() {
private val q = SegmentBasedQueue<Int>()
@Operation
@@ -40,6 +39,6 @@ class SegmentQueueLCStressTest : VerifierState() {
return elements to closed
}
- @Test
- fun test() = LCStressOptionsDefault().check(this::class)
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
new file mode 100644
index 00000000..2b471d7f
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("unused")
+package kotlinx.coroutines.lincheck
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.sync.*
+import org.jetbrains.kotlinx.lincheck.*
+import org.jetbrains.kotlinx.lincheck.annotations.Operation
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+
+abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest() {
+ private val semaphore = Semaphore(permits)
+
+ @Operation
+ fun tryAcquire() = semaphore.tryAcquire()
+
+ @Operation(promptCancellation = true, allowExtraSuspension = true)
+ suspend fun acquire() = semaphore.acquire()
+
+ @Operation(handleExceptionsAsResult = [IllegalStateException::class])
+ fun release() = semaphore.release()
+
+ override fun <O : Options<O, *>> O.customize(isStressTest: Boolean): O =
+ actorsBefore(0)
+
+ override fun extractState() = semaphore.availablePermits
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+
+class Semaphore1LincheckTest : SemaphoreLincheckTestBase(1)
+class Semaphore2LincheckTest : SemaphoreLincheckTestBase(2) \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt
deleted file mode 100644
index 9542b5d8..00000000
--- a/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.sync.*
-import org.jetbrains.kotlinx.lincheck.annotations.Operation
-import org.jetbrains.kotlinx.lincheck.verifier.*
-import org.junit.*
-
-class MutexLCStressTest : VerifierState() {
- private val mutex = Mutex()
-
- @Operation
- fun tryLock() = mutex.tryLock()
-
- @Operation
- suspend fun lock() = mutex.lock()
-
- @Operation(handleExceptionsAsResult = [IllegalStateException::class])
- fun unlock() = mutex.unlock()
-
- @Test
- fun test() = LCStressOptionsDefault()
- .actorsBefore(0)
- .check(this::class)
-
- override fun extractState() = mutex.isLocked
-} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt
deleted file mode 100644
index 52902f49..00000000
--- a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-@file:Suppress("unused")
-package kotlinx.coroutines.linearizability
-
-import kotlinx.coroutines.*
-import kotlinx.coroutines.sync.*
-import org.jetbrains.kotlinx.lincheck.annotations.Operation
-import org.jetbrains.kotlinx.lincheck.verifier.*
-import org.junit.*
-
-abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() {
- private val semaphore = Semaphore(permits)
-
- @Operation
- fun tryAcquire() = semaphore.tryAcquire()
-
- @Operation
- suspend fun acquire() = semaphore.acquire()
-
- @Operation(handleExceptionsAsResult = [IllegalStateException::class])
- fun release() = semaphore.release()
-
- @Test
- fun test() = LCStressOptionsDefault()
- .actorsBefore(0)
- .check(this::class)
-
- override fun extractState() = semaphore.availablePermits
-}
-
-class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1)
-class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2) \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt
index 1fe0d838..3a55f8c4 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherMixedStealingStressTest.kt
@@ -77,4 +77,4 @@ class BlockingCoroutineDispatcherMixedStealingStressTest : SchedulerTestBase() {
cpuBlocker.await()
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt
index f31752c8..fe09440f 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt
@@ -101,7 +101,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
firstBarrier.await()
secondBarrier.await()
blockingTasks.joinAll()
- checkPoolThreadsCreated(21..22)
+ checkPoolThreadsCreated(21 /* blocking tasks + 1 for CPU */..20 + CORES_COUNT)
}
@Test
@@ -122,7 +122,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
barrier.await()
blockingTasks.joinAll()
// There may be race when multiple CPU threads are trying to lazily created one more
- checkPoolThreadsCreated(104..120)
+ checkPoolThreadsCreated(101..100 + CORES_COUNT)
}
@Test
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
index bfabf5b2..dd969bdd 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
@@ -39,17 +39,9 @@ abstract class SchedulerTestBase : TestBase() {
)
}
- /**
- * Asserts that any number of pool worker threads in [range] exists at the time of method invocation
- */
- fun checkPoolThreadsExist(range: IntRange) {
- val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count()
- assertTrue(threads in range, "Expected threads in $range interval, but has $threads")
- }
-
private fun maxSequenceNumber(): Int? {
return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }
- .map { sequenceNumber(it.name) }.max()
+ .map { sequenceNumber(it.name) }.maxOrNull()
}
private fun sequenceNumber(threadName: String): Int {
@@ -105,4 +97,4 @@ abstract class SchedulerTestBase : TestBase() {
}
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt b/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt
index a5c83d32..233e4420 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/TestTimeSource.kt
@@ -1,11 +1,11 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.scheduling
-internal class TestTimeSource(var time: Long) : TimeSource() {
+internal class TestTimeSource(var time: Long) : SchedulerTimeSource() {
override fun nanoTime() = time
diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt
deleted file mode 100644
index 4497bec5..00000000
--- a/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.selects
-
-import kotlinx.atomicfu.*
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import org.junit.*
-import org.junit.Ignore
-import org.junit.Test
-import kotlin.math.*
-import kotlin.test.*
-
-/**
- * A stress-test on lock-freedom of select sending/receiving into opposite channels.
- */
-class SelectDeadlockLFStressTest : TestBase() {
- private val env = LockFreedomTestEnvironment("SelectDeadlockLFStressTest", allowSuspendedThreads = 1)
- private val nSeconds = 5 * stressTestMultiplier
-
- private val c1 = Channel<Long>()
- private val c2 = Channel<Long>()
-
- @Test
- fun testLockFreedom() = testScenarios(
- "s1r2",
- "s2r1",
- "r1s2",
- "r2s1"
- )
-
- private fun testScenarios(vararg scenarios: String) {
- env.onCompletion {
- c1.cancel(TestCompleted())
- c2.cancel(TestCompleted())
- }
- val t = scenarios.mapIndexed { i, scenario ->
- val idx = i + 1L
- TestDef(idx, "$idx [$scenario]", scenario)
- }
- t.forEach { it.test() }
- env.performTest(nSeconds) {
- t.forEach { println(it) }
- }
- }
-
- private inner class TestDef(
- var sendIndex: Long = 0L,
- val name: String,
- scenario: String
- ) {
- var receiveIndex = 0L
-
- val clauses: List<SelectBuilder<Unit>.() -> Unit> = ArrayList<SelectBuilder<Unit>.() -> Unit>().apply {
- require(scenario.length % 2 == 0)
- for (i in scenario.indices step 2) {
- val ch = when (val c = scenario[i + 1]) {
- '1' -> c1
- '2' -> c2
- else -> error("Channel '$c'")
- }
- val clause = when (val op = scenario[i]) {
- 's' -> fun SelectBuilder<Unit>.() { sendClause(ch) }
- 'r' -> fun SelectBuilder<Unit>.() { receiveClause(ch) }
- else -> error("Operation '$op'")
- }
- add(clause)
- }
- }
-
- fun test() = env.testThread(name) {
- doSendReceive()
- }
-
- private suspend fun doSendReceive() {
- try {
- select<Unit> {
- for (clause in clauses) clause()
- }
- } catch (e: TestCompleted) {
- assertTrue(env.isCompleted)
- }
- }
-
- private fun SelectBuilder<Unit>.sendClause(c: Channel<Long>) =
- c.onSend(sendIndex) {
- sendIndex += 4L
- }
-
- private fun SelectBuilder<Unit>.receiveClause(c: Channel<Long>) =
- c.onReceive { i ->
- receiveIndex = max(i, receiveIndex)
- }
-
- override fun toString(): String = "$name: send=$sendIndex, received=$receiveIndex"
- }
-
- private class TestCompleted : CancellationException()
-} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt
index bb713b25..027f3c51 100644
--- a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt
@@ -90,4 +90,21 @@ class MutexStressTest : TestBase() {
}
}
}
-} \ No newline at end of file
+
+ @Test
+ fun testShouldBeUnlockedOnCancellation() = runTest {
+ val mutex = Mutex()
+ val n = 1000 * stressTestMultiplier
+ repeat(n) {
+ val job = launch(Dispatchers.Default) {
+ mutex.lock()
+ mutex.unlock()
+ }
+ mutex.withLock {
+ job.cancel()
+ }
+ job.join()
+ assertFalse { mutex.isLocked }
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
index 374a1e3d..2ceed64b 100644
--- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
@@ -2,7 +2,7 @@ package kotlinx.coroutines.sync
import kotlinx.coroutines.*
import org.junit.Test
-import kotlin.test.assertEquals
+import kotlin.test.*
class SemaphoreStressTest : TestBase() {
@Test
@@ -90,4 +90,21 @@ class SemaphoreStressTest : TestBase() {
}
}
}
+
+ @Test
+ fun testShouldBeUnlockedOnCancellation() = runTest {
+ val semaphore = Semaphore(1)
+ val n = 1000 * stressTestMultiplier
+ repeat(n) {
+ val job = launch(Dispatchers.Default) {
+ semaphore.acquire()
+ semaphore.release()
+ }
+ semaphore.withPermit {
+ job.cancel()
+ }
+ job.join()
+ assertTrue { semaphore.availablePermits == 1 }
+ }
+ }
}