summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJesse Wilson <jwilson@squareup.com>2024-04-28 20:33:04 -0400
committerGitHub <noreply@github.com>2024-04-28 20:33:04 -0400
commitd0b6a464d8f660941a47b40fb8ed67508550921c (patch)
tree36e39ddd2d962e0663f121f20cf3a5656e900566
parentaede7c57f3e03fa1c8338268675ccc711eeeeefa (diff)
downloadokhttp4-upstream-master.tar.gz
Start fewer threads in TaskRunner (#8391)upstream-master
We've got a race where we'll start a thread when we need one, even if we've already started a thread. This changes TaskRunner's behavior to never add a thread if we're still waiting for a recently-added one to start running. This is intended to reduce the number of threads contenting for the TaskRunner lock as reported in this issue: https://github.com/square/okhttp/issues/8388
-rw-r--r--okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt16
-rw-r--r--okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt33
-rw-r--r--okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt90
3 files changed, 129 insertions, 10 deletions
diff --git a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt
index 88dfd7936..1874cd9ca 100644
--- a/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt
+++ b/okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt
@@ -63,29 +63,32 @@ class TaskFaker : Closeable {
/**
* True if this task faker has ever had multiple tasks scheduled to run concurrently. Guarded by
- * [taskRunner].
+ * [TaskRunner.lock].
*/
var isParallel = false
+ /** Number of calls to [TaskRunner.Backend.execute]. Guarded by [TaskRunner.lock]. */
+ var executeCallCount = 0
+
/** Guarded by [taskRunner]. */
var nanoTime = 0L
private set
- /** Backlog of tasks to run. Only one task runs at a time. Guarded by [taskRunner]. */
+ /** Backlog of tasks to run. Only one task runs at a time. Guarded by [TaskRunner.lock]. */
private val serialTaskQueue = ArrayDeque<SerialTask>()
- /** The task that's currently executing. Guarded by [taskRunner]. */
+ /** The task that's currently executing. Guarded by [TaskRunner.lock]. */
private var currentTask: SerialTask = TestThreadSerialTask
- /** The coordinator task if it's waiting, and how it will resume. Guarded by [taskRunner]. */
+ /** The coordinator task if it's waiting, and how it will resume. Guarded by [TaskRunner.lock]. */
private var waitingCoordinatorTask: SerialTask? = null
private var waitingCoordinatorInterrupted = false
private var waitingCoordinatorNotified = false
- /** How many times a new task has been started. Guarded by [taskRunner]. */
+ /** How many times a new task has been started. Guarded by [TaskRunner.lock]. */
private var contextSwitchCount = 0
- /** Guarded by [taskRunner]. */
+ /** Guarded by [TaskRunner.lock]. */
private var activeThreads = 0
/** A task runner that posts tasks to this fake. Tasks won't be executed until requested. */
@@ -100,6 +103,7 @@ class TaskFaker : Closeable {
val queuedTask = RunnableSerialTask(runnable)
serialTaskQueue += queuedTask
+ executeCallCount++
isParallel = serialTaskQueue.size > 1
}
diff --git a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
index 6acc7b24e..60143b3c4 100644
--- a/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
+++ b/okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt
@@ -52,6 +52,17 @@ class TaskRunner(
private var coordinatorWaiting = false
private var coordinatorWakeUpAt = 0L
+ /**
+ * When we need a new thread to run tasks, we call [Backend.execute]. A few microseconds later we
+ * expect a newly-started thread to call [Runnable.run]. We shouldn't request new threads until
+ * the already-requested ones are in service, otherwise we might create more threads than we need.
+ *
+ * We use [executeCallCount] and [runCallCount] to defend against starting more threads than we
+ * need. Both fields are guarded by [lock].
+ */
+ private var executeCallCount = 0
+ private var runCallCount = 0
+
/** Queues with tasks that are currently executing their [TaskQueue.activeTask]. */
private val busyQueues = mutableListOf<TaskQueue>()
@@ -61,9 +72,14 @@ class TaskRunner(
private val runnable: Runnable =
object : Runnable {
override fun run() {
+ var incrementedRunCallCount = false
while (true) {
val task =
this@TaskRunner.lock.withLock {
+ if (!incrementedRunCallCount) {
+ incrementedRunCallCount = true
+ runCallCount++
+ }
awaitTaskToRun()
} ?: return
@@ -76,7 +92,7 @@ class TaskRunner(
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
lock.withLock {
- backend.execute(this@TaskRunner, this)
+ startAnotherThread()
}
}
}
@@ -99,7 +115,7 @@ class TaskRunner(
if (coordinatorWaiting) {
backend.coordinatorNotify(this@TaskRunner)
} else {
- backend.execute(this@TaskRunner, runnable)
+ startAnotherThread()
}
}
@@ -157,7 +173,7 @@ class TaskRunner(
* Returns an immediately-executable task for the calling thread to execute, sleeping as necessary
* until one is ready. If there are no ready queues, or if other threads have everything under
* control this will return null. If there is more than a single task ready to execute immediately
- * this will launch another thread to handle that work.
+ * this will start another thread to handle that work.
*/
fun awaitTaskToRun(): Task? {
lock.assertHeld()
@@ -207,7 +223,7 @@ class TaskRunner(
// Also start another thread if there's more work or scheduling to do.
if (multipleReadyTasks || !coordinatorWaiting && readyQueues.isNotEmpty()) {
- backend.execute(this@TaskRunner, runnable)
+ startAnotherThread()
}
return readyTask
@@ -238,6 +254,15 @@ class TaskRunner(
}
}
+ /** Start another thread, unless a new thread is already scheduled to start. */
+ private fun startAnotherThread() {
+ lock.assertHeld()
+ if (executeCallCount > runCallCount) return // A thread is still starting.
+
+ executeCallCount++
+ backend.execute(this@TaskRunner, runnable)
+ }
+
fun newQueue(): TaskQueue {
val name = lock.withLock { nextQueueName++ }
return TaskQueue(this, "Q$name")
diff --git a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
index 5f343b563..36b0c4c72 100644
--- a/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
+++ b/okhttp/src/test/java/okhttp3/internal/concurrent/TaskRunnerTest.kt
@@ -680,6 +680,96 @@ class TaskRunnerTest {
assertThat(idleLatch2).isSameAs(idleLatch1)
}
+ @Test fun cancelAllWhenEmptyDoesNotStartWorkerThread() {
+ redQueue.execute("red task", 100.µs) {
+ error("expected to be canceled")
+ }
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ blueQueue.execute("task", 100.µs) {
+ error("expected to be canceled")
+ }
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ redQueue.cancelAll()
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ blueQueue.cancelAll()
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+ }
+
+ @Test fun noMoreThanOneWorkerThreadWaitingToStartAtATime() {
+ // Enqueueing the red task starts a thread because the head of the queue changed.
+ redQueue.execute("red task") {
+ log += "red:starting@${taskFaker.nanoTime}"
+ taskFaker.sleep(100.µs)
+ log += "red:finishing@${taskFaker.nanoTime}"
+ }
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ // Enqueueing the blue task doesn't start a thread because the red one is still starting.
+ blueQueue.execute("blue task") {
+ log += "blue:starting@${taskFaker.nanoTime}"
+ taskFaker.sleep(100.µs)
+ log += "blue:finishing@${taskFaker.nanoTime}"
+ }
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ // Running the red task starts another thread, so the two can run in parallel.
+ taskFaker.runNextTask()
+ assertThat(log).containsExactly("red:starting@0")
+ assertThat(taskFaker.executeCallCount).isEqualTo(2)
+
+ // Next the blue task starts.
+ taskFaker.runNextTask()
+ assertThat(log).containsExactly(
+ "red:starting@0",
+ "blue:starting@0",
+ )
+ assertThat(taskFaker.executeCallCount).isEqualTo(2)
+
+ // Advance time until the tasks complete.
+ taskFaker.advanceUntil(100.µs)
+ assertThat(log).containsExactly(
+ "red:starting@0",
+ "blue:starting@0",
+ "red:finishing@100000",
+ "blue:finishing@100000",
+ )
+ taskFaker.assertNoMoreTasks()
+ assertThat(taskFaker.executeCallCount).isEqualTo(2)
+ }
+
+ @Test fun onlyOneCoordinatorWaitingToStartFutureTasks() {
+ // Enqueueing the red task starts a coordinator thread.
+ redQueue.execute("red task", 100.µs) {
+ log += "red:run@${taskFaker.nanoTime}"
+ }
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ // Enqueueing the blue task doesn't need a 2nd coordinator yet.
+ blueQueue.execute("blue task", 200.µs) {
+ log += "blue:run@${taskFaker.nanoTime}"
+ }
+ assertThat(taskFaker.executeCallCount).isEqualTo(1)
+
+ // Nothing to do.
+ taskFaker.runTasks()
+ assertThat(log).isEmpty()
+
+ // At 100.µs, the coordinator runs the red task and starts a thread for the new coordinator.
+ taskFaker.advanceUntil(100.µs)
+ assertThat(log).containsExactly("red:run@100000")
+ assertThat(taskFaker.executeCallCount).isEqualTo(2)
+
+ // At 200.µs, the blue task runs.
+ taskFaker.advanceUntil(200.µs)
+ assertThat(log).containsExactly("red:run@100000", "blue:run@200000")
+ assertThat(taskFaker.executeCallCount).isEqualTo(2)
+
+ taskFaker.assertNoMoreTasks()
+ }
+
private val Int.µs: Long
get() = this * 1_000L
}