aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoman Elizarov <elizarov@gmail.com>2019-08-22 20:07:36 +0300
committerVsevolod Tolstopyatov <qwwdfsad@gmail.com>2019-08-22 20:07:36 +0300
commit3807a74e34348b42eef13175113e71d26b9cc515 (patch)
tree53d642c4e36d4d158f542f6f6fdf91d560c41bdd
parent0342a0acd46f07a6440cbbccbdd5a8027232b76e (diff)
downloadkotlinx.coroutines-3807a74e34348b42eef13175113e71d26b9cc515.tar.gz
Optimize select expression registration phase (#1445)
* Optimize select expression registration phase There is no need for multi-word atomic performAtomicIfNotSelected operation when enqueuing select node for operation. We can simply enqueue (addLast) xxxSelect node (SendSelect, ReceiveSelect, LockSelect). If the coroutine that rendezvous with this node finds out that the select expression was already selected, then it'll try again. * Removed SelectInstance.performAtomicIfNotSelected function * Removed Mutex.TryEnqueueLockDesc class, simplified onLock * Removed AbstractSendChannel.TryEnqueueSendDesc class, simpler onSend * Removed AbstractChannel.TryEnqueueReceiveDesc class, simpler onReceive * Simplified SelectInstance.disposeOnSelect. It does not have to do atomic addLastIf operation. Can do a simple addLast. * Fixed unlimited channel select onSend on closed channel It was hanging not being able to properly see that the channel was already closed at that send attempt should fail.
-rw-r--r--binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt2
-rw-r--r--kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt218
-rw-r--r--kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt4
-rw-r--r--kotlinx-coroutines-core/common/src/selects/Select.kt31
-rw-r--r--kotlinx-coroutines-core/common/src/sync/Mutex.kt28
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt29
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt23
-rw-r--r--kotlinx-coroutines-core/js/src/internal/LinkedList.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt28
-rw-r--r--kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt35
-rw-r--r--kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt59
-rw-r--r--kotlinx-coroutines-core/native/src/internal/LinkedList.kt6
12 files changed, 228 insertions, 241 deletions
diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
index f680a34a..8a4cf58c 100644
--- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
+++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt
@@ -1046,7 +1046,6 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
public fun isSelected ()Z
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
- public fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
@@ -1069,7 +1068,6 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
public abstract fun disposeOnSelect (Lkotlinx/coroutines/DisposableHandle;)V
public abstract fun getCompletion ()Lkotlin/coroutines/Continuation;
public abstract fun isSelected ()Z
- public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public abstract fun trySelect (Ljava/lang/Object;)Z
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index bed49790..7b8f96b6 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -96,10 +96,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
- queue.addLastIfPrev(SendBuffered(element), { prev ->
+ queue.addLastIfPrev(SendBuffered(element)) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
true
- })
+ }
return null
}
@@ -112,9 +112,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
queue: LockFreeLinkedListHead,
element: E
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
- return null
+ override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
+ is Closed<*> -> affected
+ is ReceiveOrClosed<*> -> OFFER_FAILED
+ else -> null
}
}
@@ -168,18 +169,23 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
}
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
- val send = SendElement(element, cont)
loop@ while (true) {
- val enqueueResult = enqueueSend(send)
- when (enqueueResult) {
- null -> { // enqueued successfully
- cont.removeOnCancellation(send)
- return@sc
- }
- is Closed<*> -> {
- helpClose(enqueueResult)
- cont.resumeWithException(enqueueResult.sendException)
- return@sc
+ if (full) {
+ val send = SendElement(element, cont)
+ val enqueueResult = enqueueSend(send)
+ when {
+ enqueueResult == null -> { // enqueued successfully
+ cont.removeOnCancellation(send)
+ return@sc
+ }
+ enqueueResult is Closed<*> -> {
+ helpClose(enqueueResult)
+ cont.resumeWithException(enqueueResult.sendException)
+ return@sc
+ }
+ enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
+ enqueueResult is Receive<*> -> {} // try to offer instead
+ else -> error("enqueueSend returned $enqueueResult")
}
}
// hm... receiver is waiting or buffer is not full. try to offer
@@ -206,12 +212,12 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
*/
- private fun enqueueSend(send: SendElement): Any? {
+ private fun enqueueSend(send: Send): Any? {
if (isBufferAlwaysFull) {
- queue.addLastIfPrev(send, { prev ->
+ queue.addLastIfPrev(send) { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
true
- })
+ }
} else {
if (!queue.addLastIfPrevAndIf(send, { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
@@ -333,10 +339,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
@JvmField var resumeToken: Any? = null
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
- if (affected is Closed<*>) return affected
- return null
+ override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
+ is Closed<*> -> affected
+ !is ReceiveOrClosed<*> -> OFFER_FAILED
+ else -> null
}
override fun validatePrepared(node: ReceiveOrClosed<E>): Boolean {
@@ -346,30 +352,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
}
}
- private inner class TryEnqueueSendDesc<R>(
- element: E,
- select: SelectInstance<R>,
- block: suspend (SendChannel<E>) -> R
- ) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is ReceiveOrClosed<*>) {
- return affected as? Closed<*> ?: ENQUEUE_FAILED
- }
- return null
- }
-
- override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
- if (!isBufferFull) return ENQUEUE_FAILED
- return super.onPrepare(affected, next)
- }
-
- override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
- super.finishOnSuccess(affected, next)
- // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
- node.disposeOnSelect()
- }
- }
-
final override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
@@ -381,26 +363,36 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
while (true) {
if (select.isSelected) return
if (full) {
- val enqueueOp = TryEnqueueSendDesc(element, select, block)
- val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ val node = SendSelect(element, this, select, block)
+ val enqueueResult = enqueueSend(node)
when {
- enqueueResult === ALREADY_SELECTED -> return
- enqueueResult === ENQUEUE_FAILED -> {} // retry
- enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
- else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
- }
- } else {
- val offerResult = offerSelectInternal(element, select)
- when {
- offerResult === ALREADY_SELECTED -> return
- offerResult === OFFER_FAILED -> {} // retry
- offerResult === OFFER_SUCCESS -> {
- block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
+ enqueueResult == null -> { // enqueued successfully
+ select.disposeOnSelect(node)
return
}
- offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
- else -> error("offerSelectInternal returned $offerResult")
+ enqueueResult is Closed<*> -> {
+ helpClose(enqueueResult)
+ throw recoverStackTrace(enqueueResult.sendException)
+ }
+ enqueueResult === ENQUEUE_FAILED -> {} // try to offer
+ enqueueResult is Receive<*> -> {} // try to offer
+ else -> error("enqueueSend returned $enqueueResult ")
+ }
+ }
+ // hm... receiver is waiting or buffer is not full. try to offer
+ val offerResult = offerSelectInternal(element, select)
+ when {
+ offerResult === ALREADY_SELECTED -> return
+ offerResult === OFFER_FAILED -> {} // retry
+ offerResult === OFFER_SUCCESS -> {
+ block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
+ return
+ }
+ offerResult is Closed<*> -> {
+ helpClose(offerResult)
+ throw recoverStackTrace(offerResult.sendException)
}
+ else -> error("offerSelectInternal returned $offerResult")
}
}
}
@@ -443,7 +435,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
@JvmField val channel: SendChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
- ) : LockFreeLinkedListNode(), Send, DisposableHandle {
+ ) : Send(), DisposableHandle {
override fun tryResumeSend(idempotent: Any?): Any? =
if (select.trySelect(idempotent)) SELECT_STARTED else null
@@ -452,11 +444,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
block.startCoroutine(receiver = channel, completion = select.completion)
}
- fun disposeOnSelect() {
- select.disposeOnSelect(this)
- }
-
- override fun dispose() {
+ override fun dispose() { // invoked on select completion
remove()
}
@@ -470,7 +458,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
internal class SendBuffered<out E>(
@JvmField val element: E
- ) : LockFreeLinkedListNode(), Send {
+ ) : Send() {
override val pollResult: Any? get() = element
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
@@ -556,8 +544,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
@Suppress("UNCHECKED_CAST")
- private suspend fun <R> receiveSuspend(onClose: Int): R = suspendAtomicCancellableCoroutine sc@ { cont ->
- val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, onClose)
+ private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutine sc@ { cont ->
+ val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
@@ -578,7 +566,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
private fun enqueueReceive(receive: Receive<E>): Boolean {
val result = if (isBufferAlwaysEmpty)
- queue.addLastIfPrev(receive, { it !is Send }) else
+ queue.addLastIfPrev(receive) { it !is Send } else
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
if (result) onReceiveEnqueued()
return result
@@ -659,10 +647,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@JvmField var resumeToken: Any? = null
@JvmField var pollResult: E? = null
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is Closed<*>) return affected
- if (affected !is Send) return POLL_FAILED
- return null
+ override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
+ is Closed<*> -> affected
+ !is Send -> POLL_FAILED
+ else -> null
}
@Suppress("UNCHECKED_CAST")
@@ -674,30 +662,6 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}
- private inner class TryEnqueueReceiveDesc<E, R>(
- select: SelectInstance<R>,
- block: suspend (Any?) -> R,
- receiveMode: Int
- ) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, receiveMode)) {
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is Send) return ENQUEUE_FAILED
- return null
- }
-
- override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
- if (!isBufferEmpty) return ENQUEUE_FAILED
- return super.onPrepare(affected, next)
- }
-
- override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
- super.finishOnSuccess(affected, next)
- // notify the there is one more receiver
- onReceiveEnqueued()
- // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
- node.removeOnSelectCompletion()
- }
- }
-
final override val onReceive: SelectClause1<E>
get() = object : SelectClause1<E> {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
@@ -710,7 +674,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
- if (registerEnqueueDesc(select, block, RECEIVE_THROWS_ON_CLOSE)) return
+ if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_THROWS_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -738,7 +702,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
- if (registerEnqueueDesc(select, block, RECEIVE_NULL_ON_CLOSE)) return
+ if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_NULL_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -775,7 +739,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
- if (registerEnqueueDesc(select, block, RECEIVE_RESULT)) return
+ if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_RESULT)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -794,18 +758,15 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}
- private fun <R, E> registerEnqueueDesc(
- select: SelectInstance<R>, block: suspend (E) -> R,
+ private fun <R> enqueueReceiveSelect(
+ select: SelectInstance<R>,
+ block: suspend (Any?) -> R,
receiveMode: Int
): Boolean {
- @Suppress("UNCHECKED_CAST")
- val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, receiveMode)
- val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
- return when {
- enqueueResult === ALREADY_SELECTED -> true
- enqueueResult === ENQUEUE_FAILED -> false // retry
- else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
- }
+ val node = ReceiveSelect(this, select, block, receiveMode)
+ val result = enqueueReceive(node)
+ if (result) select.disposeOnSelect(node)
+ return result
}
// ------ protected ------
@@ -917,7 +878,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
else -> cont.resumeWithException(closed.receiveException)
}
}
- override fun toString(): String = "ReceiveElement[$cont,receiveMode=$receiveMode]"
+ override fun toString(): String = "ReceiveElement[receiveMode=$receiveMode]"
}
private class ReceiveHasNext<E>(
@@ -957,10 +918,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
cont.completeResume(token)
}
}
- override fun toString(): String = "ReceiveHasNext[$cont]"
+ override fun toString(): String = "ReceiveHasNext"
}
- private inner class ReceiveSelect<R, in E>(
+ private class ReceiveSelect<R, E>(
+ @JvmField val channel: AbstractChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
@@ -987,13 +949,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}
- fun removeOnSelectCompletion() {
- select.disposeOnSelect(this)
- }
-
override fun dispose() { // invoked on select completion
if (remove())
- onReceiveDequeued() // notify cancellation of receive
+ channel.onReceiveDequeued() // notify cancellation of receive
}
override fun toString(): String = "ReceiveSelect[$select,receiveMode=$receiveMode]"
@@ -1051,11 +1009,11 @@ internal typealias Handler = (Throwable?) -> Unit
/**
* Represents sending waiter in the queue.
*/
-internal interface Send {
- val pollResult: Any? // E | Closed
- fun tryResumeSend(idempotent: Any?): Any?
- fun completeResumeSend(token: Any)
- fun resumeSendClosed(closed: Closed<*>)
+internal abstract class Send : LockFreeLinkedListNode() {
+ abstract val pollResult: Any? // E | Closed
+ abstract fun tryResumeSend(idempotent: Any?): Any?
+ abstract fun completeResumeSend(token: Any)
+ abstract fun resumeSendClosed(closed: Closed<*>)
}
/**
@@ -1074,11 +1032,11 @@ internal interface ReceiveOrClosed<in E> {
internal class SendElement(
override val pollResult: Any?,
@JvmField val cont: CancellableContinuation<Unit>
-) : LockFreeLinkedListNode(), Send {
+) : Send() {
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
override fun completeResumeSend(token: Any) = cont.completeResume(token)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
- override fun toString(): String = "SendElement($pollResult)[$cont]"
+ override fun toString(): String = "SendElement($pollResult)"
}
/**
@@ -1086,7 +1044,7 @@ internal class SendElement(
*/
internal class Closed<in E>(
@JvmField val closeCause: Throwable?
-) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
+) : Send(), ReceiveOrClosed<E> {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt
index c0533070..fc1c72f0 100644
--- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt
+++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -66,7 +66,7 @@ public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): Abst
public expect abstract class AbstractAtomicDesc : AtomicDesc {
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
- protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
+ protected open fun failure(affected: LockFreeLinkedListNode): Any?
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt
index 9555f2b9..4626fe1d 100644
--- a/kotlinx-coroutines-core/common/src/selects/Select.kt
+++ b/kotlinx-coroutines-core/common/src/selects/Select.kt
@@ -111,11 +111,6 @@ public interface SelectInstance<in R> {
public fun performAtomicTrySelect(desc: AtomicDesc): Any?
/**
- * Performs action atomically when [isSelected] is `false`.
- */
- public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
-
- /**
* Returns completion continuation of this select instance.
* This select instance must be _selected_ first.
* All resumption through this instance happen _directly_ without going through dispatcher ([MODE_DIRECT]).
@@ -129,6 +124,7 @@ public interface SelectInstance<in R> {
/**
* Disposes the specified handle when this instance is selected.
+ * Note, that [DisposableHandle.dispose] could be called multiple times.
*/
public fun disposeOnSelect(handle: DisposableHandle)
}
@@ -329,16 +325,14 @@ internal class SelectBuilderImpl<in R>(
override fun disposeOnSelect(handle: DisposableHandle) {
val node = DisposeNode(handle)
- while (true) { // lock-free loop on state
- val state = this.state
- if (state === this) {
- if (addLastIf(node, { this.state === this }))
- return
- } else { // already selected
- handle.dispose()
- return
- }
+ // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
+ if (!isSelected) {
+ addLast(node) // add handle to list
+ // double-check node after adding
+ if (!isSelected) return // all ok - still not selected
}
+ // already selected
+ handle.dispose()
}
private fun doAfterSelect() {
@@ -368,12 +362,11 @@ internal class SelectBuilderImpl<in R>(
}
}
- override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
- override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
+ override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
+ AtomicSelectOp(desc).perform(null)
private inner class AtomicSelectOp(
- @JvmField val desc: AtomicDesc,
- @JvmField val select: Boolean
+ @JvmField val desc: AtomicDesc
) : AtomicOp<Any?>() {
override fun prepare(affected: Any?): Any? {
// only originator of operation makes preparation move of installing descriptor into this selector's state
@@ -405,7 +398,7 @@ internal class SelectBuilderImpl<in R>(
}
private fun completeSelect(failure: Any?) {
- val selectSuccess = select && failure == null
+ val selectSuccess = failure == null
val update = if (selectSuccess) null else this@SelectBuilderImpl
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
if (selectSuccess)
diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
index fa198e13..3c729153 100644
--- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
@@ -246,16 +246,11 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
}
is LockedQueue -> {
check(state.owner !== owner) { "Already locked by $owner" }
- val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block)
- val failure = select.performAtomicIfNotSelected(enqueueOp)
- when {
- failure == null -> { // successfully enqueued
- select.disposeOnSelect(enqueueOp.node)
- return
- }
- failure === ALREADY_SELECTED -> return // already selected -- bail out
- failure === ENQUEUE_FAIL -> {} // retry
- else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure")
+ val node = LockSelect(owner, this, select, block)
+ if (state.addLastIf(node) { _state.value === state }) {
+ // successfully enqueued
+ select.disposeOnSelect(node)
+ return
}
}
is OpDescriptor -> state.perform(this) // help
@@ -291,19 +286,6 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
}
}
- private class TryEnqueueLockDesc<R>(
- @JvmField val mutex: MutexImpl,
- owner: Any?,
- queue: LockedQueue,
- select: SelectInstance<R>,
- block: suspend (Mutex) -> R
- ) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
- override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
- if (mutex._state.value !== queue) return ENQUEUE_FAIL
- return super.onPrepare(affected, next)
- }
- }
-
public override fun holdsLock(owner: Any) =
_state.value.let { state ->
when (state) {
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt
new file mode 100644
index 00000000..a066f6b3
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.selects
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class SelectLinkedListChannelTest : TestBase() {
+ @Test
+ fun testSelectSendWhenClosed() = runTest {
+ expect(1)
+ val c = Channel<Int>(Channel.UNLIMITED)
+ c.send(1) // enqueue buffered element
+ c.close() // then close
+ assertFailsWith<ClosedSendChannelException> {
+ // select sender should fail
+ expect(2)
+ select {
+ c.onSend(2) {
+ expectUnreached()
+ }
+ }
+ }
+ finish(3)
+ }
+} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
index ed8b8d36..6072cc2c 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
@@ -403,6 +403,29 @@ class SelectRendezvousChannelTest : TestBase() {
finish(10)
}
+ @Test
+ fun testSelectSendWhenClosed() = runTest {
+ expect(1)
+ val c = Channel<Int>(Channel.RENDEZVOUS)
+ val sender = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ c.send(1) // enqueue sender
+ expectUnreached()
+ }
+ c.close() // then close
+ assertFailsWith<ClosedSendChannelException> {
+ // select sender should fail
+ expect(3)
+ select {
+ c.onSend(2) {
+ expectUnreached()
+ }
+ }
+ }
+ sender.cancel()
+ finish(4)
+ }
+
// only for debugging
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt
index 3f179d1d..60509010 100644
--- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt
+++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
@@ -129,13 +129,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() {
actual final override fun prepare(op: AtomicOp<*>): Any? {
val affected = affectedNode
val next = affected._next
- val failure = failure(affected, next)
+ val failure = failure(affected)
if (failure != null) return failure
return onPrepare(affected, next)
}
actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete()
- protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default
+ protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default
protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds
protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
index 7d28de25..d3d168a4 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
@@ -254,26 +254,6 @@ public actual open class LockFreeLinkedListNode {
finishRemove(removed.ref)
}
- public open fun describeRemove() : AtomicDesc? {
- if (isRemoved) return null // fast path if was already removed
- return object : AbstractAtomicDesc() {
- private val _originalNext = atomic<Node?>(null)
- override val affectedNode: Node? get() = this@LockFreeLinkedListNode
- override val originalNext get() = _originalNext.value
- override fun failure(affected: Node, next: Any): Any? =
- if (next is Removed) ALREADY_REMOVED else null
- override fun onPrepare(affected: Node, next: Node): Any? {
- // Note: onPrepare must use CAS to make sure the stale invocation is not
- // going to overwrite the previous decision on successful preparation.
- // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
- _originalNext.compareAndSet(null, next)
- return null // always success
- }
- override fun updatedNext(affected: Node, next: Node) = next.removed()
- override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
- }
- }
-
public actual fun removeFirstOrNull(): Node? {
while (true) { // try to linearize
val first = next as Node
@@ -376,7 +356,7 @@ public actual open class LockFreeLinkedListNode {
final override val originalNext: Node? get() = _originalNext.value
// check node predicates here, must signal failure if affect is not of type T
- protected override fun failure(affected: Node, next: Any): Any? =
+ protected override fun failure(affected: Node): Any? =
if (affected === queue) LIST_EMPTY else null
// validate the resulting node (return false if it should be deleted)
@@ -408,7 +388,7 @@ public actual open class LockFreeLinkedListNode {
protected abstract val affectedNode: Node?
protected abstract val originalNext: Node?
protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!!
- protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
+ protected open fun failure(affected: Node): Any? = null // next: Node | Removed
protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure
protected abstract fun updatedNext(affected: Node, next: Node): Any
@@ -460,7 +440,7 @@ public actual open class LockFreeLinkedListNode {
continue // and retry
}
// next: Node | Removed
- val failure = failure(affected, next)
+ val failure = failure(affected)
if (failure != null) return failure // signal failure
if (retry(affected, next)) continue // retry operation
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
@@ -684,8 +664,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
// just a defensive programming -- makes sure that list head sentinel is never removed
public actual final override fun remove(): Boolean = throw UnsupportedOperationException()
- public final override fun describeRemove(): Nothing = throw UnsupportedOperationException()
-
internal fun validate() {
var prev: Node = this
var cur: Node = next as Node
diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt
index 1400441f..9238681e 100644
--- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -48,29 +48,6 @@ class LockFreeLinkedListTest {
}
@Test
- fun testRemoveTwoAtomic() {
- val list = LockFreeLinkedListHead()
- val n1 = IntNode(1).apply { list.addLast(this) }
- val n2 = IntNode(2).apply { list.addLast(this) }
- assertContents(list, 1, 2)
- assertFalse(n1.isRemoved)
- assertFalse(n2.isRemoved)
- val remove1Desc = n1.describeRemove()!!
- val remove2Desc = n2.describeRemove()!!
- val operation = object : AtomicOp<Any?>() {
- override fun prepare(affected: Any?): Any? = remove1Desc.prepare(this) ?: remove2Desc.prepare(this)
- override fun complete(affected: Any?, failure: Any?) {
- remove1Desc.complete(this, failure)
- remove2Desc.complete(this, failure)
- }
- }
- assertTrue(operation.perform(null) == null)
- assertTrue(n1.isRemoved)
- assertTrue(n2.isRemoved)
- assertContents(list)
- }
-
- @Test
fun testAtomicOpsSingle() {
val list = LockFreeLinkedListHead()
assertContents(list)
@@ -82,16 +59,6 @@ class LockFreeLinkedListTest {
assertContents(list, 1, 2, 3)
val n4 = IntNode(4).also { single(list.describeAddLast(it)) }
assertContents(list, 1, 2, 3, 4)
- single(n3.describeRemove()!!)
- assertContents(list, 1, 2, 4)
- assertTrue(n3.describeRemove() == null)
- single(list.describeRemoveFirst())
- assertContents(list, 2, 4)
- assertTrue(n1.describeRemove() == null)
- assertTrue(n2.remove())
- assertContents(list, 4)
- assertTrue(n4.remove())
- assertContents(list)
}
private fun single(part: AtomicDesc) {
diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt
new file mode 100644
index 00000000..7f924dba
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.selects
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class SelectMemoryLeakStressTest : TestBase() {
+ private val nRepeat = 1_000_000 * stressTestMultiplier
+
+ @Test
+ fun testLeakRegisterSend() = runTest {
+ expect(1)
+ val leak = Channel<String>()
+ val data = Channel<Int>(1)
+ repeat(nRepeat) { value ->
+ data.send(value)
+ val bigValue = bigValue() // new instance
+ select {
+ leak.onSend("LEAK") {
+ println("Capture big value into this lambda: $bigValue")
+ expectUnreached()
+ }
+ data.onReceive { received ->
+ assertEquals(value, received)
+ expect(value + 2)
+ }
+ }
+ }
+ finish(nRepeat + 2)
+ }
+
+ @Test
+ fun testLeakRegisterReceive() = runTest {
+ expect(1)
+ val leak = Channel<String>()
+ val data = Channel<Int>(1)
+ repeat(nRepeat) { value ->
+ val bigValue = bigValue() // new instance
+ select<Unit> {
+ leak.onReceive {
+ println("Capture big value into this lambda: $bigValue")
+ expectUnreached()
+ }
+ data.onSend(value) {
+ expect(value + 2)
+ }
+ }
+ assertEquals(value, data.receive())
+ }
+ finish(nRepeat + 2)
+ }
+
+ // capture big value for fast OOM in case of a bug
+ private fun bigValue(): ByteArray = ByteArray(4096)
+} \ No newline at end of file
diff --git a/kotlinx-coroutines-core/native/src/internal/LinkedList.kt b/kotlinx-coroutines-core/native/src/internal/LinkedList.kt
index 7e9657e2..07fe1a06 100644
--- a/kotlinx-coroutines-core/native/src/internal/LinkedList.kt
+++ b/kotlinx-coroutines-core/native/src/internal/LinkedList.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -127,13 +127,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() {
actual final override fun prepare(op: AtomicOp<*>): Any? {
val affected = affectedNode
val next = affected._next
- val failure = failure(affected, next)
+ val failure = failure(affected)
if (failure != null) return failure
return onPrepare(affected, next)
}
actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete()
- protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default
+ protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default
protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds
protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}