diff options
Diffstat (limited to 'okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt')
-rw-r--r-- | okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt | 197 |
1 files changed, 125 insertions, 72 deletions
diff --git a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt index 20778327..a9af3ba8 100644 --- a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt +++ b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt @@ -18,6 +18,9 @@ package okio import java.io.IOException import java.io.InterruptedIOException import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock /** * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to @@ -25,19 +28,18 @@ import java.util.concurrent.TimeUnit * writing. * * Subclasses should override [timedOut] to take action when a timeout occurs. This method will be - * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise + * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise, * we risk starving other timeouts from being triggered. * * Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the * timeout to each operation on the wrapped stream. * - * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards. + * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward. * The return value of [exit] indicates whether a timeout was triggered. Note that the call to * [timedOut] is asynchronous, and may be called after [exit]. */ open class AsyncTimeout : Timeout() { - /** True if this node is currently in the queue. */ - private var inQueue = false + private var state = STATE_IDLE /** The next node in the linked list. */ private var next: AsyncTimeout? = null @@ -51,12 +53,38 @@ open class AsyncTimeout : Timeout() { if (timeoutNanos == 0L && !hasDeadline) { return // No timeout and no deadline? Don't bother with the queue. } - scheduleTimeout(this, timeoutNanos, hasDeadline) + + lock.withLock { + check(state == STATE_IDLE) { "Unbalanced enter/exit" } + state = STATE_IN_QUEUE + insertIntoQueue(this, timeoutNanos, hasDeadline) + } } /** Returns true if the timeout occurred. */ fun exit(): Boolean { - return cancelScheduledTimeout(this) + lock.withLock { + val oldState = this.state + state = STATE_IDLE + + if (oldState == STATE_IN_QUEUE) { + removeFromQueue(this) + return false + } else { + return oldState == STATE_TIMED_OUT + } + } + } + + override fun cancel() { + super.cancel() + + lock.withLock { + if (state == STATE_IN_QUEUE) { + removeFromQueue(this) + state = STATE_CANCELED + } + } } /** @@ -170,7 +198,7 @@ open class AsyncTimeout : Timeout() { return e } - private class Watchdog internal constructor() : Thread("Okio Watchdog") { + private class Watchdog : Thread("Okio Watchdog") { init { isDaemon = true } @@ -178,8 +206,8 @@ open class AsyncTimeout : Timeout() { override fun run() { while (true) { try { - var timedOut: AsyncTimeout? = null - synchronized(AsyncTimeout::class.java) { + var timedOut: AsyncTimeout? + lock.withLock { timedOut = awaitTimeout() // The queue is completely empty. Let this thread exit and let another watchdog thread @@ -198,7 +226,10 @@ open class AsyncTimeout : Timeout() { } } - companion object { + private companion object { + val lock: ReentrantLock = ReentrantLock() + val condition: Condition = lock.newCondition() + /** * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow * connections may suffer timeouts even when they're making (slow) progress. Without this, @@ -210,6 +241,43 @@ open class AsyncTimeout : Timeout() { private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60) private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS) + /* + * .-------------. + * | | + * .------------ exit() ------| CANCELED | + * | | | + * | '-------------' + * | ^ + * | | cancel() + * v | + * .-------------. .-------------. + * | |---- enter() ----->| | + * | IDLE | | IN QUEUE | + * | |<---- exit() ------| | + * '-------------' '-------------' + * ^ | + * | | time out + * | v + * | .-------------. + * | | | + * '------------ exit() ------| TIMED OUT | + * | | + * '-------------' + * + * Notes: + * * enter() crashes if called from a state other than IDLE. + * * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to + * track entered but not in the queue. + * * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for + * early implementations to support cases where enter() as a no-op. + * * cancel() is a no-op from every state but IN QUEUE. + */ + + private const val STATE_IDLE = 0 + private const val STATE_IN_QUEUE = 1 + private const val STATE_TIMED_OUT = 2 + private const val STATE_CANCELED = 3 + /** * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be * triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue. @@ -220,86 +288,74 @@ open class AsyncTimeout : Timeout() { */ private var head: AsyncTimeout? = null - private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { - synchronized(AsyncTimeout::class.java) { - check(!node.inQueue) { "Unbalanced enter/exit" } - node.inQueue = true - - // Start the watchdog thread and create the head node when the first timeout is scheduled. - if (head == null) { - head = AsyncTimeout() - Watchdog().start() - } + private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { + // Start the watchdog thread and create the head node when the first timeout is scheduled. + if (head == null) { + head = AsyncTimeout() + Watchdog().start() + } - val now = System.nanoTime() - if (timeoutNanos != 0L && hasDeadline) { - // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap - // around, minOf() is undefined for absolute values, but meaningful for relative ones. - node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now) - } else if (timeoutNanos != 0L) { - node.timeoutAt = now + timeoutNanos - } else if (hasDeadline) { - node.timeoutAt = node.deadlineNanoTime() - } else { - throw AssertionError() - } + val now = System.nanoTime() + if (timeoutNanos != 0L && hasDeadline) { + // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap + // around, minOf() is undefined for absolute values, but meaningful for relative ones. + node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now) + } else if (timeoutNanos != 0L) { + node.timeoutAt = now + timeoutNanos + } else if (hasDeadline) { + node.timeoutAt = node.deadlineNanoTime() + } else { + throw AssertionError() + } - // Insert the node in sorted order. - val remainingNanos = node.remainingNanos(now) - var prev = head!! - while (true) { - if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { - node.next = prev.next - prev.next = node - if (prev === head) { - // Wake up the watchdog when inserting at the front. - (AsyncTimeout::class.java as Object).notify() - } - break + // Insert the node in sorted order. + val remainingNanos = node.remainingNanos(now) + var prev = head!! + while (true) { + if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { + node.next = prev.next + prev.next = node + if (prev === head) { + // Wake up the watchdog when inserting at the front. + condition.signal() } - prev = prev.next!! + break } + prev = prev.next!! } } /** Returns true if the timeout occurred. */ - private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean { - synchronized(AsyncTimeout::class.java) { - if (!node.inQueue) return false - node.inQueue = false - - // Remove the node from the linked list. - var prev = head - while (prev != null) { - if (prev.next === node) { - prev.next = node.next - node.next = null - return false - } - prev = prev.next + private fun removeFromQueue(node: AsyncTimeout) { + var prev = head + while (prev != null) { + if (prev.next === node) { + prev.next = node.next + node.next = null + return } - - // The node wasn't found in the linked list: it must have timed out! - return true + prev = prev.next } + + error("node was not found in the queue") } /** * Removes and returns the node at the head of the list, waiting for it to time out if * necessary. This returns [head] if there was no node at the head of the list when starting, * and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a - * new node was inserted while waiting. Otherwise this returns the node being waited on that has - * been removed. + * new node was inserted while waiting. Otherwise, this returns the node being waited on that + * has been removed. */ @Throws(InterruptedException::class) - internal fun awaitTimeout(): AsyncTimeout? { + fun awaitTimeout(): AsyncTimeout? { // Get the next eligible node. val node = head!!.next // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. if (node == null) { val startNanos = System.nanoTime() - (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS) + condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) { head // The idle timeout elapsed. } else { @@ -307,21 +363,18 @@ open class AsyncTimeout : Timeout() { } } - var waitNanos = node.remainingNanos(System.nanoTime()) + val waitNanos = node.remainingNanos(System.nanoTime()) // The head of the queue hasn't timed out yet. Await that. if (waitNanos > 0) { - // Waiting is made complicated by the fact that we work in nanoseconds, - // but the API wants (millis, nanos) in two arguments. - val waitMillis = waitNanos / 1000000L - waitNanos -= waitMillis * 1000000L - (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt()) + condition.await(waitNanos, TimeUnit.NANOSECONDS) return null } // The head of the queue has timed out. Remove it. head!!.next = node.next node.next = null + node.state = STATE_TIMED_OUT return node } } |