aboutsummaryrefslogtreecommitdiff
path: root/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt
diff options
context:
space:
mode:
Diffstat (limited to 'okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt')
-rw-r--r--okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt197
1 files changed, 125 insertions, 72 deletions
diff --git a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt
index 20778327..a9af3ba8 100644
--- a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt
+++ b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt
@@ -18,6 +18,9 @@ package okio
import java.io.IOException
import java.io.InterruptedIOException
import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Condition
+import java.util.concurrent.locks.ReentrantLock
+import kotlin.concurrent.withLock
/**
* This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
@@ -25,19 +28,18 @@ import java.util.concurrent.TimeUnit
* writing.
*
* Subclasses should override [timedOut] to take action when a timeout occurs. This method will be
- * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise
+ * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise,
* we risk starving other timeouts from being triggered.
*
* Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the
* timeout to each operation on the wrapped stream.
*
- * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards.
+ * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward.
* The return value of [exit] indicates whether a timeout was triggered. Note that the call to
* [timedOut] is asynchronous, and may be called after [exit].
*/
open class AsyncTimeout : Timeout() {
- /** True if this node is currently in the queue. */
- private var inQueue = false
+ private var state = STATE_IDLE
/** The next node in the linked list. */
private var next: AsyncTimeout? = null
@@ -51,12 +53,38 @@ open class AsyncTimeout : Timeout() {
if (timeoutNanos == 0L && !hasDeadline) {
return // No timeout and no deadline? Don't bother with the queue.
}
- scheduleTimeout(this, timeoutNanos, hasDeadline)
+
+ lock.withLock {
+ check(state == STATE_IDLE) { "Unbalanced enter/exit" }
+ state = STATE_IN_QUEUE
+ insertIntoQueue(this, timeoutNanos, hasDeadline)
+ }
}
/** Returns true if the timeout occurred. */
fun exit(): Boolean {
- return cancelScheduledTimeout(this)
+ lock.withLock {
+ val oldState = this.state
+ state = STATE_IDLE
+
+ if (oldState == STATE_IN_QUEUE) {
+ removeFromQueue(this)
+ return false
+ } else {
+ return oldState == STATE_TIMED_OUT
+ }
+ }
+ }
+
+ override fun cancel() {
+ super.cancel()
+
+ lock.withLock {
+ if (state == STATE_IN_QUEUE) {
+ removeFromQueue(this)
+ state = STATE_CANCELED
+ }
+ }
}
/**
@@ -170,7 +198,7 @@ open class AsyncTimeout : Timeout() {
return e
}
- private class Watchdog internal constructor() : Thread("Okio Watchdog") {
+ private class Watchdog : Thread("Okio Watchdog") {
init {
isDaemon = true
}
@@ -178,8 +206,8 @@ open class AsyncTimeout : Timeout() {
override fun run() {
while (true) {
try {
- var timedOut: AsyncTimeout? = null
- synchronized(AsyncTimeout::class.java) {
+ var timedOut: AsyncTimeout?
+ lock.withLock {
timedOut = awaitTimeout()
// The queue is completely empty. Let this thread exit and let another watchdog thread
@@ -198,7 +226,10 @@ open class AsyncTimeout : Timeout() {
}
}
- companion object {
+ private companion object {
+ val lock: ReentrantLock = ReentrantLock()
+ val condition: Condition = lock.newCondition()
+
/**
* Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow
* connections may suffer timeouts even when they're making (slow) progress. Without this,
@@ -210,6 +241,43 @@ open class AsyncTimeout : Timeout() {
private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)
+ /*
+ * .-------------.
+ * | |
+ * .------------ exit() ------| CANCELED |
+ * | | |
+ * | '-------------'
+ * | ^
+ * | | cancel()
+ * v |
+ * .-------------. .-------------.
+ * | |---- enter() ----->| |
+ * | IDLE | | IN QUEUE |
+ * | |<---- exit() ------| |
+ * '-------------' '-------------'
+ * ^ |
+ * | | time out
+ * | v
+ * | .-------------.
+ * | | |
+ * '------------ exit() ------| TIMED OUT |
+ * | |
+ * '-------------'
+ *
+ * Notes:
+ * * enter() crashes if called from a state other than IDLE.
+ * * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to
+ * track entered but not in the queue.
+ * * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for
+ * early implementations to support cases where enter() as a no-op.
+ * * cancel() is a no-op from every state but IN QUEUE.
+ */
+
+ private const val STATE_IDLE = 0
+ private const val STATE_IN_QUEUE = 1
+ private const val STATE_TIMED_OUT = 2
+ private const val STATE_CANCELED = 3
+
/**
* The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
* triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
@@ -220,86 +288,74 @@ open class AsyncTimeout : Timeout() {
*/
private var head: AsyncTimeout? = null
- private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
- synchronized(AsyncTimeout::class.java) {
- check(!node.inQueue) { "Unbalanced enter/exit" }
- node.inQueue = true
-
- // Start the watchdog thread and create the head node when the first timeout is scheduled.
- if (head == null) {
- head = AsyncTimeout()
- Watchdog().start()
- }
+ private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
+ // Start the watchdog thread and create the head node when the first timeout is scheduled.
+ if (head == null) {
+ head = AsyncTimeout()
+ Watchdog().start()
+ }
- val now = System.nanoTime()
- if (timeoutNanos != 0L && hasDeadline) {
- // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
- // around, minOf() is undefined for absolute values, but meaningful for relative ones.
- node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
- } else if (timeoutNanos != 0L) {
- node.timeoutAt = now + timeoutNanos
- } else if (hasDeadline) {
- node.timeoutAt = node.deadlineNanoTime()
- } else {
- throw AssertionError()
- }
+ val now = System.nanoTime()
+ if (timeoutNanos != 0L && hasDeadline) {
+ // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
+ // around, minOf() is undefined for absolute values, but meaningful for relative ones.
+ node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
+ } else if (timeoutNanos != 0L) {
+ node.timeoutAt = now + timeoutNanos
+ } else if (hasDeadline) {
+ node.timeoutAt = node.deadlineNanoTime()
+ } else {
+ throw AssertionError()
+ }
- // Insert the node in sorted order.
- val remainingNanos = node.remainingNanos(now)
- var prev = head!!
- while (true) {
- if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
- node.next = prev.next
- prev.next = node
- if (prev === head) {
- // Wake up the watchdog when inserting at the front.
- (AsyncTimeout::class.java as Object).notify()
- }
- break
+ // Insert the node in sorted order.
+ val remainingNanos = node.remainingNanos(now)
+ var prev = head!!
+ while (true) {
+ if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
+ node.next = prev.next
+ prev.next = node
+ if (prev === head) {
+ // Wake up the watchdog when inserting at the front.
+ condition.signal()
}
- prev = prev.next!!
+ break
}
+ prev = prev.next!!
}
}
/** Returns true if the timeout occurred. */
- private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
- synchronized(AsyncTimeout::class.java) {
- if (!node.inQueue) return false
- node.inQueue = false
-
- // Remove the node from the linked list.
- var prev = head
- while (prev != null) {
- if (prev.next === node) {
- prev.next = node.next
- node.next = null
- return false
- }
- prev = prev.next
+ private fun removeFromQueue(node: AsyncTimeout) {
+ var prev = head
+ while (prev != null) {
+ if (prev.next === node) {
+ prev.next = node.next
+ node.next = null
+ return
}
-
- // The node wasn't found in the linked list: it must have timed out!
- return true
+ prev = prev.next
}
+
+ error("node was not found in the queue")
}
/**
* Removes and returns the node at the head of the list, waiting for it to time out if
* necessary. This returns [head] if there was no node at the head of the list when starting,
* and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a
- * new node was inserted while waiting. Otherwise this returns the node being waited on that has
- * been removed.
+ * new node was inserted while waiting. Otherwise, this returns the node being waited on that
+ * has been removed.
*/
@Throws(InterruptedException::class)
- internal fun awaitTimeout(): AsyncTimeout? {
+ fun awaitTimeout(): AsyncTimeout? {
// Get the next eligible node.
val node = head!!.next
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
val startNanos = System.nanoTime()
- (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
+ condition.await(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
head // The idle timeout elapsed.
} else {
@@ -307,21 +363,18 @@ open class AsyncTimeout : Timeout() {
}
}
- var waitNanos = node.remainingNanos(System.nanoTime())
+ val waitNanos = node.remainingNanos(System.nanoTime())
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
- // Waiting is made complicated by the fact that we work in nanoseconds,
- // but the API wants (millis, nanos) in two arguments.
- val waitMillis = waitNanos / 1000000L
- waitNanos -= waitMillis * 1000000L
- (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
+ condition.await(waitNanos, TimeUnit.NANOSECONDS)
return null
}
// The head of the queue has timed out. Remove it.
head!!.next = node.next
node.next = null
+ node.state = STATE_TIMED_OUT
return node
}
}