diff options
Diffstat (limited to 'okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt')
-rw-r--r-- | okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt new file mode 100644 index 00000000..20778327 --- /dev/null +++ b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt @@ -0,0 +1,328 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio + +import java.io.IOException +import java.io.InterruptedIOException +import java.util.concurrent.TimeUnit + +/** + * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to + * implement timeouts where they aren't supported natively, such as to sockets that are blocked on + * writing. + * + * Subclasses should override [timedOut] to take action when a timeout occurs. This method will be + * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise + * we risk starving other timeouts from being triggered. + * + * Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the + * timeout to each operation on the wrapped stream. + * + * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards. + * The return value of [exit] indicates whether a timeout was triggered. Note that the call to + * [timedOut] is asynchronous, and may be called after [exit]. + */ +open class AsyncTimeout : Timeout() { + /** True if this node is currently in the queue. */ + private var inQueue = false + + /** The next node in the linked list. */ + private var next: AsyncTimeout? = null + + /** If scheduled, this is the time that the watchdog should time this out. */ + private var timeoutAt = 0L + + fun enter() { + val timeoutNanos = timeoutNanos() + val hasDeadline = hasDeadline() + if (timeoutNanos == 0L && !hasDeadline) { + return // No timeout and no deadline? Don't bother with the queue. + } + scheduleTimeout(this, timeoutNanos, hasDeadline) + } + + /** Returns true if the timeout occurred. */ + fun exit(): Boolean { + return cancelScheduledTimeout(this) + } + + /** + * Returns the amount of time left until the time out. This will be negative if the timeout has + * elapsed and the timeout should occur immediately. + */ + private fun remainingNanos(now: Long) = timeoutAt - now + + /** + * Invoked by the watchdog thread when the time between calls to [enter] and [exit] has exceeded + * the timeout. + */ + protected open fun timedOut() {} + + /** + * Returns a new sink that delegates to [sink], using this to implement timeouts. This works + * best if [timedOut] is overridden to interrupt [sink]'s current operation. + */ + fun sink(sink: Sink): Sink { + return object : Sink { + override fun write(source: Buffer, byteCount: Long) { + checkOffsetAndCount(source.size, 0, byteCount) + + var remaining = byteCount + while (remaining > 0L) { + // Count how many bytes to write. This loop guarantees we split on a segment boundary. + var toWrite = 0L + var s = source.head!! + while (toWrite < TIMEOUT_WRITE_SIZE) { + val segmentSize = s.limit - s.pos + toWrite += segmentSize.toLong() + if (toWrite >= remaining) { + toWrite = remaining + break + } + s = s.next!! + } + + // Emit one write. Only this section is subject to the timeout. + withTimeout { sink.write(source, toWrite) } + remaining -= toWrite + } + } + + override fun flush() { + withTimeout { sink.flush() } + } + + override fun close() { + withTimeout { sink.close() } + } + + override fun timeout() = this@AsyncTimeout + + override fun toString() = "AsyncTimeout.sink($sink)" + } + } + + /** + * Returns a new source that delegates to [source], using this to implement timeouts. This works + * best if [timedOut] is overridden to interrupt [source]'s current operation. + */ + fun source(source: Source): Source { + return object : Source { + override fun read(sink: Buffer, byteCount: Long): Long { + return withTimeout { source.read(sink, byteCount) } + } + + override fun close() { + withTimeout { source.close() } + } + + override fun timeout() = this@AsyncTimeout + + override fun toString() = "AsyncTimeout.source($source)" + } + } + + /** + * Surrounds [block] with calls to [enter] and [exit], throwing an exception from + * [newTimeoutException] if a timeout occurred. + */ + inline fun <T> withTimeout(block: () -> T): T { + var throwOnTimeout = false + enter() + try { + val result = block() + throwOnTimeout = true + return result + } catch (e: IOException) { + throw if (!exit()) e else `access$newTimeoutException`(e) + } finally { + val timedOut = exit() + if (timedOut && throwOnTimeout) throw `access$newTimeoutException`(null) + } + } + + @PublishedApi // Binary compatible trampoline function + internal fun `access$newTimeoutException`(cause: IOException?) = newTimeoutException(cause) + + /** + * Returns an [IOException] to represent a timeout. By default this method returns + * [InterruptedIOException]. If [cause] is non-null it is set as the cause of the + * returned exception. + */ + protected open fun newTimeoutException(cause: IOException?): IOException { + val e = InterruptedIOException("timeout") + if (cause != null) { + e.initCause(cause) + } + return e + } + + private class Watchdog internal constructor() : Thread("Okio Watchdog") { + init { + isDaemon = true + } + + override fun run() { + while (true) { + try { + var timedOut: AsyncTimeout? = null + synchronized(AsyncTimeout::class.java) { + timedOut = awaitTimeout() + + // The queue is completely empty. Let this thread exit and let another watchdog thread + // get created on the next call to scheduleTimeout(). + if (timedOut === head) { + head = null + return + } + } + + // Close the timed out node, if one was found. + timedOut?.timedOut() + } catch (ignored: InterruptedException) { + } + } + } + } + + companion object { + /** + * Don't write more than 64 KiB of data at a time, give or take a segment. Otherwise slow + * connections may suffer timeouts even when they're making (slow) progress. Without this, + * writing a single 1 MiB buffer may never succeed on a sufficiently slow connection. + */ + private const val TIMEOUT_WRITE_SIZE = 64 * 1024 + + /** Duration for the watchdog thread to be idle before it shuts itself down. */ + private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60) + private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS) + + /** + * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be + * triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue. + * + * Head's 'next' points to the first element of the linked list. The first element is the next + * node to time out, or null if the queue is empty. The head is null until the watchdog thread + * is started and also after being idle for [AsyncTimeout.IDLE_TIMEOUT_MILLIS]. + */ + private var head: AsyncTimeout? = null + + private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { + synchronized(AsyncTimeout::class.java) { + check(!node.inQueue) { "Unbalanced enter/exit" } + node.inQueue = true + + // Start the watchdog thread and create the head node when the first timeout is scheduled. + if (head == null) { + head = AsyncTimeout() + Watchdog().start() + } + + val now = System.nanoTime() + if (timeoutNanos != 0L && hasDeadline) { + // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap + // around, minOf() is undefined for absolute values, but meaningful for relative ones. + node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now) + } else if (timeoutNanos != 0L) { + node.timeoutAt = now + timeoutNanos + } else if (hasDeadline) { + node.timeoutAt = node.deadlineNanoTime() + } else { + throw AssertionError() + } + + // Insert the node in sorted order. + val remainingNanos = node.remainingNanos(now) + var prev = head!! + while (true) { + if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { + node.next = prev.next + prev.next = node + if (prev === head) { + // Wake up the watchdog when inserting at the front. + (AsyncTimeout::class.java as Object).notify() + } + break + } + prev = prev.next!! + } + } + } + + /** Returns true if the timeout occurred. */ + private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean { + synchronized(AsyncTimeout::class.java) { + if (!node.inQueue) return false + node.inQueue = false + + // Remove the node from the linked list. + var prev = head + while (prev != null) { + if (prev.next === node) { + prev.next = node.next + node.next = null + return false + } + prev = prev.next + } + + // The node wasn't found in the linked list: it must have timed out! + return true + } + } + + /** + * Removes and returns the node at the head of the list, waiting for it to time out if + * necessary. This returns [head] if there was no node at the head of the list when starting, + * and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a + * new node was inserted while waiting. Otherwise this returns the node being waited on that has + * been removed. + */ + @Throws(InterruptedException::class) + internal fun awaitTimeout(): AsyncTimeout? { + // Get the next eligible node. + val node = head!!.next + + // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. + if (node == null) { + val startNanos = System.nanoTime() + (AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS) + return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) { + head // The idle timeout elapsed. + } else { + null // The situation has changed. + } + } + + var waitNanos = node.remainingNanos(System.nanoTime()) + + // The head of the queue hasn't timed out yet. Await that. + if (waitNanos > 0) { + // Waiting is made complicated by the fact that we work in nanoseconds, + // but the API wants (millis, nanos) in two arguments. + val waitMillis = waitNanos / 1000000L + waitNanos -= waitMillis * 1000000L + (AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt()) + return null + } + + // The head of the queue has timed out. Remove it. + head!!.next = node.next + node.next = null + return node + } + } +} |