aboutsummaryrefslogtreecommitdiff
path: root/platform/impl/task_runner.cc
blob: 35b9a5e2972f79fd0c8d7f5efd3f7173bed795f3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "platform/impl/task_runner.h"

#include <csignal>
#include <thread>

#include "util/osp_logging.h"

namespace openscreen {

namespace {

// This is mutated by the signal handler installed by RunUntilSignaled(), and is
// checked by RunUntilStopped().
//
// Per the C++14 spec, passing visible changes to memory between a signal
// handler and a program thread must be done through a volatile variable.
volatile enum {
  kNotRunning,
  kNotSignaled,
  kSignaled
} g_signal_state = kNotRunning;

void OnReceivedSignal(int signal) {
  g_signal_state = kSignaled;
}

}  // namespace

TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
                               TaskWaiter* event_waiter,
                               Clock::duration waiter_timeout)
    : now_function_(now_function),
      is_running_(false),
      task_waiter_(event_waiter),
      waiter_timeout_(waiter_timeout) {}

TaskRunnerImpl::~TaskRunnerImpl() {
  // Ensure no thread is currently executing inside RunUntilStopped().
  OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
}

void TaskRunnerImpl::PostPackagedTask(Task task) {
  std::lock_guard<std::mutex> lock(task_mutex_);
  tasks_.emplace_back(std::move(task));
  if (task_waiter_) {
    task_waiter_->OnTaskPosted();
  } else {
    run_loop_wakeup_.notify_one();
  }
}

void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
                                               Clock::duration delay) {
  std::lock_guard<std::mutex> lock(task_mutex_);
  if (delay <= Clock::duration::zero()) {
    tasks_.emplace_back(std::move(task));
  } else {
    delayed_tasks_.emplace(
        std::make_pair(now_function_() + delay, std::move(task)));
  }
  if (task_waiter_) {
    task_waiter_->OnTaskPosted();
  } else {
    run_loop_wakeup_.notify_one();
  }
}

bool TaskRunnerImpl::IsRunningOnTaskRunner() {
  return task_runner_thread_id_ == std::this_thread::get_id();
}

void TaskRunnerImpl::RunUntilStopped() {
  OSP_DCHECK(!is_running_);
  task_runner_thread_id_ = std::this_thread::get_id();
  is_running_ = true;

  OSP_DVLOG << "Running tasks until stopped...";
  // Main loop: Run until the |is_running_| flag is set back to false by the
  // "quit task" posted by RequestStopSoon(), or the process received a
  // termination signal.
  while (is_running_) {
    ScheduleDelayedTasks();
    if (GrabMoreRunnableTasks()) {
      RunRunnableTasks();
    }
    if (g_signal_state == kSignaled) {
      is_running_ = false;
    }
  }

  OSP_DVLOG << "Finished running, entering flushing phase...";
  // Flushing phase: Ensure all immediately-runnable tasks are run before
  // returning. Since running some tasks might cause more immediately-runnable
  // tasks to be posted, loop until there is no more work.
  //
  // If there is bad code that posts tasks indefinitely, this loop will never
  // break. However, that also means there is a code path spinning a CPU core at
  // 100% all the time. Rather than mitigate this problem scenario, purposely
  // let it manifest here in the hopes that unit testing will reveal it (e.g., a
  // unit test that never finishes running).
  while (GrabMoreRunnableTasks()) {
    RunRunnableTasks();
  }
  OSP_DVLOG << "Finished flushing...";
  task_runner_thread_id_ = std::thread::id();
}

void TaskRunnerImpl::RunUntilSignaled() {
  OSP_CHECK_EQ(g_signal_state, kNotRunning)
      << __func__ << " may not be invoked concurrently.";
  g_signal_state = kNotSignaled;
  const auto old_sigint_handler = std::signal(SIGINT, &OnReceivedSignal);
  const auto old_sigterm_handler = std::signal(SIGTERM, &OnReceivedSignal);

  RunUntilStopped();

  std::signal(SIGINT, old_sigint_handler);
  std::signal(SIGTERM, old_sigterm_handler);
  OSP_DVLOG << "Received SIGNIT or SIGTERM, setting state to not running...";
  g_signal_state = kNotRunning;
}

void TaskRunnerImpl::RequestStopSoon() {
  PostTask([this]() { is_running_ = false; });
}

void TaskRunnerImpl::RunRunnableTasks() {
  for (TaskWithMetadata& running_task : running_tasks_) {
    // Move the task to the stack so that its bound state is freed immediately
    // after being run.
    TaskWithMetadata task = std::move(running_task);
    task();
  }
  running_tasks_.clear();
}

void TaskRunnerImpl::ScheduleDelayedTasks() {
  std::lock_guard<std::mutex> lock(task_mutex_);

  // Getting the time can be expensive on some platforms, so only get it once.
  const auto current_time = now_function_();
  const auto end_of_range = delayed_tasks_.upper_bound(current_time);
  for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
    tasks_.push_back(std::move(it->second));
  }
  delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
}

bool TaskRunnerImpl::GrabMoreRunnableTasks() {
  OSP_DCHECK(running_tasks_.empty());

  std::unique_lock<std::mutex> lock(task_mutex_);
  if (!tasks_.empty()) {
    running_tasks_.swap(tasks_);
    return true;
  }

  if (!is_running_) {
    return false;  // Stop was requested. Don't wait for more tasks.
  }

  if (task_waiter_) {
    Clock::duration timeout = waiter_timeout_;
    if (!delayed_tasks_.empty()) {
      Clock::duration next_task_delta =
          delayed_tasks_.begin()->first - now_function_();
      if (next_task_delta < timeout) {
        timeout = next_task_delta;
      }
    }
    lock.unlock();
    task_waiter_->WaitForTaskToBePosted(timeout);
    return false;
  }

  if (delayed_tasks_.empty()) {
    run_loop_wakeup_.wait(lock);
  } else {
    run_loop_wakeup_.wait_for(lock,
                              delayed_tasks_.begin()->first - now_function_());
  }
  return false;
}

}  // namespace openscreen