summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Deymo <deymo@chromium.org>2015-08-12 10:59:36 -0700
committerBertrand SIMONNET <bsimonnet@google.com>2015-08-14 16:07:09 -0700
commit07c1779e51680364060f3ec289249869ac7bc5ca (patch)
treecb74bcdb41c3b547b4f78f0d055e52b588e0b99b
parentcc76bff0d0a26687b9117f748bb8473761cfdfe6 (diff)
downloadlibchromeos-07c1779e51680364060f3ec289249869ac7bc5ca.tar.gz
libchromeos: Prevent starvation in BaseMessageLoop.
(cherry-picked from https://chromium.googlesource.com/a/chromiumos/platform2 at 3f609aba265a91634b5af226ceac9783beac6b36) When more than once source of events (file descriptor available or delayed task ready) is available, the MessageLoop has some freedom to schedule between I/O tasks and delayed tasks. The base::MessageLoopForIO will not dispatch posted delayed tasks if there are file descriptors indefinitely available and will prefer to allways run all the I/O callbacks before running the delayed tasks that are due, causing what is known as starvation. In the Chrome context, this is not a big problem since the thread running the base::MessageLoopForIO normally posts delayed tasks to other threads. Even with a single CPU, it is sufficient to have a fair thread scheduler to avoid starvation. In the Chrome OS context of single threaded daemons doing I/O, this is more important. This patch tightens the contract on chromeos::MessageLoop API to prevent starvation. The chromeos::GlibMessageLoop implementation already ensures this property, so we fix the chromeos::BaseMessageLoop by posting a task to the message loop when a file descriptor is ready and stop watching that file descriptor while the task is waiting in the message loop. New unittest are included here to verify the same behaviour over both implementations. BUG=chromium:419827 TEST=Added unittest. Change-Id: Ibc0aa999b9e86e6e3205d71ece3a7b72e019b6a3 Reviewed-on: https://chromium-review.googlesource.com/293334 Reviewed-by: Alex Vakulenko <avakulenko@chromium.org> Trybot-Ready: Alex Deymo <deymo@chromium.org> Tested-by: Alex Deymo <deymo@chromium.org> Commit-Queue: Alex Deymo <deymo@chromium.org>
-rw-r--r--chromeos/message_loops/base_message_loop.cc123
-rw-r--r--chromeos/message_loops/base_message_loop.h34
-rw-r--r--chromeos/message_loops/message_loop_unittest.cc97
3 files changed, 226 insertions, 28 deletions
diff --git a/chromeos/message_loops/base_message_loop.cc b/chromeos/message_loops/base_message_loop.cc
index dbca1ef..4923138 100644
--- a/chromeos/message_loops/base_message_loop.cc
+++ b/chromeos/message_loops/base_message_loop.cc
@@ -25,7 +25,7 @@ BaseMessageLoop::~BaseMessageLoop() {
DVLOG_LOC(io_task.second.location(), 1)
<< "Removing file descriptor watcher task_id " << io_task.first
<< " leaked on BaseMessageLoop, scheduled from this location.";
- io_task.second.fd_watcher()->StopWatchingFileDescriptor();
+ io_task.second.StopWatching();
}
// Note all pending canceled delayed tasks when destroying the message loop.
@@ -91,14 +91,11 @@ MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
auto it_bool = io_tasks_.emplace(
std::piecewise_construct,
std::forward_as_tuple(task_id),
- std::forward_as_tuple(from_here, this, task_id, persistent, task));
+ std::forward_as_tuple(
+ from_here, this, task_id, fd, base_mode, persistent, task));
// This should always insert a new element.
DCHECK(it_bool.second);
- IOTask* new_io_task = &it_bool.first->second;
-
- bool scheduled = base_loop_->WatchFileDescriptor(
- fd, persistent, base_mode, new_io_task->fd_watcher(), new_io_task);
-
+ bool scheduled = it_bool.first->second.StartWatching();
DVLOG_LOC(from_here, 1)
<< "Watching fd " << fd << " for "
<< (mode == MessageLoop::kWatchRead ? "reading" : "writing")
@@ -122,13 +119,7 @@ bool BaseMessageLoop::CancelTask(TaskId task_id) {
auto io_task_it = io_tasks_.find(task_id);
if (io_task_it == io_tasks_.end())
return false;
-
- DVLOG_LOC(io_task_it->second.location(), 1)
- << "Removing task_id " << task_id << " scheduled from this location.";
- // Destroying the FileDescriptorWatcher implicitly stops watching the file
- // descriptor.
- io_tasks_.erase(io_task_it);
- return true;
+ return io_task_it->second.CancelTask();
}
// A DelayedTask was found for this task_id at this point.
@@ -219,35 +210,98 @@ void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
delayed_tasks_.erase(task_it);
}
+void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
+ auto task_it = io_tasks_.find(task_id);
+ // Even if this task was canceled while we were waiting in the message loop
+ // for this method to run, the entry in io_tasks_ should still be present, but
+ // won't do anything.
+ DCHECK(task_it != io_tasks_.end());
+ task_it->second.OnFileReadyPostedTask();
+}
+
BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
BaseMessageLoop* loop,
MessageLoop::TaskId task_id,
+ int fd,
+ base::MessageLoopForIO::Mode base_mode,
bool persistent,
const Closure& task)
: location_(location), loop_(loop), task_id_(task_id),
- persistent_(persistent), closure_(task) {}
+ fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
-void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int fd) {
- OnFileReady(fd);
+bool BaseMessageLoop::IOTask::StartWatching() {
+ return loop_->base_loop_->WatchFileDescriptor(
+ fd_, persistent_, base_mode_, &fd_watcher_, this);
}
-void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int fd) {
- OnFileReady(fd);
+void BaseMessageLoop::IOTask::StopWatching() {
+ // This is safe to call even if we are not watching for it.
+ fd_watcher_.StopWatchingFileDescriptor();
+}
+
+void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
+ OnFileReady();
+}
+
+void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
+ OnFileReady();
+}
+
+void BaseMessageLoop::IOTask::OnFileReady() {
+ // When the file descriptor becomes available we stop watching for it and
+ // schedule a task to run the callback from the main loop. The callback will
+ // run using the same scheduler use to run other delayed tasks, avoiding
+ // starvation of the available posted tasks if there are file descriptors
+ // always available. The new posted task will use the same TaskId as the
+ // current file descriptor watching task an could be canceled in either state,
+ // when waiting for the file descriptor or waiting in the main loop.
+ StopWatching();
+ bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
+ location_,
+ base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
+ loop_->weak_ptr_factory_.GetWeakPtr(),
+ task_id_));
+ posted_task_pending_ = true;
+ if (base_scheduled) {
+ DVLOG_LOC(location_, 1)
+ << "Dispatching task_id " << task_id_ << " for "
+ << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
+ "reading" : "writing")
+ << " file descriptor " << fd_ << ", scheduled from this location.";
+ } else {
+ // In the rare case that PostTask() fails, we fall back to run it directly.
+ // This would indicate a bigger problem with the message loop setup.
+ LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
+ OnFileReadyPostedTask();
+ }
}
-void BaseMessageLoop::IOTask::OnFileReady(int fd) {
+void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
// We can't access |this| after running the |closure_| since it could call
// CancelTask on its own task_id, so we copy the members we need now.
BaseMessageLoop* loop_ptr = loop_;
+ DCHECK(posted_task_pending_ = true);
+ posted_task_pending_ = false;
+
+ // If this task was already canceled, the closure will be null and there is
+ // nothing else to do here. This execution doesn't count a step for RunOnce()
+ // unless we have a callback to run.
+ if (closure_.is_null()) {
+ loop_->io_tasks_.erase(task_id_);
+ return;
+ }
DVLOG_LOC(location_, 1)
- << "Running task_id " << task_id_
- << " for watching file descriptor " << fd
- << ", scheduled from this location.";
+ << "Running task_id " << task_id_ << " for "
+ << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
+ "reading" : "writing")
+ << " file descriptor " << fd_ << ", scheduled from this location.";
if (persistent_) {
// In the persistent case we just run the callback. If this callback cancels
- // the task id, we can't access |this| anymore.
+ // the task id, we can't access |this| anymore, so we re-start watching the
+ // file descriptor before running the callback.
+ StartWatching();
closure_.Run();
} else {
// This will destroy |this|, the fd_watcher and therefore stop watching this
@@ -264,4 +318,25 @@ void BaseMessageLoop::IOTask::OnFileReady(int fd) {
}
}
+bool BaseMessageLoop::IOTask::CancelTask() {
+ if (closure_.is_null())
+ return false;
+
+ DVLOG_LOC(location_, 1)
+ << "Removing task_id " << task_id_ << " scheduled from this location.";
+
+ if (!posted_task_pending_) {
+ // Destroying the FileDescriptorWatcher implicitly stops watching the file
+ // descriptor. This will delete our instance.
+ loop_->io_tasks_.erase(task_id_);
+ return true;
+ }
+ // The IOTask is waiting for the message loop to run its delayed task, so
+ // it is not watching for the file descriptor. We release the closure
+ // resources now but keep the IOTask instance alive while we wait for the
+ // callback to run and delete the IOTask.
+ closure_ = Closure();
+ return true;
+}
+
} // namespace chromeos
diff --git a/chromeos/message_loops/base_message_loop.h b/chromeos/message_loops/base_message_loop.h
index 2014c71..902b828 100644
--- a/chromeos/message_loops/base_message_loop.h
+++ b/chromeos/message_loops/base_message_loop.h
@@ -55,6 +55,12 @@ class CHROMEOS_EXPORT BaseMessageLoop : public MessageLoop {
// scheduled with Post*Task() of id |task_id|, even if it was canceled.
void OnRanPostedTask(MessageLoop::TaskId task_id);
+ // Called from the message loop when the IOTask should run the scheduled
+ // callback. This is a simple wrapper of IOTask::OnFileReadyPostedTask()
+ // posted from the BaseMessageLoop so it is deleted when the BaseMessageLoop
+ // goes out of scope since we can't cancel the callback otherwise.
+ void OnFileReadyPostedTask(MessageLoop::TaskId task_id);
+
// Return a new unused task_id.
TaskId NextTaskId();
@@ -72,30 +78,50 @@ class CHROMEOS_EXPORT BaseMessageLoop : public MessageLoop {
IOTask(const tracked_objects::Location& location,
BaseMessageLoop* loop,
MessageLoop::TaskId task_id,
+ int fd,
+ base::MessageLoopForIO::Mode base_mode,
bool persistent,
const base::Closure& task);
const tracked_objects::Location& location() const { return location_; }
- base::MessageLoopForIO::FileDescriptorWatcher* fd_watcher() {
- return &fd_watcher_;
- }
+
+ // Used to start/stop watching the file descriptor while keeping the
+ // IOTask entry available.
+ bool StartWatching();
+ void StopWatching();
+
+ // Called from the message loop as a PostTask() when the file descriptor is
+ // available, scheduled to run from OnFileReady().
+ void OnFileReadyPostedTask();
+
+ // Cancel the IOTask and returns whether it was actually canceled, with the
+ // same semantics as MessageLoop::CancelTask().
+ bool CancelTask();
private:
tracked_objects::Location location_;
BaseMessageLoop* loop_;
+ // These are the arguments passed in the constructor, basically forwarding
+ // all the arguments passed to WatchFileDescriptor() plus the assigned
+ // TaskId for this task.
MessageLoop::TaskId task_id_;
+ int fd_;
+ base::MessageLoopForIO::Mode base_mode_;
bool persistent_;
base::Closure closure_;
base::MessageLoopForIO::FileDescriptorWatcher fd_watcher_;
+ // Tells whether there is a pending call to OnFileReadPostedTask().
+ bool posted_task_pending_{false};
+
// base::MessageLoopForIO::Watcher overrides:
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;
// Common implementation for both the read and write case.
- void OnFileReady(int fd);
+ void OnFileReady();
DISALLOW_COPY_AND_ASSIGN(IOTask);
};
diff --git a/chromeos/message_loops/message_loop_unittest.cc b/chromeos/message_loops/message_loop_unittest.cc
index 30f400f..73b1f96 100644
--- a/chromeos/message_loops/message_loop_unittest.cc
+++ b/chromeos/message_loops/message_loop_unittest.cc
@@ -15,6 +15,7 @@
#include <unistd.h>
#include <memory>
+#include <vector>
#include <base/bind.h>
#include <base/location.h>
@@ -34,6 +35,9 @@ namespace {
// file descriptors when testing watching for a file descriptor.
class ScopedPipe {
public:
+ // The internal pipe size.
+ static const int kPipeSize;
+
ScopedPipe() {
int fds[2];
if (pipe(fds) != 0) {
@@ -41,6 +45,7 @@ class ScopedPipe {
}
reader = fds[0];
writer = fds[1];
+ EXPECT_EQ(kPipeSize, fcntl(writer, F_SETPIPE_SZ, kPipeSize));
}
~ScopedPipe() {
if (reader != -1)
@@ -54,6 +59,8 @@ class ScopedPipe {
int writer{-1};
};
+const int ScopedPipe::kPipeSize = 4096;
+
class ScopedSocketPair {
public:
ScopedSocketPair() {
@@ -322,4 +329,94 @@ TYPED_TEST(MessageLoopTest, DeletePersistenIOTaskFromSelf) {
EXPECT_EQ(MessageLoop::kTaskIdNull, task_id);
}
+// Test that we can cancel several persistent file descriptor watching callbacks
+// from a scheduled callback. In the BaseMessageLoop implementation, this code
+// will cause us to cancel an IOTask that has a pending delayed task, but
+// otherwise is a valid test case on all implementations.
+TYPED_TEST(MessageLoopTest, DeleteAllPersistenIOTaskFromSelf) {
+ const int kNumTasks = 5;
+ ScopedPipe pipes[kNumTasks];
+ TaskId task_ids[kNumTasks];
+
+ for (int i = 0; i < kNumTasks; ++i) {
+ task_ids[i] = this->loop_->WatchFileDescriptor(
+ FROM_HERE, pipes[i].writer, MessageLoop::kWatchWrite,
+ true /* persistent */,
+ Bind([this, kNumTasks, &task_ids] {
+ for (int j = 0; j < kNumTasks; ++j) {
+ // Once we cancel all the tasks, none should run, so this code runs
+ // only once from one callback.
+ EXPECT_TRUE(this->loop_->CancelTask(task_ids[j]));
+ task_ids[j] = MessageLoop::kTaskIdNull;
+ }
+ }));
+ }
+ MessageLoopRunMaxIterations(this->loop_.get(), 100);
+ for (int i = 0; i < kNumTasks; ++i) {
+ EXPECT_EQ(MessageLoop::kTaskIdNull, task_ids[i]);
+ }
+}
+
+// Test that if there are several tasks watching for file descriptors to be
+// available or simply waiting in the message loop are fairly scheduled to run.
+// In other words, this test ensures that having a file descriptor always
+// available doesn't prevent other file descriptors watching tasks or delayed
+// tasks to be dispatched, causing starvation.
+TYPED_TEST(MessageLoopTest, AllTasksAreEqual) {
+ int total_calls = 0;
+
+ // First, schedule a repeating timeout callback to run from the main loop.
+ int timeout_called = 0;
+ base::Closure timeout_callback;
+ MessageLoop::TaskId timeout_task;
+ timeout_callback = base::Bind(
+ [this, &timeout_called, &total_calls, &timeout_callback, &timeout_task] {
+ timeout_called++;
+ total_calls++;
+ timeout_task = this->loop_->PostTask(FROM_HERE, Bind(timeout_callback));
+ if (total_calls > 100)
+ this->loop_->BreakLoop();
+ });
+ timeout_task = this->loop_->PostTask(FROM_HERE, timeout_callback);
+
+ // Second, schedule several file descriptor watchers.
+ const int kNumTasks = 3;
+ ScopedPipe pipes[kNumTasks];
+ MessageLoop::TaskId tasks[kNumTasks];
+
+ int reads[kNumTasks] = {};
+ auto fd_callback = [this, &pipes, &reads, &total_calls](int i) {
+ reads[i]++;
+ total_calls++;
+ char c;
+ EXPECT_EQ(1, HANDLE_EINTR(read(pipes[i].reader, &c, 1)));
+ if (total_calls > 100)
+ this->loop_->BreakLoop();
+ };
+
+ for (int i = 0; i < kNumTasks; ++i) {
+ tasks[i] = this->loop_->WatchFileDescriptor(
+ FROM_HERE, pipes[i].reader, MessageLoop::kWatchRead,
+ true /* persistent */,
+ Bind(fd_callback, i));
+ // Make enough bytes available on each file descriptor. This should not
+ // block because we set the size of the file descriptor buffer when
+ // creating it.
+ std::vector<char> blob(1000, 'a');
+ EXPECT_EQ(blob.size(),
+ HANDLE_EINTR(write(pipes[i].writer, blob.data(), blob.size())));
+ }
+ this->loop_->Run();
+ EXPECT_GT(total_calls, 100);
+ // We run the loop up 100 times and expect each callback to run at least 10
+ // times. A good scheduler should balance these callbacks.
+ EXPECT_GE(timeout_called, 10);
+ EXPECT_TRUE(this->loop_->CancelTask(timeout_task));
+ for (int i = 0; i < kNumTasks; ++i) {
+ EXPECT_GE(reads[i], 10) << "Reading from pipes[" << i << "], fd "
+ << pipes[i].reader;
+ EXPECT_TRUE(this->loop_->CancelTask(tasks[i]));
+ }
+}
+
} // namespace chromeos