aboutsummaryrefslogtreecommitdiff
path: root/chromeos
diff options
context:
space:
mode:
authorAlex Deymo <deymo@chromium.org>2015-06-25 11:08:30 -0700
committerChromeOS Commit Bot <chromeos-commit-bot@chromium.org>2015-06-30 12:22:05 +0000
commit3845c6dfdf6c1edb45271e691990c12ef92f77c7 (patch)
tree60895345056a2a49943a5dbb4c12a9f26ae601b6 /chromeos
parente9ff62a60c6ff34275f5c2f056d27e8753ae0646 (diff)
downloadlibbrillo-3845c6dfdf6c1edb45271e691990c12ef92f77c7.tar.gz
libchromeos: Implement I/O watching in MessageLoop.
This patch introduces a simple I/O watching method to the MessageLoop interface following the pattern used by shill and what glib and libevent can offer as implementation backend. This also includes implementations for the GlibMessageLoop and the FakeMessageLoop. BUG=brillo:91,chromium:402066 TEST=Added unittest to validate GlibMessageLoop. Change-Id: I0a0032bc40f6fd046b8b98076e0f553cdcd69051 Reviewed-on: https://chromium-review.googlesource.com/282110 Reviewed-by: Alex Deymo <deymo@chromium.org> Tested-by: Alex Deymo <deymo@chromium.org> Commit-Queue: Alex Deymo <deymo@chromium.org>
Diffstat (limited to 'chromeos')
-rw-r--r--chromeos/message_loops/fake_message_loop.cc75
-rw-r--r--chromeos/message_loops/fake_message_loop.h38
-rw-r--r--chromeos/message_loops/fake_message_loop_unittest.cc26
-rw-r--r--chromeos/message_loops/glib_message_loop.cc113
-rw-r--r--chromeos/message_loops/glib_message_loop.h23
-rw-r--r--chromeos/message_loops/glib_message_loop_unittest.cc89
-rw-r--r--chromeos/message_loops/message_loop.h28
7 files changed, 366 insertions, 26 deletions
diff --git a/chromeos/message_loops/fake_message_loop.cc b/chromeos/message_loops/fake_message_loop.cc
index d774120..089591d 100644
--- a/chromeos/message_loops/fake_message_loop.cc
+++ b/chromeos/message_loops/fake_message_loop.cc
@@ -23,9 +23,9 @@ MessageLoop::TaskId FakeMessageLoop::PostDelayedTask(
if (test_clock_)
current_time_ = test_clock_->Now();
MessageLoop::TaskId current_id = ++last_id_;
- if (!current_id)
- current_id = ++last_id_;
- tasks_.emplace(current_id, std::make_pair(from_here, task));
+ // FakeMessageLoop is limited to only 2^64 tasks. That should be enough.
+ CHECK(current_id);
+ tasks_.emplace(current_id, ScheduledTask{from_here, false, task});
fire_order_.push(std::make_pair(current_time_ + delay, current_id));
VLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << current_id
<< " to run at " << current_time_ + delay
@@ -33,6 +33,20 @@ MessageLoop::TaskId FakeMessageLoop::PostDelayedTask(
return current_id;
}
+MessageLoop::TaskId FakeMessageLoop::WatchFileDescriptor(
+ const tracked_objects::Location& from_here,
+ int fd,
+ WatchMode mode,
+ bool persistent,
+ const base::Closure &task) {
+ MessageLoop::TaskId current_id = ++last_id_;
+ // FakeMessageLoop is limited to only 2^64 tasks. That should be enough.
+ CHECK(current_id);
+ tasks_.emplace(current_id, ScheduledTask{from_here, persistent, task});
+ fds_watched_.emplace(std::make_pair(fd, mode), current_id);
+ return current_id;
+}
+
bool FakeMessageLoop::CancelTask(TaskId task_id) {
if (task_id == MessageLoop::kTaskIdNull)
return false;
@@ -44,6 +58,37 @@ bool FakeMessageLoop::CancelTask(TaskId task_id) {
bool FakeMessageLoop::RunOnce(bool may_block) {
if (test_clock_)
current_time_ = test_clock_->Now();
+ // Try to fire ready file descriptors first.
+ for (const auto& fd_mode : fds_ready_) {
+ const auto& fd_watched = fds_watched_.find(fd_mode);
+ if (fd_watched == fds_watched_.end())
+ continue;
+ // The fd_watched->second task might have been canceled and we never removed
+ // it from the fds_watched_, so we fix that now.
+ const auto& scheduled_task_ref = tasks_.find(fd_watched->second);
+ if (scheduled_task_ref == tasks_.end()) {
+ fds_watched_.erase(fd_watched);
+ continue;
+ }
+ VLOG_LOC(scheduled_task_ref->second.location, 1)
+ << "Running task_id " << fd_watched->second
+ << " for watching file descriptor " << fd_mode.first << " for "
+ << (fd_mode.second == MessageLoop::kWatchRead ? "reading" : "writing")
+ << (scheduled_task_ref->second.persistent ?
+ " persistently" : " just once")
+ << " scheduled from this location.";
+ if (scheduled_task_ref->second.persistent) {
+ scheduled_task_ref->second.callback.Run();
+ } else {
+ base::Closure callback = std::move(scheduled_task_ref->second.callback);
+ tasks_.erase(scheduled_task_ref);
+ fds_watched_.erase(fd_watched);
+ callback.Run();
+ }
+ return true;
+ }
+
+ // Try to fire time-based callbacks afterwards.
while (!fire_order_.empty() &&
(may_block || fire_order_.top().first <= current_time_)) {
const auto task_ref = fire_order_.top();
@@ -51,8 +96,8 @@ bool FakeMessageLoop::RunOnce(bool may_block) {
// We need to skip tasks in the priority_queue not in the |tasks_| map.
// This is normal if the task was canceled, as there is no efficient way
// to remove a task from the priority_queue.
- const auto location_task = tasks_.find(task_ref.second);
- if (location_task == tasks_.end())
+ const auto scheduled_task_ref = tasks_.find(task_ref.second);
+ if (scheduled_task_ref == tasks_.end())
continue;
// Advance the clock to the task firing time, if needed.
if (current_time_ < task_ref.first) {
@@ -63,11 +108,11 @@ bool FakeMessageLoop::RunOnce(bool may_block) {
// Move the Closure out of the map before delete it. We need to delete the
// entry from the map before we call the callback, since calling CancelTask
// for the task you are running now should fail and return false.
- base::Closure callback = std::move(location_task->second.second);
- VLOG_LOC(location_task->second.first, 1)
+ base::Closure callback = std::move(scheduled_task_ref->second.callback);
+ VLOG_LOC(scheduled_task_ref->second.location, 1)
<< "Running task_id " << task_ref.second
<< " at time " << current_time_ << " from this location.";
- tasks_.erase(location_task);
+ tasks_.erase(scheduled_task_ref);
callback.Run();
return true;
@@ -75,10 +120,20 @@ bool FakeMessageLoop::RunOnce(bool may_block) {
return false;
}
+void FakeMessageLoop::SetFileDescriptorReadiness(int fd,
+ WatchMode mode,
+ bool ready) {
+ if (ready)
+ fds_ready_.emplace(fd, mode);
+ else
+ fds_ready_.erase(std::make_pair(fd, mode));
+}
+
bool FakeMessageLoop::PendingTasks() {
for (const auto& task : tasks_) {
- VLOG_LOC(task.second.first, 1) << "Pending task_id " << task.first
- << " scheduled from here.";
+ VLOG_LOC(task.second.location, 1)
+ << "Pending " << (task.second.persistent ? "persistent " : "")
+ << "task_id " << task.first << " scheduled from here.";
}
return !tasks_.empty();
}
diff --git a/chromeos/message_loops/fake_message_loop.h b/chromeos/message_loops/fake_message_loop.h
index e2e199c..4d6e7ca 100644
--- a/chromeos/message_loops/fake_message_loop.h
+++ b/chromeos/message_loops/fake_message_loop.h
@@ -8,6 +8,7 @@
#include <functional>
#include <map>
#include <queue>
+#include <set>
#include <utility>
#include <vector>
@@ -35,21 +36,41 @@ class CHROMEOS_EXPORT FakeMessageLoop : public MessageLoop {
explicit FakeMessageLoop(base::SimpleTestClock* clock);
~FakeMessageLoop() override = default;
- MessageLoop::TaskId PostDelayedTask(
- const tracked_objects::Location& from_here,
- const base::Closure &task,
- base::TimeDelta delay) override;
+ TaskId PostDelayedTask(const tracked_objects::Location& from_here,
+ const base::Closure &task,
+ base::TimeDelta delay) override;
using MessageLoop::PostDelayedTask;
+ TaskId WatchFileDescriptor(const tracked_objects::Location& from_here,
+ int fd,
+ WatchMode mode,
+ bool persistent,
+ const base::Closure &task) override;
+ using MessageLoop::WatchFileDescriptor;
bool CancelTask(TaskId task_id) override;
bool RunOnce(bool may_block) override;
// FakeMessageLoop methods:
+ // Pretend, for the purpose of the FakeMessageLoop watching for file
+ // descriptors, that the file descriptor |fd| readiness to perform the
+ // operation described by |mode| is |ready|. Initially, no file descriptor
+ // is ready for any operation.
+ void SetFileDescriptorReadiness(int fd, WatchMode mode, bool ready);
+
// Return whether there are peding tasks. Useful to check that no
// callbacks were leaked.
bool PendingTasks();
private:
+ struct ScheduledTask {
+ tracked_objects::Location location;
+ bool persistent;
+ base::Closure callback;
+ };
+
+ // The sparse list of scheduled pending callbacks.
+ std::map<MessageLoop::TaskId, ScheduledTask> tasks_;
+
// Using std::greater<> for the priority_queue means that the top() of the
// queue is the lowest (earliest) time, and for the same time, the smallest
// TaskId. This determines the order in which the tasks will be fired.
@@ -57,8 +78,13 @@ class CHROMEOS_EXPORT FakeMessageLoop : public MessageLoop {
std::pair<base::Time, MessageLoop::TaskId>,
std::vector<std::pair<base::Time, MessageLoop::TaskId>>,
std::greater<std::pair<base::Time, MessageLoop::TaskId>>> fire_order_;
- std::map<MessageLoop::TaskId,
- std::pair<tracked_objects::Location, base::Closure>> tasks_;
+
+ // The bag of watched (fd, mode) pair associated with the TaskId that's
+ // watching them.
+ std::multimap<std::pair<int, WatchMode>, MessageLoop::TaskId> fds_watched_;
+
+ // The set of (fd, mode) pairs that are faked as ready.
+ std::set<std::pair<int, WatchMode>> fds_ready_;
base::SimpleTestClock* test_clock_ = nullptr;
base::Time current_time_ = base::Time::FromDoubleT(1246996800.);
diff --git a/chromeos/message_loops/fake_message_loop_unittest.cc b/chromeos/message_loops/fake_message_loop_unittest.cc
index d2582fb..cec715d 100644
--- a/chromeos/message_loops/fake_message_loop_unittest.cc
+++ b/chromeos/message_loops/fake_message_loop_unittest.cc
@@ -81,6 +81,32 @@ TEST_F(FakeMessageLoopTest, PostDelayedTaskAdvancesTheTime) {
EXPECT_EQ(start + TimeDelta::FromSeconds(3), clock_.Now());
}
+TEST_F(FakeMessageLoopTest, WatchFileDescriptorWaits) {
+ int fd = 1234;
+ // We will simulate this situation. At the beginning, we will watch for a
+ // file descriptor that won't trigger for 10s. Then we will pretend it is
+ // ready after 10s and expect its callback to run just once.
+ int called = 0;
+ TaskId task_id = loop_->WatchFileDescriptor(
+ FROM_HERE, fd, MessageLoop::kWatchRead, false,
+ Bind([&called] { called++; }));
+ EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
+
+ EXPECT_NE(MessageLoop::kTaskIdNull,
+ loop_->PostDelayedTask(Bind([this] { this->loop_->BreakLoop(); }),
+ TimeDelta::FromSeconds(10)));
+ EXPECT_NE(MessageLoop::kTaskIdNull,
+ loop_->PostDelayedTask(Bind([this] { this->loop_->BreakLoop(); }),
+ TimeDelta::FromSeconds(20)));
+ loop_->Run();
+ EXPECT_EQ(0, called);
+
+ loop_->SetFileDescriptorReadiness(fd, MessageLoop::kWatchRead, true);
+ loop_->Run();
+ EXPECT_EQ(1, called);
+ EXPECT_FALSE(loop_->CancelTask(task_id));
+}
+
TEST_F(FakeMessageLoopTest, PendingTasksTest) {
loop_->PostDelayedTask(Bind(&base::DoNothing), TimeDelta::FromSeconds(1));
EXPECT_TRUE(loop_->PendingTasks());
diff --git a/chromeos/message_loops/glib_message_loop.cc b/chromeos/message_loops/glib_message_loop.cc
index ec4588e..524394b 100644
--- a/chromeos/message_loops/glib_message_loop.cc
+++ b/chromeos/message_loops/glib_message_loop.cc
@@ -4,6 +4,9 @@
#include <chromeos/message_loops/glib_message_loop.h>
+#include <fcntl.h>
+#include <unistd.h>
+
#include <chromeos/location_logging.h>
using base::Closure;
@@ -29,19 +32,91 @@ MessageLoop::TaskId GlibMessageLoop::PostDelayedTask(
const tracked_objects::Location& from_here,
const Closure &task,
base::TimeDelta delay) {
- MessageLoop::TaskId task_id = ++last_id_;
- if (!task_id)
- task_id = ++last_id_;
+ TaskId task_id = NextTaskId();
+ // Note: While we store persistent = false in the ScheduledTask object, we
+ // don't check it in OnRanPostedTask() since it is always false for delayed
+ // tasks. This is only used for WatchFileDescriptor below.
ScheduledTask* scheduled_task = new ScheduledTask{
- this, from_here, task_id, 0, std::move(task)};
+ this, from_here, task_id, 0, false, std::move(task)};
DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
<< " to run in " << delay << ".";
scheduled_task->source_id = g_timeout_add_full(
G_PRIORITY_DEFAULT,
delay.InMillisecondsRoundedUp(),
- OnRanPostedTask,
+ &GlibMessageLoop::OnRanPostedTask,
+ reinterpret_cast<gpointer>(scheduled_task),
+ DestroyPostedTask);
+ tasks_[task_id] = scheduled_task;
+ return task_id;
+}
+
+MessageLoop::TaskId GlibMessageLoop::WatchFileDescriptor(
+ const tracked_objects::Location& from_here,
+ int fd,
+ WatchMode mode,
+ bool persistent,
+ const Closure &task) {
+ // Quick check to see if the fd is valid.
+ if (fcntl(fd, F_GETFD) == -1 && errno == EBADF)
+ return MessageLoop::kTaskIdNull;
+
+ GIOCondition condition = G_IO_NVAL;
+ switch (mode) {
+ case MessageLoop::kWatchRead:
+ condition = G_IO_IN;
+ break;
+ case MessageLoop::kWatchWrite:
+ condition = G_IO_OUT;
+ break;
+ default:
+ return MessageLoop::kTaskIdNull;
+ }
+
+ // TODO(deymo): Used g_unix_fd_add_full() instead of g_io_add_watch_full()
+ // when/if we switch to glib 2.36 or newer so we don't need to create a
+ // GIOChannel for this.
+ GIOChannel* io_channel = g_io_channel_unix_new(fd);
+ if (!io_channel)
+ return MessageLoop::kTaskIdNull;
+ GError* error = nullptr;
+ GIOStatus status = g_io_channel_set_encoding(io_channel, nullptr, &error);
+ if (status != G_IO_STATUS_NORMAL) {
+ LOG(ERROR) << "GError(" << error->code << "): "
+ << (error->message ? error->message : "(unknown)");
+ g_error_free(error);
+ // g_io_channel_set_encoding() documentation states that this should be
+ // valid in this context (a new io_channel), but enforce the check in
+ // debug mode.
+ DCHECK(status == G_IO_STATUS_NORMAL);
+ return MessageLoop::kTaskIdNull;
+ }
+
+ TaskId task_id = NextTaskId();
+ ScheduledTask* scheduled_task = new ScheduledTask{
+ this, from_here, task_id, 0, persistent, std::move(task)};
+ scheduled_task->source_id = g_io_add_watch_full(
+ io_channel,
+ G_PRIORITY_DEFAULT,
+ condition,
+ &GlibMessageLoop::OnWatchedFdReady,
reinterpret_cast<gpointer>(scheduled_task),
DestroyPostedTask);
+ // g_io_add_watch_full() increases the reference count on the newly created
+ // io_channel, so we can dereference it now and it will be free'd once the
+ // source is removed or now if g_io_add_watch_full() failed.
+ g_io_channel_unref(io_channel);
+
+ DVLOG_LOC(from_here, 1)
+ << "Watching fd " << fd << " for "
+ << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
+ << (persistent ? " persistently" : " just once")
+ << " as task_id " << task_id
+ << (scheduled_task->source_id ? " successfully" : " failed.");
+
+ if (!scheduled_task->source_id) {
+ delete scheduled_task;
+ return MessageLoop::kTaskIdNull;
+ }
tasks_[task_id] = scheduled_task;
return task_id;
}
@@ -74,10 +149,19 @@ void GlibMessageLoop::BreakLoop() {
g_main_loop_quit(loop_);
}
+MessageLoop::TaskId GlibMessageLoop::NextTaskId() {
+ TaskId res;
+ do {
+ res = ++last_id_;
+ // We would run out of memory before we run out of task ids.
+ } while (!res || tasks_.find(res) != tasks_.end());
+ return res;
+}
+
gboolean GlibMessageLoop::OnRanPostedTask(gpointer user_data) {
ScheduledTask* scheduled_task = reinterpret_cast<ScheduledTask*>(user_data);
DVLOG_LOC(scheduled_task->location, 1)
- << "Running task_id " << scheduled_task->task_id
+ << "Running delayed task_id " << scheduled_task->task_id
<< " scheduled from this location.";
// We only need to remove this task_id from the map. DestroyPostedTask will be
// called with this same |user_data| where we can delete the ScheduledTask.
@@ -86,6 +170,23 @@ gboolean GlibMessageLoop::OnRanPostedTask(gpointer user_data) {
return FALSE; // Removes the source since a callback can only be called once.
}
+gboolean GlibMessageLoop::OnWatchedFdReady(GIOChannel *source,
+ GIOCondition condition,
+ gpointer user_data) {
+ ScheduledTask* scheduled_task = reinterpret_cast<ScheduledTask*>(user_data);
+ DVLOG_LOC(scheduled_task->location, 1)
+ << "Running task_id " << scheduled_task->task_id
+ << " for watching a file descriptor, scheduled from this location.";
+ if (!scheduled_task->persistent) {
+ // We only need to remove this task_id from the map. DestroyPostedTask will
+ // be called with this same |user_data| where we can delete the
+ // ScheduledTask.
+ scheduled_task->loop->tasks_.erase(scheduled_task->task_id);
+ }
+ scheduled_task->closure.Run();
+ return scheduled_task->persistent;
+}
+
void GlibMessageLoop::DestroyPostedTask(gpointer user_data) {
delete reinterpret_cast<ScheduledTask*>(user_data);
}
diff --git a/chromeos/message_loops/glib_message_loop.h b/chromeos/message_loops/glib_message_loop.h
index 2c4eeb2..92f570b 100644
--- a/chromeos/message_loops/glib_message_loop.h
+++ b/chromeos/message_loops/glib_message_loop.h
@@ -23,11 +23,16 @@ class CHROMEOS_EXPORT GlibMessageLoop : public MessageLoop {
~GlibMessageLoop() override;
// MessageLoop overrides.
- MessageLoop::TaskId PostDelayedTask(
- const tracked_objects::Location& from_here,
- const base::Closure &task,
- base::TimeDelta delay) override;
+ TaskId PostDelayedTask(const tracked_objects::Location& from_here,
+ const base::Closure &task,
+ base::TimeDelta delay) override;
using MessageLoop::PostDelayedTask;
+ TaskId WatchFileDescriptor(const tracked_objects::Location& from_here,
+ int fd,
+ WatchMode mode,
+ bool persistent,
+ const base::Closure &task) override;
+ using MessageLoop::WatchFileDescriptor;
bool CancelTask(TaskId task_id) override;
bool RunOnce(bool may_block) override;
void Run() override;
@@ -39,10 +44,19 @@ class CHROMEOS_EXPORT GlibMessageLoop : public MessageLoop {
// passed to this function as a gpointer on |user_data|.
static gboolean OnRanPostedTask(gpointer user_data);
+ // Called by the GLib's main loop when the watched source |source| is
+ // ready to perform the operation given in |condition| without blocking.
+ static gboolean OnWatchedFdReady(GIOChannel *source,
+ GIOCondition condition,
+ gpointer user_data);
+
// Called by the GLib's main loop when the scheduled callback is removed due
// to it being executed or canceled.
static void DestroyPostedTask(gpointer user_data);
+ // Return a new unused task_id.
+ TaskId NextTaskId();
+
GMainLoop* loop_;
struct ScheduledTask {
@@ -53,6 +67,7 @@ class CHROMEOS_EXPORT GlibMessageLoop : public MessageLoop {
MessageLoop::TaskId task_id;
guint source_id;
+ bool persistent;
base::Closure closure;
};
diff --git a/chromeos/message_loops/glib_message_loop_unittest.cc b/chromeos/message_loops/glib_message_loop_unittest.cc
index fa0ce78..4f00484 100644
--- a/chromeos/message_loops/glib_message_loop_unittest.cc
+++ b/chromeos/message_loops/glib_message_loop_unittest.cc
@@ -4,6 +4,9 @@
#include <chromeos/message_loops/glib_message_loop.h>
+#include <fcntl.h>
+#include <unistd.h>
+
#include <memory>
#include <base/bind.h>
@@ -17,6 +20,32 @@
using base::Bind;
using base::TimeDelta;
+namespace {
+// Helper class to create and close a unidirectional pipe. Used to provide valid
+// file descriptors when testing watching for a file descriptor.
+class ScopedPipe {
+ public:
+ ScopedPipe() {
+ int fds[2];
+ if (pipe(fds) != 0) {
+ PLOG(FATAL) << "Creating a pipe()";
+ }
+ reader = fds[0];
+ writer = fds[1];
+ }
+ ~ScopedPipe() {
+ if (reader != -1)
+ close(reader);
+ if (writer != -1)
+ close(writer);
+ }
+
+ // The reader and writer end of the pipe.
+ int reader{-1};
+ int writer{-1};
+};
+} // namespace
+
namespace chromeos {
using TaskId = MessageLoop::TaskId;
@@ -79,6 +108,66 @@ TEST_F(GlibMessageLoopTest, PostDelayedTaskWithoutLocation) {
EXPECT_EQ(1, MessageLoopRunMaxIterations(loop_.get(), 100));
}
+TEST_F(GlibMessageLoopTest, WatchForInvalidFD) {
+ bool called = false;
+ EXPECT_EQ(MessageLoop::kTaskIdNull, loop_->WatchFileDescriptor(
+ FROM_HERE, -1, MessageLoop::kWatchRead, true,
+ Bind([&called] { called = true; })));
+ EXPECT_EQ(MessageLoop::kTaskIdNull, loop_->WatchFileDescriptor(
+ FROM_HERE, -1, MessageLoop::kWatchWrite, true,
+ Bind([&called] { called = true; })));
+ EXPECT_EQ(0, MessageLoopRunMaxIterations(loop_.get(), 100));
+ EXPECT_FALSE(called);
+}
+
+TEST_F(GlibMessageLoopTest, CancelWatchedFileDescriptor) {
+ ScopedPipe pipe;
+ bool called = false;
+ TaskId task_id = loop_->WatchFileDescriptor(
+ FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true,
+ Bind([&called] { called = true; }));
+ EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
+ // The reader end is blocked because we didn't write anything to the writer
+ // end.
+ EXPECT_EQ(0, MessageLoopRunMaxIterations(loop_.get(), 100));
+ EXPECT_FALSE(called);
+ EXPECT_TRUE(loop_->CancelTask(task_id));
+}
+
+// When a WatchFileDescriptor task is scheduled with |persistent| = true, we
+// should keep getting a call whenever the file descriptor is ready.
+TEST_F(GlibMessageLoopTest, WatchFileDescriptorPersistently) {
+ int fd = open("/dev/zero", O_RDONLY);
+ int called = 0;
+ TaskId task_id = loop_->WatchFileDescriptor(
+ FROM_HERE, fd, MessageLoop::kWatchRead, true,
+ Bind([&called] { called++; }));
+ EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
+ // We let the main loop run for 20 iterations to give it enough iterations to
+ // verify that our callback was called more than one. We only check that our
+ // callback is called more than once.
+ EXPECT_EQ(20, MessageLoopRunMaxIterations(loop_.get(), 20));
+ EXPECT_LT(1, called);
+ EXPECT_TRUE(loop_->CancelTask(task_id));
+ close(fd);
+}
+
+TEST_F(GlibMessageLoopTest, WatchFileDescriptorNonPersistent) {
+ int fd = open("/dev/zero", O_RDONLY);
+ int called = 0;
+ TaskId task_id = loop_->WatchFileDescriptor(
+ fd, MessageLoop::kWatchRead, false, Bind([&called] { called++; }));
+ EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
+ // We let the main loop run for 20 iterations but we just expect it to run
+ // at least once. The callback should be called exactly once since we
+ // scheduled it non-persistently. After it ran, we shouldn't be able to cancel
+ // this task.
+ EXPECT_LT(0, MessageLoopRunMaxIterations(loop_.get(), 20));
+ EXPECT_EQ(1, called);
+ EXPECT_FALSE(loop_->CancelTask(task_id));
+ close(fd);
+}
+
// Test that we can cancel the task we are running, and should just fail.
TEST_F(GlibMessageLoopTest, DeleteTaskFromSelf) {
bool cancel_result = true; // We would expect this to be false.
diff --git a/chromeos/message_loops/message_loop.h b/chromeos/message_loops/message_loop.h
index ed6fd2d..58fe5ea 100644
--- a/chromeos/message_loops/message_loop.h
+++ b/chromeos/message_loops/message_loop.h
@@ -62,6 +62,34 @@ class CHROMEOS_EXPORT MessageLoop {
return PostDelayedTask(from_here, task, base::TimeDelta());
}
+ // Watch mode flag used to watch for file descriptors.
+ enum WatchMode {
+ kWatchRead,
+ kWatchWrite,
+ };
+
+ // Watch a file descriptor |fd| for it to be ready to perform the operation
+ // passed in |mode| without blocking. When that happens, the |task| closure
+ // will be executed. If |persistent| is true, the file descriptor will
+ // continue to be watched and |task| will continue to be called until the task
+ // is canceled with CancelTask().
+ // Returns the TaskId describing this task. In case of error, returns
+ // kTaskIdNull.
+ virtual TaskId WatchFileDescriptor(const tracked_objects::Location& from_here,
+ int fd,
+ WatchMode mode,
+ bool persistent,
+ const base::Closure &task) = 0;
+
+ // Convenience function to call WatchFileDescriptor() without a location.
+ TaskId WatchFileDescriptor(int fd,
+ WatchMode mode,
+ bool persistent,
+ const base::Closure &task) {
+ return WatchFileDescriptor(
+ tracked_objects::Location(), fd, mode, persistent, task);
+ }
+
// Cancel a scheduled task. Returns whether the task was canceled. For
// example, if the callback was already executed (or is being executed) or was
// already canceled this method will fail. Note that the TaskId can be reused