// path: root/base/threading/sequenced_worker_pool.h
// blob: 8cdeb0b5dbcc7aa799f6a52335096db156e0f02c
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_

#include <stddef.h>

#include <cstddef>
#include <memory>
#include <string>

#include "base/base_export.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"
#include "base/task_scheduler/task_traits.h"

namespace tracked_objects {
class Location;
}  // namespace tracked_objects

namespace base {

class SequencedTaskRunner;

template <class T> class DeleteHelper;

// A worker thread pool that enforces ordering between sets of tasks. It also
// allows you to specify what should happen to your tasks on shutdown.
//
// To enforce ordering, get a unique sequence token from the pool and post all
// tasks you want to order with the token. All tasks with the same token are
// guaranteed to execute serially, though not necessarily on the same thread.
// This means that:
//
//   - No two tasks with the same token will run at the same time.
//
//   - Given two tasks T1 and T2 with the same token such that T2 will
//     run after T1, then T2 will start after T1 is destroyed.
//
//   - If T2 will run after T1, then all memory changes in T1 and T1's
//     destruction will be visible to T2.
//
// Example (|pool| is a scoped_refptr<SequencedWorkerPool>):
//   SequencedWorkerPool::SequenceToken token =
//       SequencedWorkerPool::GetSequenceToken();
//   pool->PostSequencedWorkerTaskWithShutdownBehavior(
//       token, FROM_HERE, base::Bind(...),
//       SequencedWorkerPool::SKIP_ON_SHUTDOWN);
//   pool->PostSequencedWorkerTaskWithShutdownBehavior(
//       token, FROM_HERE, base::Bind(...),
//       SequencedWorkerPool::SKIP_ON_SHUTDOWN);
//
// You can make named sequence tokens to make it easier to share a token
// across different components.
//
// You can also post tasks to the pool without ordering using PostWorkerTask.
// These will be executed in an unspecified order. The order of execution
// between tasks with different sequence tokens is also unspecified.
//
// You must call EnableForProcess() or
// EnableWithRedirectionToTaskSchedulerForProcess() before starting to post
// tasks to a process' SequencedWorkerPools.
//
// This class may be leaked on shutdown to facilitate fast shutdown. The
// expected usage, however, is to call Shutdown(), which correctly accounts
// for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
// behavior.
//
// Implementation note: This does not use a base::WorkerPool since that does
// not enforce shutdown semantics or allow us to specify how many worker
// threads to run. For the typical use case of random background work, we don't
// necessarily want to be super aggressive about creating threads.
//
// Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
// from TaskRunner).
//
// Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
// memory leaks. See http://crbug.com/273800
class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
 public:
  // Defines what should happen to a task posted to the worker pool on
  // shutdown.
  enum WorkerShutdown {
    // A task posted with this mode never holds up shutdown. If it has not run
    // by the time shutdown happens it is deleted instead of being run; if it
    // is running at that point it is simply ignored, i.e. its worker thread
    // is not joined.
    //
    // This suits work whose result stops mattering once the app is going
    // away, e.g. a slow DNS lookup that is blocked inside the OS.
    //
    // Be very careful about what such a callback touches: its thread keeps
    // running until the OS terminates the process, so the app may be tearing
    // down around it and any singleton or global object it uses may suddenly
    // become invalid. Reserve this mode for slow but simple operations like
    // the DNS example.
    CONTINUE_ON_SHUTDOWN = 0,

    // A task posted with this mode is deleted, not executed, if it has not
    // started executing at shutdown. A task that has already begun executing
    // when shutdown is called is allowed to continue, and shutdown blocks
    // until it completes.
    //
    // Note: Shutdown() may block while such a task is executing, so the task
    // must not block on the thread that called Shutdown(); doing so may lead
    // to deadlock.
    SKIP_ON_SHUTDOWN = 1,

    // A task posted with this mode blocks shutdown until it has been
    // executed. This can have significant performance implications, so use it
    // sparingly; generally only for user data, for example a task writing a
    // preference file.
    //
    // A task posted while shutdown is already under way will not get run,
    // since the workers may already be stopped: the post operation fails
    // (returns false) and the task is deleted.
    BLOCK_SHUTDOWN = 2,
  };

  // Opaque identifier that defines sequencing of tasks posted to the worker
  // pool.
  class BASE_EXPORT SequenceToken {
   public:
    // A default-constructed token carries id 0 and is therefore not valid
    // (see IsValid()).
    SequenceToken() : SequenceToken(0) {}
    ~SequenceToken() {}

    // Returns true when |other| carries the same id as this token.
    bool Equals(const SequenceToken& other) const { return other.id_ == id_; }

    // Returns true when this token's id is nonzero. Zero is the id held by a
    // default-constructed token.
    bool IsValid() const { return id_ != 0; }

    // Returns a string representation of this token. This method should only
    // be used for debugging.
    std::string ToString() const;

   private:
    // Only SequencedWorkerPool may build a token from a raw id.
    friend class SequencedWorkerPool;

    explicit SequenceToken(int id) : id_(id) {}

    // The value compared by Equals(); 0 marks the token as invalid.
    int id_;
  };

  // Allows tests to perform certain actions.
  class TestingObserver {
   public:
    // Virtual so that implementations can be destroyed through this
    // interface.
    virtual ~TestingObserver() = default;

    // Hooks to be implemented by tests. When each one is invoked is decided
    // by the pool implementation, which is not visible in this header; the
    // names suggest "the pool has work", "the pool is about to wait for
    // shutdown" and "the pool is being destructed" -- TODO confirm against
    // the implementation file.
    virtual void OnHasWork() = 0;
    virtual void WillWaitForShutdown() = 0;
    virtual void OnDestruct() = 0;
  };

  // Gets the SequenceToken of the current thread.
  // If the current thread is not a SequencedWorkerPool worker thread or is
  // running an unsequenced task, returns an invalid SequenceToken.
  static SequenceToken GetSequenceTokenForCurrentThread();

  // Returns the SequencedWorkerPool that owns this thread, or null if the
  // current thread is not a SequencedWorkerPool worker thread.
  //
  // Always returns nullptr when SequencedWorkerPool is redirected to
  // TaskScheduler.
  //
  // DEPRECATED. Use SequencedTaskRunnerHandle::Get() instead. Consequently,
  // the only remaining use case is in sequenced_task_runner_handle.cc to
  // implement that, and it will soon be removed along with
  // SequencedWorkerPool: http://crbug.com/622400.
  static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();

  // Returns a unique token that can be used to sequence tasks posted to
  // PostSequencedWorkerTask(). Valid tokens are always nonzero.
  static SequenceToken GetSequenceToken();

  // Enables posting tasks to this process' SequencedWorkerPools. Cannot be
  // called if already enabled. This is not thread-safe; proper synchronization
  // is required to use any SequencedWorkerPool method after calling this.
  static void EnableForProcess();

  // Same as EnableForProcess(), but tasks are redirected to the registered
  // TaskScheduler. All redirections' TaskPriority will be capped to
  // |max_task_priority|. There must be a registered TaskScheduler when this is
  // called.
  // TODO(gab): Remove this if http://crbug.com/622400 fails
  // (SequencedWorkerPool will be phased out completely otherwise).
  static void EnableWithRedirectionToTaskSchedulerForProcess(
      TaskPriority max_task_priority = TaskPriority::HIGHEST);

  // Disables posting tasks to this process' SequencedWorkerPools. Calling this
  // while there are active SequencedWorkerPools is not supported. This is not
  // thread-safe; proper synchronization is required to use any
  // SequencedWorkerPool method after calling this.
  static void DisableForProcessForTesting();

  // Returns true if posting tasks to this process' SequencedWorkerPool is
  // enabled (with or without redirection to TaskScheduler).
  static bool IsEnabled();

  // When constructing a SequencedWorkerPool, there must be a
  // ThreadTaskRunnerHandle on the current thread unless you plan to
  // deliberately leak it.

  // Constructs a SequencedWorkerPool which will lazily create up to
  // |max_threads| threads, using |thread_name_prefix| as a prefix for the
  // thread names to aid in debugging. |max_threads| must be greater than 1.
  // |task_priority| will be used to hint base::TaskScheduler for an
  // experiment in which all SequencedWorkerPool tasks will be redirected to
  // it in processes where a base::TaskScheduler was instantiated.
  SequencedWorkerPool(size_t max_threads,
                      const std::string& thread_name_prefix,
                      base::TaskPriority task_priority);

  // Like above, but with |observer| for testing.  Does not take ownership of
  // |observer|.
  SequencedWorkerPool(size_t max_threads,
                      const std::string& thread_name_prefix,
                      base::TaskPriority task_priority,
                      TestingObserver* observer);

  // Returns the sequence token associated with the given name. Calling this
  // function multiple times with the same string will always produce the
  // same sequence token. If the name has not been used before, a new token
  // will be created.
  SequenceToken GetNamedSequenceToken(const std::string& name);

  // Returns a SequencedTaskRunner wrapper which posts to this
  // SequencedWorkerPool using the given sequence token. Tasks with nonzero
  // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
  // are posted with BLOCK_SHUTDOWN behavior.
  scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
      SequenceToken token) WARN_UNUSED_RESULT;

  // Returns a SequencedTaskRunner wrapper which posts to this
  // SequencedWorkerPool using the given sequence token. Tasks with nonzero
  // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
  // are posted with the given shutdown behavior.
  scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
      SequenceToken token,
      WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT;

  // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
  // the given shutdown behavior. Tasks with nonzero delay are posted with
  // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
  // given shutdown behavior.
  scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
      WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT;

  // Posts the given task for execution in the worker pool. Tasks posted with
  // this function will execute in an unspecified order on a background thread.
  // Returns true if the task was posted. If your tasks have ordering
  // requirements, see PostSequencedWorkerTask().
  //
  // This class will attempt to delete tasks that aren't run
  // (non-block-shutdown semantics) but can't guarantee that this happens. If
  // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
  // will be no workers available to delete these tasks. And there may be
  // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
  // tasks. Deleting those tasks before the previous one has completed could
  // cause nondeterministic crashes because the task could be keeping some
  // objects alive which do work in their destructor, which could violate the
  // assumptions of the running task.
  //
  // The task will be guaranteed to run to completion before shutdown
  // (BLOCK_SHUTDOWN semantics).
  //
  // Returns true if the task was posted successfully. This may fail during
  // shutdown regardless of the task's shutdown behavior (see WorkerShutdown).
  bool PostWorkerTask(const tracked_objects::Location& from_here, Closure task);

  // Same as PostWorkerTask but allows a delay to be specified (although doing
  // so changes the shutdown behavior). The task will be run after the given
  // delay has elapsed.
  //
  // If the delay is nonzero, the task won't be guaranteed to run to completion
  // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
  // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
  // task will be guaranteed to run to completion before shutdown
  // (BLOCK_SHUTDOWN semantics).
  bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
                             Closure task,
                             TimeDelta delay);

  // Same as PostWorkerTask but allows specification of the shutdown behavior.
  bool PostWorkerTaskWithShutdownBehavior(
      const tracked_objects::Location& from_here,
      Closure task,
      WorkerShutdown shutdown_behavior);

  // Like PostWorkerTask above, but provides sequencing semantics. This means
  // that tasks posted with the same sequence token (see GetSequenceToken())
  // are guaranteed to execute in order. This is useful in cases where you're
  // doing operations that may depend on previous ones, like appending to a
  // file.
  //
  // The task will be guaranteed to run to completion before shutdown
  // (BLOCK_SHUTDOWN semantics).
  //
  // Returns true if the task was posted successfully. This may fail during
  // shutdown regardless of the task's shutdown behavior (see WorkerShutdown).
  bool PostSequencedWorkerTask(SequenceToken sequence_token,
                               const tracked_objects::Location& from_here,
                               Closure task);

  // Like PostSequencedWorkerTask above, but allows you to specify a named
  // token, which saves an extra call to GetNamedSequenceToken.
  bool PostNamedSequencedWorkerTask(const std::string& token_name,
                                    const tracked_objects::Location& from_here,
                                    Closure task);

  // Same as PostSequencedWorkerTask but allows a delay to be specified
  // (although doing so changes the shutdown behavior). The task will be run
  // after the given delay has elapsed.
  //
  // If the delay is nonzero, the task won't be guaranteed to run to completion
  // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
  // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
  // i.e. the task will be guaranteed to run to completion before shutdown
  // (BLOCK_SHUTDOWN semantics).
  bool PostDelayedSequencedWorkerTask(
      SequenceToken sequence_token,
      const tracked_objects::Location& from_here,
      Closure task,
      TimeDelta delay);

  // Same as PostSequencedWorkerTask but allows specification of the shutdown
  // behavior.
  bool PostSequencedWorkerTaskWithShutdownBehavior(
      SequenceToken sequence_token,
      const tracked_objects::Location& from_here,
      Closure task,
      WorkerShutdown shutdown_behavior);

  // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       Closure task,
                       TimeDelta delay) override;
  bool RunsTasksOnCurrentThread() const override;

  // Blocks until all pending tasks are complete. This should only be called in
  // unit tests when you want to validate something that should have happened.
  // Does not wait for delayed tasks. If redirection to TaskScheduler is
  // disabled, delayed tasks are deleted. If redirection to TaskScheduler is
  // enabled, this will wait for all tasks posted to TaskScheduler (not just
  // tasks posted to this SequencedWorkerPool).
  //
  // Note that calling this will not prevent other threads from posting work to
  // the queue while the calling thread is waiting on Flush(). In this case,
  // Flush will return only when there's no more work in the queue. Normally,
  // this doesn't come up since in a test, all the work is being posted from
  // the main thread.
  //
  // TODO(gab): Remove mentions of TaskScheduler in this comment if
  // http://crbug.com/622400 fails.
  void FlushForTesting();

  // Spuriously signal that there is work to be done.
  void SignalHasWorkForTesting();

  // Implements the worker pool shutdown. This should be called during app
  // shutdown, and will discard/join with appropriate tasks before returning.
  // After this call, subsequent calls to post tasks will fail.
  //
  // Must be called from the same thread this object was constructed on.
  void Shutdown() {
    // Delegates to the overload taking a task limit, allowing zero new
    // blocking tasks to be posted once shutdown has started.
    Shutdown(0);
  }

  // A variant that allows up to |max_new_blocking_tasks_after_shutdown| new
  // blocking tasks to be posted during shutdown. The tasks cannot be posted
  // within the execution context of tasks whose shutdown behavior is not
  // BLOCK_SHUTDOWN. Once the limit is reached, subsequent calls to post a
  // task fail in all cases.
  // Must be called from the same thread this object was constructed on.
  void Shutdown(int max_new_blocking_tasks_after_shutdown);

  // Checks whether Shutdown() has been called for this pool. This is used to
  // abort time-consuming operations so that they do not block shutdown.
  //
  // Can be called from any thread.
  bool IsShutdownInProgress();

 protected:
  ~SequencedWorkerPool() override;

  void OnDestruct() const override;

 private:
  friend class RefCountedThreadSafe<SequencedWorkerPool>;
  friend class DeleteHelper<SequencedWorkerPool>;

  class Inner;
  class PoolSequencedTaskRunner;
  class Worker;

  // Returns true if the current thread is processing a task with the given
  // sequence_token.
  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  const scoped_refptr<SequencedTaskRunner> constructor_task_runner_;

  // Avoid pulling in too many headers by putting (almost) everything
  // into |inner_|.
  const std::unique_ptr<Inner> inner_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
};

}  // namespace base

#endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_