// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
#define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_

#include <stddef.h>

#include <memory>
#include <string>
#include <vector>

#include "base/base_export.h"
#include "base/containers/stack.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/strings/string_piece.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/task_scheduler/priority_queue.h"
#include "base/task_scheduler/scheduler_lock.h"
#include "base/task_scheduler/scheduler_worker.h"
#include "base/task_scheduler/scheduler_worker_pool.h"
#include "base/task_scheduler/scheduler_worker_stack.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/task.h"
#include "base/task_scheduler/tracked_ref.h"
#include "base/time/time.h"
#include "build/build_config.h"

namespace base {

class HistogramBase;
class SchedulerWorkerObserver;
class SchedulerWorkerPoolParams;

namespace internal {

class DelayedTaskManager;
class TaskTracker;

// A pool of workers that run Tasks.
//
// The pool doesn't create threads until Start() is called. Tasks can be posted
// at any time but will not run until after Start() is called.
//
// This class is thread-safe.
class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
 public:
  enum class WorkerEnvironment {
    // No special worker environment required.
    NONE,
#if defined(OS_WIN)
    // Initialize a COM MTA on the worker.
    COM_MTA,
#endif  // defined(OS_WIN)
  };

  // Constructs a pool without workers.
  //
  // |histogram_label| is used to label the pool's histograms ("TaskScheduler."
  // + histogram_name + "." + |histogram_label| + extra suffixes); it must not
  // be empty. |pool_label| is used to label the pool's threads; it must not be
  // empty. |priority_hint| is the preferred thread priority; the actual thread
  // priority depends on shutdown state and platform capabilities.
  // |task_tracker| keeps track of tasks. |delayed_task_manager| handles tasks
  // posted with a delay.
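  // For example (illustrative only, not an exhaustive list of suffixes): with
  // |histogram_label| == "Foreground", the NumTasksBetweenWaits histogram
  // would be recorded under "TaskScheduler.NumTasksBetweenWaits.Foreground".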
  SchedulerWorkerPoolImpl(StringPiece histogram_label,
                          StringPiece pool_label,
                          ThreadPriority priority_hint,
                          TrackedRef<TaskTracker> task_tracker,
                          DelayedTaskManager* delayed_task_manager);

  // Creates workers following the |params| specification, allowing existing and
  // future tasks to run. The pool will run at most |max_background_tasks|
  // unblocked TaskPriority::BACKGROUND tasks concurrently. Uses
  // |service_thread_task_runner| to monitor for blocked threads in the pool. If
  // specified, |scheduler_worker_observer| will be notified when a worker
  // enters and exits its main function. It must not be destroyed before
  // JoinForTesting() has returned (must never be destroyed in production).
  // |worker_environment| specifies any requested environment to execute the
  // tasks. Can only be called once. CHECKs on failure.
  void Start(const SchedulerWorkerPoolParams& params,
             int max_background_tasks,
             scoped_refptr<TaskRunner> service_thread_task_runner,
             SchedulerWorkerObserver* scheduler_worker_observer,
             WorkerEnvironment worker_environment);
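
  // A minimal usage sketch (illustrative only, not part of this interface; it
  // assumes a configured SchedulerWorkerPoolParams |params|, a TaskTracker
  // exposing a TrackedRef via GetTrackedRef(), a DelayedTaskManager
  // |delayed_task_manager| and a service thread TaskRunner):
  //
  //   SchedulerWorkerPoolImpl pool("Example", "ExamplePool",
  //                                ThreadPriority::NORMAL,
  //                                task_tracker.GetTrackedRef(),
  //                                &delayed_task_manager);
  //   pool.Start(params, /* max_background_tasks */ 1,
  //              service_thread_task_runner,
  //              /* scheduler_worker_observer */ nullptr,
  //              SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
  //
  // Tasks posted through task runners created from this pool before Start()
  // are queued and only start running once Start() has been called.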

  // Destroying a SchedulerWorkerPoolImpl is not allowed in production; it is
  // always leaked. In tests, it can only be destroyed after JoinForTesting()
  // has returned.
  ~SchedulerWorkerPoolImpl() override;

  // SchedulerWorkerPool:
  void JoinForTesting() override;

  const HistogramBase* num_tasks_before_detach_histogram() const {
    return num_tasks_before_detach_histogram_;
  }

  const HistogramBase* num_tasks_between_waits_histogram() const {
    return num_tasks_between_waits_histogram_;
  }

  void GetHistograms(std::vector<const HistogramBase*>* histograms) const;

  // Returns the maximum number of non-blocked tasks that can run concurrently
  // in this pool.
  //
  // TODO(fdoray): Remove this method. https://crbug.com/687264
  int GetMaxConcurrentNonBlockedTasksDeprecated() const;

  // Waits until at least |n| workers are idle. Workers are disallowed from
  // cleaning up during this call, but tests using a custom
  // |suggested_reclaim_time_| must still invoke this promptly after unblocking
  // the waited-upon workers: if a worker has already detached by the time this
  // is invoked, it will never make it onto the idle stack and this call will
  // hang.
  void WaitForWorkersIdleForTesting(size_t n);

  // Waits until all workers are idle.
  void WaitForAllWorkersIdleForTesting();
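
  // Illustrative test-only usage (a sketch; assumes the pool was started as
  // above and that no tasks are posted concurrently): after the wait returns,
  // every worker is on the idle stack.
  //
  //   pool.WaitForAllWorkersIdleForTesting();
  //   EXPECT_EQ(pool.NumberOfWorkersForTesting(),
  //             pool.NumberOfIdleWorkersForTesting());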

  // Waits until |n| workers have cleaned up (counted since the previous call
  // to WaitForWorkersCleanedUpForTesting(), or since Start() if this method
  // has never been called).
  void WaitForWorkersCleanedUpForTesting(size_t n);

  // Returns the number of workers in this worker pool.
  size_t NumberOfWorkersForTesting() const;

  // Returns |max_tasks_|.
  size_t GetMaxTasksForTesting() const;

  // Returns the number of workers that are idle (i.e. not running tasks).
  size_t NumberOfIdleWorkersForTesting() const;

  // Sets the MayBlock waiting threshold to TimeDelta::Max().
  void MaximizeMayBlockThresholdForTesting();

 private:
  class SchedulerWorkerDelegateImpl;

  // Friend tests so that they can access |kBlockedWorkersPollPeriod| and
  // MayBlockThreshold().
  friend class TaskSchedulerWorkerPoolBlockingTest;
  friend class TaskSchedulerWorkerPoolMayBlockTest;

  // The period between calls to AdjustMaxTasks() when the pool is at capacity.
  // This value was set unscientifically based on intuition and may be adjusted
  // in the future.
  static constexpr TimeDelta kBlockedWorkersPollPeriod =
      TimeDelta::FromMilliseconds(50);

  // SchedulerWorkerPool:
  void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;

  // Waits until at least |n| workers are idle. |lock_| must be held to call
  // this function.
  void WaitForWorkersIdleLockRequiredForTesting(size_t n);

  // Wakes up the worker that most recently went idle in this pool, if any.
  void WakeUpOneWorker();

  // Same as WakeUpOneWorker(), except that it asserts |lock_| is already
  // acquired rather than acquiring it. Returns true if worker wakeups are
  // permitted.
  bool WakeUpOneWorkerLockRequired();

  // Adds a worker, if needed, to maintain one idle worker, |max_tasks_|
  // permitting.
  void MaintainAtLeastOneIdleWorkerLockRequired();

  // Adds |worker| to |idle_workers_stack_|.
  void AddToIdleWorkersStackLockRequired(SchedulerWorker* worker);

  // Removes |worker| from |idle_workers_stack_|.
  void RemoveFromIdleWorkersStackLockRequired(SchedulerWorker* worker);

  // Returns true if worker cleanup is permitted.
  bool CanWorkerCleanupForTestingLockRequired();

  // Tries to add a new SchedulerWorker to the pool. Returns the new
  // SchedulerWorker on success, nullptr otherwise. Cannot be called before
  // Start(). Must be called under the protection of |lock_|.
  SchedulerWorker* CreateRegisterAndStartSchedulerWorkerLockRequired();

  // Returns the number of workers in the pool that should not run tasks due to
  // the pool being over capacity.
  size_t NumberOfExcessWorkersLockRequired() const;

  // Examines the list of SchedulerWorkers and increments |max_tasks_| for each
  // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall
  // for more than MayBlockThreshold().
  void AdjustMaxTasks();

  // Returns the threshold after which |max_tasks_| is increased to compensate
  // for a worker that is within a MAY_BLOCK ScopedBlockingCall.
  TimeDelta MayBlockThreshold() const;

  // Starts calling AdjustMaxTasks() periodically on
  // |service_thread_task_runner_| if not already requested.
  void ScheduleAdjustMaxTasksIfNeeded();

  // Calls AdjustMaxTasks() and schedules it again as necessary. May only be
  // called from the service thread.
  void AdjustMaxTasksFunction();

  // Returns true if AdjustMaxTasks() should periodically be called on
  // |service_thread_task_runner_|.
  bool ShouldPeriodicallyAdjustMaxTasksLockRequired();

  // Decrements/increments the maximum number of tasks that can run
  // concurrently in this pool (|max_tasks_|). |is_running_background_task|
  // indicates whether the worker causing the change is currently running a
  // TaskPriority::BACKGROUND task.
  void DecrementMaxTasksLockRequired(bool is_running_background_task);
  void IncrementMaxTasksLockRequired(bool is_running_background_task);

  const std::string pool_label_;
  const ThreadPriority priority_hint_;

  // PriorityQueue from which all threads of this worker pool get work.
  PriorityQueue shared_priority_queue_;

  // Suggested reclaim time for workers. Initialized by Start(). Never modified
  // afterwards (i.e. can be read without synchronization after Start()).
  TimeDelta suggested_reclaim_time_;

  SchedulerBackwardCompatibility backward_compatibility_;

  // Synchronizes accesses to |workers_|, |max_tasks_|, |max_background_tasks_|,
  // |num_running_background_tasks_|, |num_pending_may_block_workers_|,
  // |idle_workers_stack_|, |idle_workers_stack_cv_for_testing_|,
  // |num_wake_ups_before_start_|, |cleanup_timestamps_|, |polling_max_tasks_|,
  // |worker_cleanup_disallowed_for_testing_|,
  // |num_workers_cleaned_up_for_testing_|,
  // |SchedulerWorkerDelegateImpl::is_on_idle_workers_stack_|,
  // |SchedulerWorkerDelegateImpl::incremented_max_tasks_since_blocked_| and
  // |SchedulerWorkerDelegateImpl::may_block_start_time_|. Has
  // |shared_priority_queue_|'s lock as its predecessor so that a worker can be
  // pushed to |idle_workers_stack_| within the scope of a Transaction (more
  // details in GetWork()).
  mutable SchedulerLock lock_;

  // All workers owned by this worker pool.
  std::vector<scoped_refptr<SchedulerWorker>> workers_;

  // The maximum number of tasks that can run concurrently in this pool. Workers
  // can be added as needed up until there are |max_tasks_| workers.
  size_t max_tasks_ = 0;

  // Initial value of |max_tasks_| as set in Start().
  size_t initial_max_tasks_ = 0;

  // The maximum number of background tasks that can run concurrently in this
  // pool.
  int max_background_tasks_ = 0;

  // The number of background tasks that are currently running in this pool.
  int num_running_background_tasks_ = 0;

  // Number of workers that are within the scope of a MAY_BLOCK
  // ScopedBlockingCall but haven't caused a max task increase yet.
  int num_pending_may_block_workers_ = 0;

  // Number of workers that are running a TaskPriority::BACKGROUND task and are
  // within the scope of a MAY_BLOCK ScopedBlockingCall but haven't caused a max
  // task increase yet.
  int num_pending_background_may_block_workers_ = 0;

  // Environment to be initialized per worker.
  WorkerEnvironment worker_environment_ = WorkerEnvironment::NONE;

  // Stack of idle workers. Initially, all workers are on this stack. A worker
  // is removed from the stack before its WakeUp() function is called and when
  // it receives work from GetWork() (a worker calls GetWork() when its sleep
  // timeout expires, even if its WakeUp() method hasn't been called). A worker
  // is pushed on this stack when it receives nullptr from GetWork().
  SchedulerWorkerStack idle_workers_stack_;

  // Signaled when a worker is added to the idle workers stack.
  std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_;

  // Number of wake ups that occurred before Start(). Never modified after
  // Start() (i.e. can be read without synchronization after Start()).
  int num_wake_ups_before_start_ = 0;

  // Stack that contains the timestamps of when workers get cleaned up.
  // Timestamps get popped off the stack as new workers are added.
  base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_;

  // Whether we are currently polling for necessary adjustments to |max_tasks_|.
  bool polling_max_tasks_ = false;

  // Indicates to the delegates that workers are not permitted to clean up.
  bool worker_cleanup_disallowed_for_testing_ = false;

  // Counts the number of workers cleaned up since the previous call to
  // WaitForWorkersCleanedUpForTesting() (or since Start() if it was never
  // called).
  // |some_workers_cleaned_up_for_testing_| is true if this was ever
  // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
  // specific number of workers being cleaned up via
  // WaitForWorkersCleanedUpForTesting().
  size_t num_workers_cleaned_up_for_testing_ = 0;
#if DCHECK_IS_ON()
  bool some_workers_cleaned_up_for_testing_ = false;
#endif

  // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
  // incremented.
  std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_;

  // Used for testing and makes MayBlockThreshold() return the maximum
  // TimeDelta.
  AtomicFlag maximum_blocked_threshold_for_testing_;

#if DCHECK_IS_ON()
  // Set at the start of JoinForTesting().
  AtomicFlag join_for_testing_started_;
#endif

  // TaskScheduler.DetachDuration.[worker pool name] histogram. Intentionally
  // leaked.
  HistogramBase* const detach_duration_histogram_;

  // TaskScheduler.NumTasksBeforeDetach.[worker pool name] histogram.
  // Intentionally leaked.
  HistogramBase* const num_tasks_before_detach_histogram_;

  // TaskScheduler.NumTasksBetweenWaits.[worker pool name] histogram.
  // Intentionally leaked.
  HistogramBase* const num_tasks_between_waits_histogram_;

  scoped_refptr<TaskRunner> service_thread_task_runner_;

  // Optional observer notified when a worker enters and exits its main
  // function. Set in Start() and never modified afterwards.
  SchedulerWorkerObserver* scheduler_worker_observer_ = nullptr;

  // Ensures recently cleaned-up workers (see
  // SchedulerWorkerDelegateImpl::CleanupLockRequired()) have had time to exit.
  // They hold a raw reference to |this| (and to TaskTracker) which could
  // otherwise result in racy use-after-frees: once they are no longer part of
  // |workers_|, they are not explicitly joined in JoinForTesting().
  // https://crbug.com/810464.
  TrackedRefFactory<SchedulerWorkerPoolImpl> tracked_ref_factory_;

  DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl);
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_