aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHidehiko Abe <hidehiko@chromium.org>2020-07-28 07:59:13 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2020-07-28 07:59:13 +0000
commitd4bf0e9fd29413d3d07b98da2811a4f6b8efd97b (patch)
treec081eed0301e2af00a8d116ba23d6f62c3105e47
parent6565bfd206ab8dac0e31bb11ce15b1e09cb30fbb (diff)
parent48220c6930cca0f9fc1c341289c7d8bc128df542 (diff)
downloadlibbrillo-d4bf0e9fd29413d3d07b98da2811a4f6b8efd97b.tar.gz
libbrillo: Remove WatchFileDescriptor. am: 295ef84cb2 am: 4b4f0480d0 am: 48220c6930
Original change: https://android-review.googlesource.com/c/platform/external/libbrillo/+/1371782 Change-Id: I57b6119e1da1e080a24298cb4f24e21c6537d7bb
-rw-r--r--brillo/asynchronous_signal_handler.cc29
-rw-r--r--brillo/asynchronous_signal_handler.h8
-rw-r--r--brillo/binder_watcher.cc26
-rw-r--r--brillo/binder_watcher.h10
-rw-r--r--brillo/http/http_transport_curl.cc79
-rw-r--r--brillo/message_loops/base_message_loop.cc230
-rw-r--r--brillo/message_loops/base_message_loop.h77
-rw-r--r--brillo/message_loops/fake_message_loop.cc53
-rw-r--r--brillo/message_loops/fake_message_loop.h19
-rw-r--r--brillo/message_loops/fake_message_loop_test.cc28
-rw-r--r--brillo/message_loops/message_loop.h27
-rw-r--r--brillo/message_loops/message_loop_test.cc238
-rw-r--r--brillo/message_loops/mock_message_loop.h14
-rw-r--r--brillo/streams/file_stream.cc56
14 files changed, 86 insertions, 808 deletions
diff --git a/brillo/asynchronous_signal_handler.cc b/brillo/asynchronous_signal_handler.cc
index b8ec529..d5fed50 100644
--- a/brillo/asynchronous_signal_handler.cc
+++ b/brillo/asynchronous_signal_handler.cc
@@ -11,7 +11,6 @@
#include <base/bind.h>
#include <base/files/file_util.h>
#include <base/logging.h>
-#include <base/message_loop/message_loop.h>
#include <base/posix/eintr_wrapper.h>
namespace {
@@ -28,15 +27,15 @@ AsynchronousSignalHandler::AsynchronousSignalHandler()
}
AsynchronousSignalHandler::~AsynchronousSignalHandler() {
- if (descriptor_ != kInvalidDescriptor) {
- MessageLoop::current()->CancelTask(fd_watcher_task_);
+ fd_watcher_ = nullptr;
- if (IGNORE_EINTR(close(descriptor_)) != 0)
- PLOG(WARNING) << "Failed to close file descriptor";
+ if (descriptor_ == kInvalidDescriptor)
+ return;
- descriptor_ = kInvalidDescriptor;
- CHECK_EQ(0, sigprocmask(SIG_SETMASK, &saved_signal_mask_, nullptr));
- }
+ if (IGNORE_EINTR(close(descriptor_)) != 0)
+ PLOG(WARNING) << "Failed to close file descriptor";
+ descriptor_ = kInvalidDescriptor;
+ CHECK_EQ(0, sigprocmask(SIG_SETMASK, &saved_signal_mask_, nullptr));
}
void AsynchronousSignalHandler::Init() {
@@ -45,15 +44,11 @@ void AsynchronousSignalHandler::Init() {
descriptor_ =
signalfd(descriptor_, &signal_mask_, SFD_CLOEXEC | SFD_NONBLOCK);
CHECK_NE(kInvalidDescriptor, descriptor_);
- fd_watcher_task_ = MessageLoop::current()->WatchFileDescriptor(
- FROM_HERE,
+ fd_watcher_ = base::FileDescriptorWatcher::WatchReadable(
descriptor_,
- MessageLoop::WatchMode::kWatchRead,
- true,
- base::Bind(&AsynchronousSignalHandler::OnFileCanReadWithoutBlocking,
- base::Unretained(this)));
- CHECK(fd_watcher_task_ != MessageLoop::kTaskIdNull)
- << "Watching shutdown pipe failed.";
+ base::BindRepeating(&AsynchronousSignalHandler::OnReadable,
+ base::Unretained(this)));
+ CHECK(fd_watcher_) << "Watching shutdown pipe failed.";
}
void AsynchronousSignalHandler::RegisterHandler(int signal,
@@ -71,7 +66,7 @@ void AsynchronousSignalHandler::UnregisterHandler(int signal) {
}
}
-void AsynchronousSignalHandler::OnFileCanReadWithoutBlocking() {
+void AsynchronousSignalHandler::OnReadable() {
struct signalfd_siginfo info;
while (base::ReadFromFD(descriptor_,
reinterpret_cast<char*>(&info), sizeof(info))) {
diff --git a/brillo/asynchronous_signal_handler.h b/brillo/asynchronous_signal_handler.h
index ceae1ff..903e2ef 100644
--- a/brillo/asynchronous_signal_handler.h
+++ b/brillo/asynchronous_signal_handler.h
@@ -9,14 +9,14 @@
#include <sys/signalfd.h>
#include <map>
+#include <memory>
#include <base/callback.h>
#include <base/compiler_specific.h>
+#include <base/files/file_descriptor_watcher_posix.h>
#include <base/macros.h>
-#include <base/message_loop/message_loop.h>
#include <brillo/asynchronous_signal_handler_interface.h>
#include <brillo/brillo_export.h>
-#include <brillo/message_loops/message_loop.h>
namespace brillo {
// Sets up signal handlers for registered signals, and converts signal receipt
@@ -40,10 +40,10 @@ class BRILLO_EXPORT AsynchronousSignalHandler final :
private:
// Called from the main loop when we can read from |descriptor_|, indicated
// that a signal was processed.
- void OnFileCanReadWithoutBlocking();
+ void OnReadable();
// Controller used to manage watching of signalling pipe.
- MessageLoop::TaskId fd_watcher_task_{MessageLoop::kTaskIdNull};
+ std::unique_ptr<base::FileDescriptorWatcher::Controller> fd_watcher_;
// The registered callbacks.
typedef std::map<int, SignalHandler> Callbacks;
diff --git a/brillo/binder_watcher.cc b/brillo/binder_watcher.cc
index 9752204..51b0f59 100644
--- a/brillo/binder_watcher.cc
+++ b/brillo/binder_watcher.cc
@@ -33,24 +33,11 @@ void OnBinderReadReady() {
namespace brillo {
-BinderWatcher::BinderWatcher(MessageLoop* message_loop)
- : message_loop_(message_loop) {}
+BinderWatcher::BinderWatcher() = default;
-BinderWatcher::BinderWatcher() : message_loop_(nullptr) {}
-
-BinderWatcher::~BinderWatcher() {
- if (task_id_ != MessageLoop::kTaskIdNull)
- message_loop_->CancelTask(task_id_);
-}
+BinderWatcher::~BinderWatcher() = default;
bool BinderWatcher::Init() {
- if (!message_loop_)
- message_loop_ = MessageLoop::current();
- if (!message_loop_) {
- LOG(ERROR) << "Must initialize a brillo::MessageLoop to use BinderWatcher";
- return false;
- }
-
int binder_fd = -1;
ProcessState::self()->setThreadPoolMaxThreadCount(0);
IPCThreadState::self()->disableBackgroundScheduling(true);
@@ -66,13 +53,10 @@ bool BinderWatcher::Init() {
}
VLOG(1) << "Got binder FD " << binder_fd;
- task_id_ = message_loop_->WatchFileDescriptor(
- FROM_HERE,
+ watcher_ = base::FileDescriptorWatcher::WatchReadable(
binder_fd,
- MessageLoop::kWatchRead,
- true /* persistent */,
- base::Bind(&OnBinderReadReady));
- if (task_id_ == MessageLoop::kTaskIdNull) {
+ base::BindRepeating(&OnBinderReadReady));
+ if (!watcher_) {
LOG(ERROR) << "Failed to watch binder FD";
return false;
}
diff --git a/brillo/binder_watcher.h b/brillo/binder_watcher.h
index ece999d..d7af50e 100644
--- a/brillo/binder_watcher.h
+++ b/brillo/binder_watcher.h
@@ -17,8 +17,10 @@
#ifndef LIBBRILLO_BRILLO_BINDER_WATCHER_H_
#define LIBBRILLO_BRILLO_BINDER_WATCHER_H_
+#include <memory>
+
+#include <base/files/file_descriptor_watcher_posix.h>
#include <base/macros.h>
-#include <brillo/message_loops/message_loop.h>
namespace brillo {
@@ -26,9 +28,6 @@ namespace brillo {
// make the message loop watch for binder events and pass them to libbinder.
class BinderWatcher final {
public:
- // Construct the BinderWatcher using the passed |message_loop| if not null or
- // the current MessageLoop otherwise.
- explicit BinderWatcher(MessageLoop* message_loop);
BinderWatcher();
~BinderWatcher();
@@ -36,8 +35,7 @@ class BinderWatcher final {
bool Init();
private:
- MessageLoop::TaskId task_id_{MessageLoop::kTaskIdNull};
- MessageLoop* message_loop_;
+ std::unique_ptr<base::FileDescriptorWatcher::Controller> watcher_;
DISALLOW_COPY_AND_ASSIGN(BinderWatcher);
};
diff --git a/brillo/http/http_transport_curl.cc b/brillo/http/http_transport_curl.cc
index 45a28a3..741cb3c 100644
--- a/brillo/http/http_transport_curl.cc
+++ b/brillo/http/http_transport_curl.cc
@@ -7,6 +7,7 @@
#include <limits>
#include <base/bind.h>
+#include <base/files/file_descriptor_watcher_posix.h>
#include <base/files/file_util.h>
#include <base/logging.h>
#include <base/message_loop/message_loop.h>
@@ -22,7 +23,7 @@ namespace curl {
// This is a class that stores connection data on particular CURL socket
// and provides file descriptor watcher to monitor read and/or write operations
// on the socket's file descriptor.
-class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher {
+class Transport::SocketPollData {
public:
SocketPollData(const std::shared_ptr<CurlInterface>& curl_interface,
CURLM* curl_multi_handle,
@@ -31,27 +32,35 @@ class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher {
: curl_interface_(curl_interface),
curl_multi_handle_(curl_multi_handle),
transport_(transport),
- socket_fd_(socket_fd),
- file_descriptor_watcher_(FROM_HERE) {}
+ socket_fd_(socket_fd) {}
- // Returns the pointer for the socket-specific file descriptor watcher.
- base::MessagePumpForIO::FdWatchController* GetWatcher() {
- return &file_descriptor_watcher_;
+ void StopWatcher() {
+ read_watcher_ = nullptr;
+ write_watcher_ = nullptr;
}
- private:
- // Overrides from base::MessagePumpForIO::Watcher.
- void OnFileCanReadWithoutBlocking(int fd) override {
- OnSocketReady(fd, CURL_CSELECT_IN);
+ bool WatchReadable() {
+ read_watcher_ = base::FileDescriptorWatcher::WatchReadable(
+ socket_fd_,
+ base::BindRepeating(&Transport::SocketPollData::OnSocketReady,
+ base::Unretained(this),
+ CURL_CSELECT_IN));
+ return read_watcher_.get();
}
- void OnFileCanWriteWithoutBlocking(int fd) override {
- OnSocketReady(fd, CURL_CSELECT_OUT);
+
+ bool WatchWritable() {
+ write_watcher_ = base::FileDescriptorWatcher::WatchWritable(
+ socket_fd_,
+ base::BindRepeating(&Transport::SocketPollData::OnSocketReady,
+ base::Unretained(this),
+ CURL_CSELECT_OUT));
+ return write_watcher_.get();
}
+ private:
// Data on the socket is available to be read from or written to.
// Notify CURL of the action it needs to take on the socket file descriptor.
- void OnSocketReady(int fd, int action) {
- CHECK_EQ(socket_fd_, fd) << "Unexpected socket file descriptor";
+ void OnSocketReady(int action) {
int still_running_count = 0;
CURLMcode code = curl_interface_->MultiSocketAction(
curl_multi_handle_, socket_fd_, action, &still_running_count);
@@ -70,8 +79,9 @@ class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher {
Transport* transport_;
// The socket file descriptor for the connection.
curl_socket_t socket_fd_;
- // File descriptor watcher to notify us of asynchronous I/O on the FD.
- base::MessagePumpForIO::FdWatchController file_descriptor_watcher_;
+
+ std::unique_ptr<base::FileDescriptorWatcher::Controller> read_watcher_;
+ std::unique_ptr<base::FileDescriptorWatcher::Controller> write_watcher_;
DISALLOW_COPY_AND_ASSIGN(SocketPollData);
};
@@ -386,7 +396,7 @@ int Transport::MultiSocketCallback(CURL* easy,
// Make sure we stop watching the socket file descriptor now, before
// we schedule the SocketPollData for deletion.
- poll_data->GetWatcher()->StopWatchingFileDescriptor();
+ poll_data->StopWatcher();
// This method can be called indirectly from SocketPollData::OnSocketReady,
// so delay destruction of SocketPollData object till the next loop cycle.
base::MessageLoopForIO::current()->task_runner()->DeleteSoon(FROM_HERE,
@@ -394,34 +404,15 @@ int Transport::MultiSocketCallback(CURL* easy,
return 0;
}
- base::MessagePumpForIO::Mode watch_mode = base::MessagePumpForIO::WATCH_READ;
- switch (what) {
- case CURL_POLL_IN:
- watch_mode = base::MessagePumpForIO::WATCH_READ;
- break;
- case CURL_POLL_OUT:
- watch_mode = base::MessagePumpForIO::WATCH_WRITE;
- break;
- case CURL_POLL_INOUT:
- watch_mode = base::MessagePumpForIO::WATCH_READ_WRITE;
- break;
- default:
- LOG(FATAL) << "Unknown CURL socket action: " << what;
- break;
- }
+ poll_data->StopWatcher();
+
+ bool success = true;
+ if (what == CURL_POLL_IN || what == CURL_POLL_INOUT)
+ success = poll_data->WatchReadable() && success;
+ if (what == CURL_POLL_OUT || what == CURL_POLL_INOUT)
+ success = poll_data->WatchWritable() && success;
- // WatchFileDescriptor() can be called with the same controller object
- // (watcher) to amend the watch mode, however this has cumulative effect.
- // For example, if we were watching a file descriptor for READ operations
- // and now call it to watch for WRITE, it will end up watching for both
- // READ and WRITE. This is not what we want here, so stop watching the
- // file descriptor on previous controller before starting with a different
- // mode.
- if (!poll_data->GetWatcher()->StopWatchingFileDescriptor())
- LOG(WARNING) << "Failed to stop watching the previous socket descriptor";
- CHECK(base::MessageLoopForIO::current()->WatchFileDescriptor(
- s, true, watch_mode, poll_data->GetWatcher(), poll_data))
- << "Failed to watch the CURL socket.";
+ CHECK(success) << "Failed to watch the CURL socket.";
return 0;
}
diff --git a/brillo/message_loops/base_message_loop.cc b/brillo/message_loops/base_message_loop.cc
index 9a9e43f..0c2b9db 100644
--- a/brillo/message_loops/base_message_loop.cc
+++ b/brillo/message_loops/base_message_loop.cc
@@ -62,13 +62,6 @@ BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
watcher_(std::make_unique<base::FileDescriptorWatcher>(base_loop_)) {}
BaseMessageLoop::~BaseMessageLoop() {
- for (auto& io_task : io_tasks_) {
- DVLOG_LOC(io_task.second.location(), 1)
- << "Removing file descriptor watcher task_id " << io_task.first
- << " leaked on BaseMessageLoop, scheduled from this location.";
- io_task.second.StopWatching();
- }
-
// Note all pending canceled delayed tasks when destroying the message loop.
size_t lazily_deleted_tasks = 0;
for (const auto& delayed_task : delayed_tasks_) {
@@ -105,81 +98,13 @@ MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
return task_id;
}
-MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
- const base::Location& from_here,
- int fd,
- WatchMode mode,
- bool persistent,
- const Closure &task) {
- // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
- if (fd < 0)
- return MessageLoop::kTaskIdNull;
-
- base::MessagePumpForIO::Mode base_mode = base::MessagePumpForIO::WATCH_READ;
- switch (mode) {
- case MessageLoop::kWatchRead:
- base_mode = base::MessagePumpForIO::WATCH_READ;
- break;
- case MessageLoop::kWatchWrite:
- base_mode = base::MessagePumpForIO::WATCH_WRITE;
- break;
- default:
- return MessageLoop::kTaskIdNull;
- }
-
- TaskId task_id = NextTaskId();
- auto it_bool = io_tasks_.emplace(
- std::piecewise_construct,
- std::forward_as_tuple(task_id),
- std::forward_as_tuple(
- from_here, this, task_id, fd, base_mode, persistent, task));
- // This should always insert a new element.
- DCHECK(it_bool.second);
- bool scheduled = it_bool.first->second.StartWatching();
- DVLOG_LOC(from_here, 1)
- << "Watching fd " << fd << " for "
- << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
- << (persistent ? " persistently" : " just once")
- << " as task_id " << task_id
- << (scheduled ? " successfully" : " failed.");
-
- if (!scheduled) {
- io_tasks_.erase(task_id);
- return MessageLoop::kTaskIdNull;
- }
-
-#ifndef __ANDROID_HOST__
- // Determine if the passed fd is the binder file descriptor. For that, we need
- // to check that is a special char device and that the major and minor device
- // numbers match. The binder file descriptor can't be removed and added back
- // to an epoll group when there's work available to be done by the file
- // descriptor due to bugs in the binder driver (b/26524111) when used with
- // epoll. Therefore, we flag the binder fd and never attempt to remove it.
- // This may cause the binder file descriptor to be attended with higher
- // priority and cause starvation of other events.
- struct stat buf;
- if (fstat(fd, &buf) == 0 &&
- S_ISCHR(buf.st_mode) &&
- major(buf.st_rdev) == MISC_MAJOR &&
- minor(buf.st_rdev) == GetBinderMinor()) {
- it_bool.first->second.RunImmediately();
- }
-#endif
-
- return task_id;
-}
-
bool BaseMessageLoop::CancelTask(TaskId task_id) {
if (task_id == kTaskIdNull)
return false;
auto delayed_task_it = delayed_tasks_.find(task_id);
- if (delayed_task_it == delayed_tasks_.end()) {
- // This might be an IOTask then.
- auto io_task_it = io_tasks_.find(task_id);
- if (io_task_it == io_tasks_.end())
- return false;
- return io_task_it->second.CancelTask();
- }
+ if (delayed_task_it == delayed_tasks_.end())
+ return false;
+
// A DelayedTask was found for this task_id at this point.
// Check if the callback was already canceled but we have the entry in
@@ -241,8 +166,7 @@ MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
res = ++last_id_;
// We would run out of memory before we run out of task ids.
} while (!res ||
- delayed_tasks_.find(res) != delayed_tasks_.end() ||
- io_tasks_.find(res) != io_tasks_.end());
+ delayed_tasks_.find(res) != delayed_tasks_.end());
return res;
}
@@ -269,15 +193,6 @@ void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
delayed_tasks_.erase(task_it);
}
-void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
- auto task_it = io_tasks_.find(task_id);
- // Even if this task was canceled while we were waiting in the message loop
- // for this method to run, the entry in io_tasks_ should still be present, but
- // won't do anything.
- DCHECK(task_it != io_tasks_.end());
- task_it->second.OnFileReadyPostedTask();
-}
-
int BaseMessageLoop::ParseBinderMinor(
const std::string& file_contents) {
int result = kInvalidMinor;
@@ -311,141 +226,4 @@ unsigned int BaseMessageLoop::GetBinderMinor() {
return binder_minor_;
}
-BaseMessageLoop::IOTask::IOTask(const base::Location& location,
- BaseMessageLoop* loop,
- MessageLoop::TaskId task_id,
- int fd,
- base::MessagePumpForIO::Mode base_mode,
- bool persistent,
- const Closure& task)
- : location_(location), loop_(loop), task_id_(task_id),
- fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task),
- fd_watcher_(FROM_HERE) {}
-
-bool BaseMessageLoop::IOTask::StartWatching() {
- // Please see MessagePumpLibevent for definition.
- static_assert(std::is_same<base::MessagePumpForIO, base::MessagePumpLibevent>::value,
- "MessagePumpForIO::WatchFileDescriptor is not supported "
- "when MessagePumpForIO is not a MessagePumpLibevent.");
-
- return static_cast<base::MessagePumpLibevent*>(
- loop_->base_loop_->pump_.get())->WatchFileDescriptor(
- fd_, persistent_, base_mode_, &fd_watcher_, this);
-}
-
-void BaseMessageLoop::IOTask::StopWatching() {
- // This is safe to call even if we are not watching for it.
- fd_watcher_.StopWatchingFileDescriptor();
-}
-
-void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
- OnFileReady();
-}
-
-void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
- OnFileReady();
-}
-
-void BaseMessageLoop::IOTask::OnFileReady() {
- // For file descriptors marked with the immediate_run flag, we don't call
- // StopWatching() and wait, instead we dispatch the callback immediately.
- if (immediate_run_) {
- posted_task_pending_ = true;
- OnFileReadyPostedTask();
- return;
- }
-
- // When the file descriptor becomes available we stop watching for it and
- // schedule a task to run the callback from the main loop. The callback will
- // run using the same scheduler used to run other delayed tasks, avoiding
- // starvation of the available posted tasks if there are file descriptors
- // always available. The new posted task will use the same TaskId as the
- // current file descriptor watching task an could be canceled in either state,
- // when waiting for the file descriptor or waiting in the main loop.
- StopWatching();
- bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
- location_,
- base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
- loop_->weak_ptr_factory_.GetWeakPtr(),
- task_id_));
- posted_task_pending_ = true;
- if (base_scheduled) {
- DVLOG_LOC(location_, 1)
- << "Dispatching task_id " << task_id_ << " for "
- << (base_mode_ == base::MessagePumpForIO::WATCH_READ ?
- "reading" : "writing")
- << " file descriptor " << fd_ << ", scheduled from this location.";
- } else {
- // In the rare case that PostTask() fails, we fall back to run it directly.
- // This would indicate a bigger problem with the message loop setup.
- LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
- OnFileReadyPostedTask();
- }
-}
-
-void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
- // We can't access |this| after running the |closure_| since it could call
- // CancelTask on its own task_id, so we copy the members we need now.
- BaseMessageLoop* loop_ptr = loop_;
- DCHECK(posted_task_pending_ = true);
- posted_task_pending_ = false;
-
- // If this task was already canceled, the closure will be null and there is
- // nothing else to do here. This execution doesn't count a step for RunOnce()
- // unless we have a callback to run.
- if (closure_.is_null()) {
- loop_->io_tasks_.erase(task_id_);
- return;
- }
-
- DVLOG_LOC(location_, 1)
- << "Running task_id " << task_id_ << " for "
- << (base_mode_ == base::MessagePumpForIO::WATCH_READ ?
- "reading" : "writing")
- << " file descriptor " << fd_ << ", scheduled from this location.";
-
- if (persistent_) {
- // In the persistent case we just run the callback. If this callback cancels
- // the task id, we can't access |this| anymore, so we re-start watching the
- // file descriptor before running the callback, unless this is a fd where
- // we didn't stop watching the file descriptor when it became available.
- if (!immediate_run_)
- StartWatching();
- closure_.Run();
- } else {
- // This will destroy |this|, the fd_watcher and therefore stop watching this
- // file descriptor.
- Closure closure_copy = std::move(closure_);
- loop_->io_tasks_.erase(task_id_);
- // Run the closure from the local copy we just made.
- closure_copy.Run();
- }
-
- if (loop_ptr->run_once_) {
- loop_ptr->run_once_ = false;
- loop_ptr->BreakLoop();
- }
-}
-
-bool BaseMessageLoop::IOTask::CancelTask() {
- if (closure_.is_null())
- return false;
-
- DVLOG_LOC(location_, 1)
- << "Removing task_id " << task_id_ << " scheduled from this location.";
-
- if (!posted_task_pending_) {
- // Destroying the FileDescriptorWatcher implicitly stops watching the file
- // descriptor. This will delete our instance.
- loop_->io_tasks_.erase(task_id_);
- return true;
- }
- // The IOTask is waiting for the message loop to run its delayed task, so
- // it is not watching for the file descriptor. We release the closure
- // resources now but keep the IOTask instance alive while we wait for the
- // callback to run and delete the IOTask.
- closure_ = Closure();
- return true;
-}
-
} // namespace brillo
diff --git a/brillo/message_loops/base_message_loop.h b/brillo/message_loops/base_message_loop.h
index c038ac7..92c5bda 100644
--- a/brillo/message_loops/base_message_loop.h
+++ b/brillo/message_loops/base_message_loop.h
@@ -45,12 +45,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop {
const base::Closure& task,
base::TimeDelta delay) override;
using MessageLoop::PostDelayedTask;
- TaskId WatchFileDescriptor(const base::Location& from_here,
- int fd,
- WatchMode mode,
- bool persistent,
- const base::Closure& task) override;
- using MessageLoop::WatchFileDescriptor;
bool CancelTask(TaskId task_id) override;
bool RunOnce(bool may_block) override;
void Run() override;
@@ -75,12 +69,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop {
// scheduled with Post*Task() of id |task_id|, even if it was canceled.
void OnRanPostedTask(MessageLoop::TaskId task_id);
- // Called from the message loop when the IOTask should run the scheduled
- // callback. This is a simple wrapper of IOTask::OnFileReadyPostedTask()
- // posted from the BaseMessageLoop so it is deleted when the BaseMessageLoop
- // goes out of scope since we can't cancel the callback otherwise.
- void OnFileReadyPostedTask(MessageLoop::TaskId task_id);
-
// Return a new unused task_id.
TaskId NextTaskId();
@@ -94,68 +82,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop {
base::Closure closure;
};
- class IOTask : public base::MessagePumpForIO::FdWatcher {
- public:
- IOTask(const base::Location& location,
- BaseMessageLoop* loop,
- MessageLoop::TaskId task_id,
- int fd,
- base::MessagePumpForIO::Mode base_mode,
- bool persistent,
- const base::Closure& task);
-
- const base::Location& location() const { return location_; }
-
- // Used to start/stop watching the file descriptor while keeping the
- // IOTask entry available.
- bool StartWatching();
- void StopWatching();
-
- // Called from the message loop as a PostTask() when the file descriptor is
- // available, scheduled to run from OnFileReady().
- void OnFileReadyPostedTask();
-
- // Cancel the IOTask and returns whether it was actually canceled, with the
- // same semantics as MessageLoop::CancelTask().
- bool CancelTask();
-
- // Sets the closure to be run immediately whenever the file descriptor
- // becomes ready.
- void RunImmediately() { immediate_run_ = true; }
-
- private:
- base::Location location_;
- BaseMessageLoop* loop_;
-
- // These are the arguments passed in the constructor, basically forwarding
- // all the arguments passed to WatchFileDescriptor() plus the assigned
- // TaskId for this task.
- MessageLoop::TaskId task_id_;
- int fd_;
- base::MessagePumpForIO::Mode base_mode_;
- bool persistent_;
- base::Closure closure_;
-
- base::MessagePumpForIO::FdWatchController fd_watcher_;
-
- // Tells whether there is a pending call to OnFileReadPostedTask().
- bool posted_task_pending_{false};
-
- // Whether the registered callback should be running immediately when the
- // file descriptor is ready, as opposed to posting a task to the main loop
- // to prevent starvation.
- bool immediate_run_{false};
-
- // base::MessageLoopForIO::Watcher overrides:
- void OnFileCanReadWithoutBlocking(int fd) override;
- void OnFileCanWriteWithoutBlocking(int fd) override;
-
- // Common implementation for both the read and write case.
- void OnFileReady();
-
- DISALLOW_COPY_AND_ASSIGN(IOTask);
- };
-
// The base::MessageLoopForIO instance owned by this class, if any. This
// is declared first in this class so it is destroyed last.
std::unique_ptr<base::MessageLoopForIO> owned_base_loop_;
@@ -163,9 +89,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop {
// Tasks blocked on a timeout.
std::map<MessageLoop::TaskId, DelayedTask> delayed_tasks_;
- // Tasks blocked on I/O.
- std::map<MessageLoop::TaskId, IOTask> io_tasks_;
-
// Flag to mark that we should run the message loop only one iteration.
bool run_once_{false};
diff --git a/brillo/message_loops/fake_message_loop.cc b/brillo/message_loops/fake_message_loop.cc
index 41f5b51..9ab2aa9 100644
--- a/brillo/message_loops/fake_message_loop.cc
+++ b/brillo/message_loops/fake_message_loop.cc
@@ -33,20 +33,6 @@ MessageLoop::TaskId FakeMessageLoop::PostDelayedTask(
return current_id;
}
-MessageLoop::TaskId FakeMessageLoop::WatchFileDescriptor(
- const base::Location& from_here,
- int fd,
- WatchMode mode,
- bool persistent,
- const base::Closure& task) {
- MessageLoop::TaskId current_id = ++last_id_;
- // FakeMessageLoop is limited to only 2^64 tasks. That should be enough.
- CHECK(current_id);
- tasks_.emplace(current_id, ScheduledTask{from_here, persistent, task});
- fds_watched_.emplace(std::make_pair(fd, mode), current_id);
- return current_id;
-}
-
bool FakeMessageLoop::CancelTask(TaskId task_id) {
if (task_id == MessageLoop::kTaskIdNull)
return false;
@@ -58,36 +44,6 @@ bool FakeMessageLoop::CancelTask(TaskId task_id) {
bool FakeMessageLoop::RunOnce(bool may_block) {
if (test_clock_)
current_time_ = test_clock_->Now();
- // Try to fire ready file descriptors first.
- for (const auto& fd_mode : fds_ready_) {
- const auto& fd_watched = fds_watched_.find(fd_mode);
- if (fd_watched == fds_watched_.end())
- continue;
- // The fd_watched->second task might have been canceled and we never removed
- // it from the fds_watched_, so we fix that now.
- const auto& scheduled_task_ref = tasks_.find(fd_watched->second);
- if (scheduled_task_ref == tasks_.end()) {
- fds_watched_.erase(fd_watched);
- continue;
- }
- VLOG_LOC(scheduled_task_ref->second.location, 1)
- << "Running task_id " << fd_watched->second
- << " for watching file descriptor " << fd_mode.first << " for "
- << (fd_mode.second == MessageLoop::kWatchRead ? "reading" : "writing")
- << (scheduled_task_ref->second.persistent ?
- " persistently" : " just once")
- << " scheduled from this location.";
- if (scheduled_task_ref->second.persistent) {
- scheduled_task_ref->second.callback.Run();
- } else {
- base::Closure callback = std::move(scheduled_task_ref->second.callback);
- tasks_.erase(scheduled_task_ref);
- fds_watched_.erase(fd_watched);
- callback.Run();
- }
- return true;
- }
-
// Try to fire time-based callbacks afterwards.
while (!fire_order_.empty() &&
(may_block || fire_order_.top().first <= current_time_)) {
@@ -120,15 +76,6 @@ bool FakeMessageLoop::RunOnce(bool may_block) {
return false;
}
-void FakeMessageLoop::SetFileDescriptorReadiness(int fd,
- WatchMode mode,
- bool ready) {
- if (ready)
- fds_ready_.emplace(fd, mode);
- else
- fds_ready_.erase(std::make_pair(fd, mode));
-}
-
bool FakeMessageLoop::PendingTasks() {
for (const auto& task : tasks_) {
VLOG_LOC(task.second.location, 1)
diff --git a/brillo/message_loops/fake_message_loop.h b/brillo/message_loops/fake_message_loop.h
index 4b6e8ac..45bab81 100644
--- a/brillo/message_loops/fake_message_loop.h
+++ b/brillo/message_loops/fake_message_loop.h
@@ -40,23 +40,11 @@ class BRILLO_EXPORT FakeMessageLoop : public MessageLoop {
const base::Closure& task,
base::TimeDelta delay) override;
using MessageLoop::PostDelayedTask;
- TaskId WatchFileDescriptor(const base::Location& from_here,
- int fd,
- WatchMode mode,
- bool persistent,
- const base::Closure& task) override;
- using MessageLoop::WatchFileDescriptor;
bool CancelTask(TaskId task_id) override;
bool RunOnce(bool may_block) override;
// FakeMessageLoop methods:
- // Pretend, for the purpose of the FakeMessageLoop watching for file
- // descriptors, that the file descriptor |fd| readiness to perform the
- // operation described by |mode| is |ready|. Initially, no file descriptor
- // is ready for any operation.
- void SetFileDescriptorReadiness(int fd, WatchMode mode, bool ready);
-
// Return whether there are peding tasks. Useful to check that no
// callbacks were leaked.
bool PendingTasks();
@@ -79,13 +67,6 @@ class BRILLO_EXPORT FakeMessageLoop : public MessageLoop {
std::vector<std::pair<base::Time, MessageLoop::TaskId>>,
std::greater<std::pair<base::Time, MessageLoop::TaskId>>> fire_order_;
- // The bag of watched (fd, mode) pair associated with the TaskId that's
- // watching them.
- std::multimap<std::pair<int, WatchMode>, MessageLoop::TaskId> fds_watched_;
-
- // The set of (fd, mode) pairs that are faked as ready.
- std::set<std::pair<int, WatchMode>> fds_ready_;
-
base::SimpleTestClock* test_clock_ = nullptr;
base::Time current_time_ = base::Time::FromDoubleT(1246996800.);
diff --git a/brillo/message_loops/fake_message_loop_test.cc b/brillo/message_loops/fake_message_loop_test.cc
index b4b839c..07af569 100644
--- a/brillo/message_loops/fake_message_loop_test.cc
+++ b/brillo/message_loops/fake_message_loop_test.cc
@@ -84,34 +84,6 @@ TEST_F(FakeMessageLoopTest, PostDelayedTaskAdvancesTheTime) {
EXPECT_EQ(start + TimeDelta::FromSeconds(3), clock_.Now());
}
-TEST_F(FakeMessageLoopTest, WatchFileDescriptorWaits) {
- int fd = 1234;
- // We will simulate this situation. At the beginning, we will watch for a
- // file descriptor that won't trigger for 10s. Then we will pretend it is
- // ready after 10s and expect its callback to run just once.
- int called = 0;
- TaskId task_id = loop_->WatchFileDescriptor(
- FROM_HERE, fd, MessageLoop::kWatchRead, false,
- Bind([](int* called) { (*called)++; }, base::Unretained(&called)));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
-
- EXPECT_NE(MessageLoop::kTaskIdNull,
- loop_->PostDelayedTask(Bind(&FakeMessageLoop::BreakLoop,
- base::Unretained(loop_.get())),
- TimeDelta::FromSeconds(10)));
- EXPECT_NE(MessageLoop::kTaskIdNull,
- loop_->PostDelayedTask(Bind(&FakeMessageLoop::BreakLoop,
- base::Unretained(loop_.get())),
- TimeDelta::FromSeconds(20)));
- loop_->Run();
- EXPECT_EQ(0, called);
-
- loop_->SetFileDescriptorReadiness(fd, MessageLoop::kWatchRead, true);
- loop_->Run();
- EXPECT_EQ(1, called);
- EXPECT_FALSE(loop_->CancelTask(task_id));
-}
-
TEST_F(FakeMessageLoopTest, PendingTasksTest) {
loop_->PostDelayedTask(base::DoNothing(), TimeDelta::FromSeconds(1));
EXPECT_TRUE(loop_->PendingTasks());
diff --git a/brillo/message_loops/message_loop.h b/brillo/message_loops/message_loop.h
index e9f804e..0f9740d 100644
--- a/brillo/message_loops/message_loop.h
+++ b/brillo/message_loops/message_loop.h
@@ -67,33 +67,6 @@ class BRILLO_EXPORT MessageLoop {
return PostDelayedTask(from_here, task, base::TimeDelta());
}
- // Watch mode flag used to watch for file descriptors.
- enum WatchMode {
- kWatchRead,
- kWatchWrite,
- };
-
- // Watch a file descriptor |fd| for it to be ready to perform the operation
- // passed in |mode| without blocking. When that happens, the |task| closure
- // will be executed. If |persistent| is true, the file descriptor will
- // continue to be watched and |task| will continue to be called until the task
- // is canceled with CancelTask().
- // Returns the TaskId describing this task. In case of error, returns
- // kTaskIdNull.
- virtual TaskId WatchFileDescriptor(const base::Location& from_here,
- int fd,
- WatchMode mode,
- bool persistent,
- const base::Closure& task) = 0;
-
- // Convenience function to call WatchFileDescriptor() without a location.
- TaskId WatchFileDescriptor(int fd,
- WatchMode mode,
- bool persistent,
- const base::Closure& task) {
- return WatchFileDescriptor(base::Location(), fd, mode, persistent, task);
- }
-
// Cancel a scheduled task. Returns whether the task was canceled. For
// example, if the callback was already executed (or is being executed) or was
// already canceled this method will fail. Note that the TaskId can be reused
diff --git a/brillo/message_loops/message_loop_test.cc b/brillo/message_loops/message_loop_test.cc
index 7b57015..45a3b95 100644
--- a/brillo/message_loops/message_loop_test.cc
+++ b/brillo/message_loops/message_loop_test.cc
@@ -36,10 +36,6 @@ bool ReturnBool(bool *b) {
return *b;
}
-void Increment(int* i) {
- (*i)++;
-}
-
} // namespace
namespace brillo {
@@ -122,115 +118,6 @@ TYPED_TEST(MessageLoopTest, PostDelayedTaskWithoutLocation) {
EXPECT_EQ(1, MessageLoopRunMaxIterations(this->loop_.get(), 100));
}
-TYPED_TEST(MessageLoopTest, WatchForInvalidFD) {
- bool called = false;
- EXPECT_EQ(MessageLoop::kTaskIdNull, this->loop_->WatchFileDescriptor(
- FROM_HERE, -1, MessageLoop::kWatchRead, true,
- Bind(&SetToTrue, &called)));
- EXPECT_EQ(MessageLoop::kTaskIdNull, this->loop_->WatchFileDescriptor(
- FROM_HERE, -1, MessageLoop::kWatchWrite, true,
- Bind(&SetToTrue, &called)));
- EXPECT_EQ(0, MessageLoopRunMaxIterations(this->loop_.get(), 100));
- EXPECT_FALSE(called);
-}
-
-TYPED_TEST(MessageLoopTest, CancelWatchedFileDescriptor) {
- ScopedPipe pipe;
- bool called = false;
- TaskId task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true,
- Bind(&SetToTrue, &called));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
- // The reader end is blocked because we didn't write anything to the writer
- // end.
- EXPECT_EQ(0, MessageLoopRunMaxIterations(this->loop_.get(), 100));
- EXPECT_FALSE(called);
- EXPECT_TRUE(this->loop_->CancelTask(task_id));
-}
-
-// When you watch a file descriptor for reading, the guaranties are that a
-// blocking call to read() on that file descriptor will not block. This should
-// include the case when the other end of a pipe is closed or the file is empty.
-TYPED_TEST(MessageLoopTest, WatchFileDescriptorTriggersWhenPipeClosed) {
- ScopedPipe pipe;
- bool called = false;
- EXPECT_EQ(0, HANDLE_EINTR(close(pipe.writer)));
- pipe.writer = -1;
- TaskId task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true,
- Bind(&SetToTrue, &called));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
- // The reader end is not blocked because we closed the writer end so a read on
- // the reader end would return 0 bytes read.
- EXPECT_NE(0, MessageLoopRunMaxIterations(this->loop_.get(), 10));
- EXPECT_TRUE(called);
- EXPECT_TRUE(this->loop_->CancelTask(task_id));
-}
-
-// When a WatchFileDescriptor task is scheduled with |persistent| = true, we
-// should keep getting a call whenever the file descriptor is ready.
-TYPED_TEST(MessageLoopTest, WatchFileDescriptorPersistently) {
- ScopedPipe pipe;
- EXPECT_EQ(1, HANDLE_EINTR(write(pipe.writer, "a", 1)));
-
- int called = 0;
- TaskId task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true,
- Bind(&Increment, &called));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
- // We let the main loop run for 20 iterations to give it enough iterations to
- // verify that our callback was called more than one. We only check that our
- // callback is called more than once.
- EXPECT_EQ(20, MessageLoopRunMaxIterations(this->loop_.get(), 20));
- EXPECT_LT(1, called);
- EXPECT_TRUE(this->loop_->CancelTask(task_id));
-}
-
-TYPED_TEST(MessageLoopTest, WatchFileDescriptorNonPersistent) {
- ScopedPipe pipe;
- EXPECT_EQ(1, HANDLE_EINTR(write(pipe.writer, "a", 1)));
-
- int called = 0;
- TaskId task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipe.reader, MessageLoop::kWatchRead, false,
- Bind(&Increment, &called));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
- // We let the main loop run for 20 iterations but we just expect it to run
- // at least once. The callback should be called exactly once since we
- // scheduled it non-persistently. After it ran, we shouldn't be able to cancel
- // this task.
- EXPECT_LT(0, MessageLoopRunMaxIterations(this->loop_.get(), 20));
- EXPECT_EQ(1, called);
- EXPECT_FALSE(this->loop_->CancelTask(task_id));
-}
-
-TYPED_TEST(MessageLoopTest, WatchFileDescriptorForReadAndWriteSimultaneously) {
- ScopedSocketPair socks;
- EXPECT_EQ(1, HANDLE_EINTR(write(socks.right, "a", 1)));
- // socks.left should be able to read this "a" and should also be able to write
- // without blocking since the kernel has some buffering for it.
-
- TaskId read_task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, socks.left, MessageLoop::kWatchRead, true,
- Bind([] (MessageLoop* loop, TaskId* read_task_id) {
- EXPECT_TRUE(loop->CancelTask(*read_task_id))
- << "task_id" << *read_task_id;
- }, this->loop_.get(), &read_task_id));
- EXPECT_NE(MessageLoop::kTaskIdNull, read_task_id);
-
- TaskId write_task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, socks.left, MessageLoop::kWatchWrite, true,
- Bind([] (MessageLoop* loop, TaskId* write_task_id) {
- EXPECT_TRUE(loop->CancelTask(*write_task_id));
- }, this->loop_.get(), &write_task_id));
- EXPECT_NE(MessageLoop::kTaskIdNull, write_task_id);
-
- EXPECT_LT(0, MessageLoopRunMaxIterations(this->loop_.get(), 20));
-
- EXPECT_FALSE(this->loop_->CancelTask(read_task_id));
- EXPECT_FALSE(this->loop_->CancelTask(write_task_id));
-}
-
// Test that we can cancel the task we are running, and should just fail.
TYPED_TEST(MessageLoopTest, DeleteTaskFromSelf) {
bool cancel_result = true; // We would expect this to be false.
@@ -244,129 +131,4 @@ TYPED_TEST(MessageLoopTest, DeleteTaskFromSelf) {
EXPECT_FALSE(cancel_result);
}
-// Test that we can cancel a non-persistent file descriptor watching callback,
-// which should fail.
-TYPED_TEST(MessageLoopTest, DeleteNonPersistenIOTaskFromSelf) {
- ScopedPipe pipe;
- TaskId task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipe.writer, MessageLoop::kWatchWrite, false /* persistent */,
- Bind([](MessageLoop* loop, TaskId* task_id) {
- EXPECT_FALSE(loop->CancelTask(*task_id));
- *task_id = MessageLoop::kTaskIdNull;
- }, this->loop_.get(), &task_id));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
- EXPECT_EQ(1, MessageLoopRunMaxIterations(this->loop_.get(), 100));
- EXPECT_EQ(MessageLoop::kTaskIdNull, task_id);
-}
-
-// Test that we can cancel a persistent file descriptor watching callback from
-// the same callback.
-TYPED_TEST(MessageLoopTest, DeletePersistenIOTaskFromSelf) {
- ScopedPipe pipe;
- TaskId task_id = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipe.writer, MessageLoop::kWatchWrite, true /* persistent */,
- Bind([](MessageLoop* loop, TaskId* task_id) {
- EXPECT_TRUE(loop->CancelTask(*task_id));
- *task_id = MessageLoop::kTaskIdNull;
- }, this->loop_.get(), &task_id));
- EXPECT_NE(MessageLoop::kTaskIdNull, task_id);
- EXPECT_EQ(1, MessageLoopRunMaxIterations(this->loop_.get(), 100));
- EXPECT_EQ(MessageLoop::kTaskIdNull, task_id);
-}
-
-// Test that we can cancel several persistent file descriptor watching callbacks
-// from a scheduled callback. In the BaseMessageLoop implementation, this code
-// will cause us to cancel an IOTask that has a pending delayed task, but
-// otherwise is a valid test case on all implementations.
-TYPED_TEST(MessageLoopTest, DeleteAllPersistenIOTaskFromSelf) {
- const int kNumTasks = 5;
- ScopedPipe pipes[kNumTasks];
- TaskId task_ids[kNumTasks];
-
- for (int i = 0; i < kNumTasks; ++i) {
- task_ids[i] = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipes[i].writer, MessageLoop::kWatchWrite,
- true /* persistent */,
- Bind([] (MessageLoop* loop, TaskId* task_ids) {
- for (int j = 0; j < kNumTasks; ++j) {
- // Once we cancel all the tasks, none should run, so this code runs
- // only once from one callback.
- EXPECT_TRUE(loop->CancelTask(task_ids[j]));
- task_ids[j] = MessageLoop::kTaskIdNull;
- }
- }, this->loop_.get(), task_ids));
- }
- MessageLoopRunMaxIterations(this->loop_.get(), 100);
- for (int i = 0; i < kNumTasks; ++i) {
- EXPECT_EQ(MessageLoop::kTaskIdNull, task_ids[i]);
- }
-}
-
-// Test that if there are several tasks watching for file descriptors to be
-// available or simply waiting in the message loop are fairly scheduled to run.
-// In other words, this test ensures that having a file descriptor always
-// available doesn't prevent other file descriptors watching tasks or delayed
-// tasks to be dispatched, causing starvation.
-TYPED_TEST(MessageLoopTest, AllTasksAreEqual) {
- int total_calls = 0;
-
- // First, schedule a repeating timeout callback to run from the main loop.
- int timeout_called = 0;
- base::Closure timeout_callback;
- MessageLoop::TaskId timeout_task;
- timeout_callback = base::Bind(
- [](MessageLoop* loop, int* timeout_called, int* total_calls,
- base::Closure* timeout_callback, MessageLoop::TaskId* timeout_task) {
- (*timeout_called)++;
- (*total_calls)++;
- *timeout_task = loop->PostTask(FROM_HERE, *timeout_callback);
- if (*total_calls > 100)
- loop->BreakLoop();
- },
- this->loop_.get(), &timeout_called, &total_calls, &timeout_callback,
- &timeout_task);
- timeout_task = this->loop_->PostTask(FROM_HERE, timeout_callback);
-
- // Second, schedule several file descriptor watchers.
- const int kNumTasks = 3;
- ScopedPipe pipes[kNumTasks];
- MessageLoop::TaskId tasks[kNumTasks];
-
- int reads[kNumTasks] = {};
- base::Callback<void(int)> fd_callback = base::Bind(
- [](MessageLoop* loop, ScopedPipe* pipes, int* reads,
- int* total_calls, int i) {
- reads[i]++;
- (*total_calls)++;
- char c;
- EXPECT_EQ(1, HANDLE_EINTR(read(pipes[i].reader, &c, 1)));
- if (*total_calls > 100)
- loop->BreakLoop();
- }, this->loop_.get(), pipes, reads, &total_calls);
-
- for (int i = 0; i < kNumTasks; ++i) {
- tasks[i] = this->loop_->WatchFileDescriptor(
- FROM_HERE, pipes[i].reader, MessageLoop::kWatchRead,
- true /* persistent */,
- Bind(fd_callback, i));
- // Make enough bytes available on each file descriptor. This should not
- // block because we set the size of the file descriptor buffer when
- // creating it.
- std::vector<char> blob(1000, 'a');
- EXPECT_EQ(blob.size(),
- HANDLE_EINTR(write(pipes[i].writer, blob.data(), blob.size())));
- }
- this->loop_->Run();
- EXPECT_GT(total_calls, 100);
- // We run the loop up 100 times and expect each callback to run at least 10
- // times. A good scheduler should balance these callbacks.
- EXPECT_GE(timeout_called, 10);
- EXPECT_TRUE(this->loop_->CancelTask(timeout_task));
- for (int i = 0; i < kNumTasks; ++i) {
- EXPECT_GE(reads[i], 10) << "Reading from pipes[" << i << "], fd "
- << pipes[i].reader;
- EXPECT_TRUE(this->loop_->CancelTask(tasks[i]));
- }
-}
-
} // namespace brillo
diff --git a/brillo/message_loops/mock_message_loop.h b/brillo/message_loops/mock_message_loop.h
index c84e585..30a19b8 100644
--- a/brillo/message_loops/mock_message_loop.h
+++ b/brillo/message_loops/mock_message_loop.h
@@ -40,14 +40,6 @@ class BRILLO_EXPORT MockMessageLoop : public MessageLoop {
const base::Closure&,
base::TimeDelta)>(
&FakeMessageLoop::PostDelayedTask)));
- ON_CALL(*this, WatchFileDescriptor(
- ::testing::_, ::testing::_, ::testing::_, ::testing::_, ::testing::_))
- .WillByDefault(::testing::Invoke(
- &fake_loop_,
- static_cast<TaskId(FakeMessageLoop::*)(
- const base::Location&, int, WatchMode, bool,
- const base::Closure&)>(
- &FakeMessageLoop::WatchFileDescriptor)));
ON_CALL(*this, CancelTask(::testing::_))
.WillByDefault(::testing::Invoke(&fake_loop_,
&FakeMessageLoop::CancelTask));
@@ -62,12 +54,6 @@ class BRILLO_EXPORT MockMessageLoop : public MessageLoop {
(const base::Location&, const base::Closure&, base::TimeDelta),
(override));
using MessageLoop::PostDelayedTask;
- MOCK_METHOD(
- TaskId,
- WatchFileDescriptor,
- (const base::Location&, int, WatchMode, bool, const base::Closure&),
- (override));
- using MessageLoop::WatchFileDescriptor;
MOCK_METHOD(bool, CancelTask, (TaskId), (override));
MOCK_METHOD(bool, RunOnce, (bool), (override));
diff --git a/brillo/streams/file_stream.cc b/brillo/streams/file_stream.cc
index db22192..70b25dd 100644
--- a/brillo/streams/file_stream.cc
+++ b/brillo/streams/file_stream.cc
@@ -12,6 +12,7 @@
#include <utility>
#include <base/bind.h>
+#include <base/files/file_descriptor_watcher_posix.h>
#include <base/files/file_util.h>
#include <base/posix/eintr_wrapper.h>
#include <brillo/errors/error_codes.h>
@@ -86,15 +87,11 @@ class FileDescriptor : public FileStream::FileDescriptorInterface {
ErrorPtr* error) override {
if (stream_utils::IsReadAccessMode(mode)) {
CHECK(read_data_callback_.is_null());
- MessageLoop::current()->CancelTask(read_watcher_);
- read_watcher_ = MessageLoop::current()->WatchFileDescriptor(
- FROM_HERE,
+ read_watcher_ = base::FileDescriptorWatcher::WatchReadable(
fd_,
- MessageLoop::WatchMode::kWatchRead,
- false, // persistent
- base::Bind(&FileDescriptor::OnFileCanReadWithoutBlocking,
- base::Unretained(this)));
- if (read_watcher_ == MessageLoop::kTaskIdNull) {
+ base::BindRepeating(&FileDescriptor::OnReadable,
+ base::Unretained(this)));
+ if (!read_watcher_) {
Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
errors::stream::kInvalidParameter,
"File descriptor doesn't support watching for reading.");
@@ -104,15 +101,11 @@ class FileDescriptor : public FileStream::FileDescriptorInterface {
}
if (stream_utils::IsWriteAccessMode(mode)) {
CHECK(write_data_callback_.is_null());
- MessageLoop::current()->CancelTask(write_watcher_);
- write_watcher_ = MessageLoop::current()->WatchFileDescriptor(
- FROM_HERE,
+ write_watcher_ = base::FileDescriptorWatcher::WatchWritable(
fd_,
- MessageLoop::WatchMode::kWatchWrite,
- false, // persistent
- base::Bind(&FileDescriptor::OnFileCanWriteWithoutBlocking,
- base::Unretained(this)));
- if (write_watcher_ == MessageLoop::kTaskIdNull) {
+ base::BindRepeating(&FileDescriptor::OnWritable,
+ base::Unretained(this)));
+ if (!write_watcher_) {
Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
errors::stream::kInvalidParameter,
"File descriptor doesn't support watching for writing.");
@@ -157,31 +150,26 @@ class FileDescriptor : public FileStream::FileDescriptorInterface {
void CancelPendingAsyncOperations() override {
read_data_callback_.Reset();
- if (read_watcher_ != MessageLoop::kTaskIdNull) {
- MessageLoop::current()->CancelTask(read_watcher_);
- read_watcher_ = MessageLoop::kTaskIdNull;
- }
-
+ read_watcher_ = nullptr;
write_data_callback_.Reset();
- if (write_watcher_ != MessageLoop::kTaskIdNull) {
- MessageLoop::current()->CancelTask(write_watcher_);
- write_watcher_ = MessageLoop::kTaskIdNull;
- }
+ write_watcher_ = nullptr;
}
// Called from the brillo::MessageLoop when the file descriptor is available
// for reading.
- void OnFileCanReadWithoutBlocking() {
+ void OnReadable() {
CHECK(!read_data_callback_.is_null());
- DataCallback cb = read_data_callback_;
- read_data_callback_.Reset();
+
+ read_watcher_ = nullptr;
+ DataCallback cb = std::move(read_data_callback_);
cb.Run(Stream::AccessMode::READ);
}
- void OnFileCanWriteWithoutBlocking() {
+ void OnWritable() {
CHECK(!write_data_callback_.is_null());
- DataCallback cb = write_data_callback_;
- write_data_callback_.Reset();
+
+ write_watcher_ = nullptr;
+ DataCallback cb = std::move(write_data_callback_);
cb.Run(Stream::AccessMode::WRITE);
}
@@ -200,9 +188,9 @@ class FileDescriptor : public FileStream::FileDescriptorInterface {
DataCallback read_data_callback_;
DataCallback write_data_callback_;
- // MessageLoop tasks monitoring read/write operations on the file descriptor.
- MessageLoop::TaskId read_watcher_{MessageLoop::kTaskIdNull};
- MessageLoop::TaskId write_watcher_{MessageLoop::kTaskIdNull};
+ // Monitoring read/write operations on the file descriptor.
+ std::unique_ptr<base::FileDescriptorWatcher::Controller> read_watcher_;
+ std::unique_ptr<base::FileDescriptorWatcher::Controller> write_watcher_;
DISALLOW_COPY_AND_ASSIGN(FileDescriptor);
};