diff options
author | Hidehiko Abe <hidehiko@chromium.org> | 2020-07-28 07:59:13 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2020-07-28 07:59:13 +0000 |
commit | d4bf0e9fd29413d3d07b98da2811a4f6b8efd97b (patch) | |
tree | c081eed0301e2af00a8d116ba23d6f62c3105e47 | |
parent | 6565bfd206ab8dac0e31bb11ce15b1e09cb30fbb (diff) | |
parent | 48220c6930cca0f9fc1c341289c7d8bc128df542 (diff) | |
download | libbrillo-d4bf0e9fd29413d3d07b98da2811a4f6b8efd97b.tar.gz |
libbrillo: Remove WatchFileDescriptor. am: 295ef84cb2 am: 4b4f0480d0 am: 48220c6930
Original change: https://android-review.googlesource.com/c/platform/external/libbrillo/+/1371782
Change-Id: I57b6119e1da1e080a24298cb4f24e21c6537d7bb
-rw-r--r-- | brillo/asynchronous_signal_handler.cc | 29 | ||||
-rw-r--r-- | brillo/asynchronous_signal_handler.h | 8 | ||||
-rw-r--r-- | brillo/binder_watcher.cc | 26 | ||||
-rw-r--r-- | brillo/binder_watcher.h | 10 | ||||
-rw-r--r-- | brillo/http/http_transport_curl.cc | 79 | ||||
-rw-r--r-- | brillo/message_loops/base_message_loop.cc | 230 | ||||
-rw-r--r-- | brillo/message_loops/base_message_loop.h | 77 | ||||
-rw-r--r-- | brillo/message_loops/fake_message_loop.cc | 53 | ||||
-rw-r--r-- | brillo/message_loops/fake_message_loop.h | 19 | ||||
-rw-r--r-- | brillo/message_loops/fake_message_loop_test.cc | 28 | ||||
-rw-r--r-- | brillo/message_loops/message_loop.h | 27 | ||||
-rw-r--r-- | brillo/message_loops/message_loop_test.cc | 238 | ||||
-rw-r--r-- | brillo/message_loops/mock_message_loop.h | 14 | ||||
-rw-r--r-- | brillo/streams/file_stream.cc | 56 |
14 files changed, 86 insertions, 808 deletions
diff --git a/brillo/asynchronous_signal_handler.cc b/brillo/asynchronous_signal_handler.cc index b8ec529..d5fed50 100644 --- a/brillo/asynchronous_signal_handler.cc +++ b/brillo/asynchronous_signal_handler.cc @@ -11,7 +11,6 @@ #include <base/bind.h> #include <base/files/file_util.h> #include <base/logging.h> -#include <base/message_loop/message_loop.h> #include <base/posix/eintr_wrapper.h> namespace { @@ -28,15 +27,15 @@ AsynchronousSignalHandler::AsynchronousSignalHandler() } AsynchronousSignalHandler::~AsynchronousSignalHandler() { - if (descriptor_ != kInvalidDescriptor) { - MessageLoop::current()->CancelTask(fd_watcher_task_); + fd_watcher_ = nullptr; - if (IGNORE_EINTR(close(descriptor_)) != 0) - PLOG(WARNING) << "Failed to close file descriptor"; + if (descriptor_ == kInvalidDescriptor) + return; - descriptor_ = kInvalidDescriptor; - CHECK_EQ(0, sigprocmask(SIG_SETMASK, &saved_signal_mask_, nullptr)); - } + if (IGNORE_EINTR(close(descriptor_)) != 0) + PLOG(WARNING) << "Failed to close file descriptor"; + descriptor_ = kInvalidDescriptor; + CHECK_EQ(0, sigprocmask(SIG_SETMASK, &saved_signal_mask_, nullptr)); } void AsynchronousSignalHandler::Init() { @@ -45,15 +44,11 @@ void AsynchronousSignalHandler::Init() { descriptor_ = signalfd(descriptor_, &signal_mask_, SFD_CLOEXEC | SFD_NONBLOCK); CHECK_NE(kInvalidDescriptor, descriptor_); - fd_watcher_task_ = MessageLoop::current()->WatchFileDescriptor( - FROM_HERE, + fd_watcher_ = base::FileDescriptorWatcher::WatchReadable( descriptor_, - MessageLoop::WatchMode::kWatchRead, - true, - base::Bind(&AsynchronousSignalHandler::OnFileCanReadWithoutBlocking, - base::Unretained(this))); - CHECK(fd_watcher_task_ != MessageLoop::kTaskIdNull) - << "Watching shutdown pipe failed."; + base::BindRepeating(&AsynchronousSignalHandler::OnReadable, + base::Unretained(this))); + CHECK(fd_watcher_) << "Watching shutdown pipe failed."; } void AsynchronousSignalHandler::RegisterHandler(int signal, @@ -71,7 +66,7 @@ void AsynchronousSignalHandler::UnregisterHandler(int signal) { } } -void AsynchronousSignalHandler::OnFileCanReadWithoutBlocking() { +void AsynchronousSignalHandler::OnReadable() { struct signalfd_siginfo info; while (base::ReadFromFD(descriptor_, reinterpret_cast<char*>(&info), sizeof(info))) { diff --git a/brillo/asynchronous_signal_handler.h b/brillo/asynchronous_signal_handler.h index ceae1ff..903e2ef 100644 --- a/brillo/asynchronous_signal_handler.h +++ b/brillo/asynchronous_signal_handler.h @@ -9,14 +9,14 @@ #include <sys/signalfd.h> #include <map> +#include <memory> #include <base/callback.h> #include <base/compiler_specific.h> +#include <base/files/file_descriptor_watcher_posix.h> #include <base/macros.h> -#include <base/message_loop/message_loop.h> #include <brillo/asynchronous_signal_handler_interface.h> #include <brillo/brillo_export.h> -#include <brillo/message_loops/message_loop.h> namespace brillo { // Sets up signal handlers for registered signals, and converts signal receipt @@ -40,10 +40,10 @@ class BRILLO_EXPORT AsynchronousSignalHandler final : private: // Called from the main loop when we can read from |descriptor_|, indicated // that a signal was processed. - void OnFileCanReadWithoutBlocking(); + void OnReadable(); // Controller used to manage watching of signalling pipe. - MessageLoop::TaskId fd_watcher_task_{MessageLoop::kTaskIdNull}; + std::unique_ptr<base::FileDescriptorWatcher::Controller> fd_watcher_; // The registered callbacks. typedef std::map<int, SignalHandler> Callbacks; diff --git a/brillo/binder_watcher.cc b/brillo/binder_watcher.cc index 9752204..51b0f59 100644 --- a/brillo/binder_watcher.cc +++ b/brillo/binder_watcher.cc @@ -33,24 +33,11 @@ void OnBinderReadReady() { namespace brillo { -BinderWatcher::BinderWatcher(MessageLoop* message_loop) - : message_loop_(message_loop) {} +BinderWatcher::BinderWatcher() = default; -BinderWatcher::BinderWatcher() : message_loop_(nullptr) {} - -BinderWatcher::~BinderWatcher() { - if (task_id_ != MessageLoop::kTaskIdNull) - message_loop_->CancelTask(task_id_); -} +BinderWatcher::~BinderWatcher() = default; bool BinderWatcher::Init() { - if (!message_loop_) - message_loop_ = MessageLoop::current(); - if (!message_loop_) { - LOG(ERROR) << "Must initialize a brillo::MessageLoop to use BinderWatcher"; - return false; - } - int binder_fd = -1; ProcessState::self()->setThreadPoolMaxThreadCount(0); IPCThreadState::self()->disableBackgroundScheduling(true); @@ -66,13 +53,10 @@ bool BinderWatcher::Init() { } VLOG(1) << "Got binder FD " << binder_fd; - task_id_ = message_loop_->WatchFileDescriptor( - FROM_HERE, + watcher_ = base::FileDescriptorWatcher::WatchReadable( binder_fd, - MessageLoop::kWatchRead, - true /* persistent */, - base::Bind(&OnBinderReadReady)); - if (task_id_ == MessageLoop::kTaskIdNull) { + base::BindRepeating(&OnBinderReadReady)); + if (!watcher_) { LOG(ERROR) << "Failed to watch binder FD"; return false; } diff --git a/brillo/binder_watcher.h b/brillo/binder_watcher.h index ece999d..d7af50e 100644 --- a/brillo/binder_watcher.h +++ b/brillo/binder_watcher.h @@ -17,8 +17,10 @@ #ifndef LIBBRILLO_BRILLO_BINDER_WATCHER_H_ #define LIBBRILLO_BRILLO_BINDER_WATCHER_H_ +#include <memory> + +#include <base/files/file_descriptor_watcher_posix.h> #include <base/macros.h> -#include <brillo/message_loops/message_loop.h> namespace brillo { @@ -26,9 +28,6 @@ namespace brillo { // make the message loop watch for binder events and pass them to libbinder. class BinderWatcher final { public: - // Construct the BinderWatcher using the passed |message_loop| if not null or - // the current MessageLoop otherwise. - explicit BinderWatcher(MessageLoop* message_loop); BinderWatcher(); ~BinderWatcher(); @@ -36,8 +35,7 @@ class BinderWatcher final { bool Init(); private: - MessageLoop::TaskId task_id_{MessageLoop::kTaskIdNull}; - MessageLoop* message_loop_; + std::unique_ptr<base::FileDescriptorWatcher::Controller> watcher_; DISALLOW_COPY_AND_ASSIGN(BinderWatcher); }; diff --git a/brillo/http/http_transport_curl.cc b/brillo/http/http_transport_curl.cc index 45a28a3..741cb3c 100644 --- a/brillo/http/http_transport_curl.cc +++ b/brillo/http/http_transport_curl.cc @@ -7,6 +7,7 @@ #include <limits> #include <base/bind.h> +#include <base/files/file_descriptor_watcher_posix.h> #include <base/files/file_util.h> #include <base/logging.h> #include <base/message_loop/message_loop.h> @@ -22,7 +23,7 @@ namespace curl { // This is a class that stores connection data on particular CURL socket // and provides file descriptor watcher to monitor read and/or write operations // on the socket's file descriptor. -class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher { +class Transport::SocketPollData { public: SocketPollData(const std::shared_ptr<CurlInterface>& curl_interface, CURLM* curl_multi_handle, @@ -31,27 +32,35 @@ class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher { : curl_interface_(curl_interface), curl_multi_handle_(curl_multi_handle), transport_(transport), - socket_fd_(socket_fd), - file_descriptor_watcher_(FROM_HERE) {} + socket_fd_(socket_fd) {} - // Returns the pointer for the socket-specific file descriptor watcher. - base::MessagePumpForIO::FdWatchController* GetWatcher() { - return &file_descriptor_watcher_; + void StopWatcher() { + read_watcher_ = nullptr; + write_watcher_ = nullptr; } - private: - // Overrides from base::MessagePumpForIO::Watcher. - void OnFileCanReadWithoutBlocking(int fd) override { - OnSocketReady(fd, CURL_CSELECT_IN); + bool WatchReadable() { + read_watcher_ = base::FileDescriptorWatcher::WatchReadable( + socket_fd_, + base::BindRepeating(&Transport::SocketPollData::OnSocketReady, + base::Unretained(this), + CURL_CSELECT_IN)); + return read_watcher_.get(); } - void OnFileCanWriteWithoutBlocking(int fd) override { - OnSocketReady(fd, CURL_CSELECT_OUT); + + bool WatchWritable() { + write_watcher_ = base::FileDescriptorWatcher::WatchWritable( + socket_fd_, + base::BindRepeating(&Transport::SocketPollData::OnSocketReady, + base::Unretained(this), + CURL_CSELECT_OUT)); + return write_watcher_.get(); } + private: // Data on the socket is available to be read from or written to. // Notify CURL of the action it needs to take on the socket file descriptor. - void OnSocketReady(int fd, int action) { - CHECK_EQ(socket_fd_, fd) << "Unexpected socket file descriptor"; + void OnSocketReady(int action) { int still_running_count = 0; CURLMcode code = curl_interface_->MultiSocketAction( curl_multi_handle_, socket_fd_, action, &still_running_count); @@ -70,8 +79,9 @@ class Transport::SocketPollData : public base::MessagePumpForIO::FdWatcher { Transport* transport_; // The socket file descriptor for the connection. curl_socket_t socket_fd_; - // File descriptor watcher to notify us of asynchronous I/O on the FD. - base::MessagePumpForIO::FdWatchController file_descriptor_watcher_; + + std::unique_ptr<base::FileDescriptorWatcher::Controller> read_watcher_; + std::unique_ptr<base::FileDescriptorWatcher::Controller> write_watcher_; DISALLOW_COPY_AND_ASSIGN(SocketPollData); }; @@ -386,7 +396,7 @@ int Transport::MultiSocketCallback(CURL* easy, // Make sure we stop watching the socket file descriptor now, before // we schedule the SocketPollData for deletion. - poll_data->GetWatcher()->StopWatchingFileDescriptor(); + poll_data->StopWatcher(); // This method can be called indirectly from SocketPollData::OnSocketReady, // so delay destruction of SocketPollData object till the next loop cycle. base::MessageLoopForIO::current()->task_runner()->DeleteSoon(FROM_HERE, @@ -394,34 +404,15 @@ int Transport::MultiSocketCallback(CURL* easy, return 0; } - base::MessagePumpForIO::Mode watch_mode = base::MessagePumpForIO::WATCH_READ; - switch (what) { - case CURL_POLL_IN: - watch_mode = base::MessagePumpForIO::WATCH_READ; - break; - case CURL_POLL_OUT: - watch_mode = base::MessagePumpForIO::WATCH_WRITE; - break; - case CURL_POLL_INOUT: - watch_mode = base::MessagePumpForIO::WATCH_READ_WRITE; - break; - default: - LOG(FATAL) << "Unknown CURL socket action: " << what; - break; - } + poll_data->StopWatcher(); + + bool success = true; + if (what == CURL_POLL_IN || what == CURL_POLL_INOUT) + success = poll_data->WatchReadable() && success; + if (what == CURL_POLL_OUT || what == CURL_POLL_INOUT) + success = poll_data->WatchWritable() && success; - // WatchFileDescriptor() can be called with the same controller object - // (watcher) to amend the watch mode, however this has cumulative effect. - // For example, if we were watching a file descriptor for READ operations - // and now call it to watch for WRITE, it will end up watching for both - // READ and WRITE. This is not what we want here, so stop watching the - // file descriptor on previous controller before starting with a different - // mode. - if (!poll_data->GetWatcher()->StopWatchingFileDescriptor()) - LOG(WARNING) << "Failed to stop watching the previous socket descriptor"; - CHECK(base::MessageLoopForIO::current()->WatchFileDescriptor( - s, true, watch_mode, poll_data->GetWatcher(), poll_data)) - << "Failed to watch the CURL socket."; + CHECK(success) << "Failed to watch the CURL socket."; return 0; } diff --git a/brillo/message_loops/base_message_loop.cc b/brillo/message_loops/base_message_loop.cc index 9a9e43f..0c2b9db 100644 --- a/brillo/message_loops/base_message_loop.cc +++ b/brillo/message_loops/base_message_loop.cc @@ -62,13 +62,6 @@ BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop) watcher_(std::make_unique<base::FileDescriptorWatcher>(base_loop_)) {} BaseMessageLoop::~BaseMessageLoop() { - for (auto& io_task : io_tasks_) { - DVLOG_LOC(io_task.second.location(), 1) - << "Removing file descriptor watcher task_id " << io_task.first - << " leaked on BaseMessageLoop, scheduled from this location."; - io_task.second.StopWatching(); - } - // Note all pending canceled delayed tasks when destroying the message loop. size_t lazily_deleted_tasks = 0; for (const auto& delayed_task : delayed_tasks_) { @@ -105,81 +98,13 @@ MessageLoop::TaskId BaseMessageLoop::PostDelayedTask( return task_id; } -MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor( - const base::Location& from_here, - int fd, - WatchMode mode, - bool persistent, - const Closure &task) { - // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here. - if (fd < 0) - return MessageLoop::kTaskIdNull; - - base::MessagePumpForIO::Mode base_mode = base::MessagePumpForIO::WATCH_READ; - switch (mode) { - case MessageLoop::kWatchRead: - base_mode = base::MessagePumpForIO::WATCH_READ; - break; - case MessageLoop::kWatchWrite: - base_mode = base::MessagePumpForIO::WATCH_WRITE; - break; - default: - return MessageLoop::kTaskIdNull; - } - - TaskId task_id = NextTaskId(); - auto it_bool = io_tasks_.emplace( - std::piecewise_construct, - std::forward_as_tuple(task_id), - std::forward_as_tuple( - from_here, this, task_id, fd, base_mode, persistent, task)); - // This should always insert a new element. - DCHECK(it_bool.second); - bool scheduled = it_bool.first->second.StartWatching(); - DVLOG_LOC(from_here, 1) - << "Watching fd " << fd << " for " - << (mode == MessageLoop::kWatchRead ? "reading" : "writing") - << (persistent ? " persistently" : " just once") - << " as task_id " << task_id - << (scheduled ? " successfully" : " failed."); - - if (!scheduled) { - io_tasks_.erase(task_id); - return MessageLoop::kTaskIdNull; - } - -#ifndef __ANDROID_HOST__ - // Determine if the passed fd is the binder file descriptor. For that, we need - // to check that is a special char device and that the major and minor device - // numbers match. The binder file descriptor can't be removed and added back - // to an epoll group when there's work available to be done by the file - // descriptor due to bugs in the binder driver (b/26524111) when used with - // epoll. Therefore, we flag the binder fd and never attempt to remove it. - // This may cause the binder file descriptor to be attended with higher - // priority and cause starvation of other events. - struct stat buf; - if (fstat(fd, &buf) == 0 && - S_ISCHR(buf.st_mode) && - major(buf.st_rdev) == MISC_MAJOR && - minor(buf.st_rdev) == GetBinderMinor()) { - it_bool.first->second.RunImmediately(); - } -#endif - - return task_id; -} - bool BaseMessageLoop::CancelTask(TaskId task_id) { if (task_id == kTaskIdNull) return false; auto delayed_task_it = delayed_tasks_.find(task_id); - if (delayed_task_it == delayed_tasks_.end()) { - // This might be an IOTask then. - auto io_task_it = io_tasks_.find(task_id); - if (io_task_it == io_tasks_.end()) - return false; - return io_task_it->second.CancelTask(); - } + if (delayed_task_it == delayed_tasks_.end()) + return false; + // A DelayedTask was found for this task_id at this point. // Check if the callback was already canceled but we have the entry in @@ -241,8 +166,7 @@ MessageLoop::TaskId BaseMessageLoop::NextTaskId() { res = ++last_id_; // We would run out of memory before we run out of task ids. } while (!res || - delayed_tasks_.find(res) != delayed_tasks_.end() || - io_tasks_.find(res) != io_tasks_.end()); + delayed_tasks_.find(res) != delayed_tasks_.end()); return res; } @@ -269,15 +193,6 @@ void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) { delayed_tasks_.erase(task_it); } -void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) { - auto task_it = io_tasks_.find(task_id); - // Even if this task was canceled while we were waiting in the message loop - // for this method to run, the entry in io_tasks_ should still be present, but - // won't do anything. - DCHECK(task_it != io_tasks_.end()); - task_it->second.OnFileReadyPostedTask(); -} - int BaseMessageLoop::ParseBinderMinor( const std::string& file_contents) { int result = kInvalidMinor; @@ -311,141 +226,4 @@ unsigned int BaseMessageLoop::GetBinderMinor() { return binder_minor_; } -BaseMessageLoop::IOTask::IOTask(const base::Location& location, - BaseMessageLoop* loop, - MessageLoop::TaskId task_id, - int fd, - base::MessagePumpForIO::Mode base_mode, - bool persistent, - const Closure& task) - : location_(location), loop_(loop), task_id_(task_id), - fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task), - fd_watcher_(FROM_HERE) {} - -bool BaseMessageLoop::IOTask::StartWatching() { - // Please see MessagePumpLibevent for definition. - static_assert(std::is_same<base::MessagePumpForIO, base::MessagePumpLibevent>::value, - "MessagePumpForIO::WatchFileDescriptor is not supported " - "when MessagePumpForIO is not a MessagePumpLibevent."); - - return static_cast<base::MessagePumpLibevent*>( - loop_->base_loop_->pump_.get())->WatchFileDescriptor( - fd_, persistent_, base_mode_, &fd_watcher_, this); -} - -void BaseMessageLoop::IOTask::StopWatching() { - // This is safe to call even if we are not watching for it. - fd_watcher_.StopWatchingFileDescriptor(); -} - -void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) { - OnFileReady(); -} - -void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) { - OnFileReady(); -} - -void BaseMessageLoop::IOTask::OnFileReady() { - // For file descriptors marked with the immediate_run flag, we don't call - // StopWatching() and wait, instead we dispatch the callback immediately. - if (immediate_run_) { - posted_task_pending_ = true; - OnFileReadyPostedTask(); - return; - } - - // When the file descriptor becomes available we stop watching for it and - // schedule a task to run the callback from the main loop. The callback will - // run using the same scheduler used to run other delayed tasks, avoiding - // starvation of the available posted tasks if there are file descriptors - // always available. The new posted task will use the same TaskId as the - // current file descriptor watching task an could be canceled in either state, - // when waiting for the file descriptor or waiting in the main loop. - StopWatching(); - bool base_scheduled = loop_->base_loop_->task_runner()->PostTask( - location_, - base::Bind(&BaseMessageLoop::OnFileReadyPostedTask, - loop_->weak_ptr_factory_.GetWeakPtr(), - task_id_)); - posted_task_pending_ = true; - if (base_scheduled) { - DVLOG_LOC(location_, 1) - << "Dispatching task_id " << task_id_ << " for " - << (base_mode_ == base::MessagePumpForIO::WATCH_READ ? - "reading" : "writing") - << " file descriptor " << fd_ << ", scheduled from this location."; - } else { - // In the rare case that PostTask() fails, we fall back to run it directly. - // This would indicate a bigger problem with the message loop setup. - LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask()."; - OnFileReadyPostedTask(); - } -} - -void BaseMessageLoop::IOTask::OnFileReadyPostedTask() { - // We can't access |this| after running the |closure_| since it could call - // CancelTask on its own task_id, so we copy the members we need now. - BaseMessageLoop* loop_ptr = loop_; - DCHECK(posted_task_pending_ = true); - posted_task_pending_ = false; - - // If this task was already canceled, the closure will be null and there is - // nothing else to do here. This execution doesn't count a step for RunOnce() - // unless we have a callback to run. - if (closure_.is_null()) { - loop_->io_tasks_.erase(task_id_); - return; - } - - DVLOG_LOC(location_, 1) - << "Running task_id " << task_id_ << " for " - << (base_mode_ == base::MessagePumpForIO::WATCH_READ ? - "reading" : "writing") - << " file descriptor " << fd_ << ", scheduled from this location."; - - if (persistent_) { - // In the persistent case we just run the callback. If this callback cancels - // the task id, we can't access |this| anymore, so we re-start watching the - // file descriptor before running the callback, unless this is a fd where - // we didn't stop watching the file descriptor when it became available. - if (!immediate_run_) - StartWatching(); - closure_.Run(); - } else { - // This will destroy |this|, the fd_watcher and therefore stop watching this - // file descriptor. - Closure closure_copy = std::move(closure_); - loop_->io_tasks_.erase(task_id_); - // Run the closure from the local copy we just made. - closure_copy.Run(); - } - - if (loop_ptr->run_once_) { - loop_ptr->run_once_ = false; - loop_ptr->BreakLoop(); - } -} - -bool BaseMessageLoop::IOTask::CancelTask() { - if (closure_.is_null()) - return false; - - DVLOG_LOC(location_, 1) - << "Removing task_id " << task_id_ << " scheduled from this location."; - - if (!posted_task_pending_) { - // Destroying the FileDescriptorWatcher implicitly stops watching the file - // descriptor. This will delete our instance. - loop_->io_tasks_.erase(task_id_); - return true; - } - // The IOTask is waiting for the message loop to run its delayed task, so - // it is not watching for the file descriptor. We release the closure - // resources now but keep the IOTask instance alive while we wait for the - // callback to run and delete the IOTask. - closure_ = Closure(); - return true; -} - } // namespace brillo diff --git a/brillo/message_loops/base_message_loop.h b/brillo/message_loops/base_message_loop.h index c038ac7..92c5bda 100644 --- a/brillo/message_loops/base_message_loop.h +++ b/brillo/message_loops/base_message_loop.h @@ -45,12 +45,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop { const base::Closure& task, base::TimeDelta delay) override; using MessageLoop::PostDelayedTask; - TaskId WatchFileDescriptor(const base::Location& from_here, - int fd, - WatchMode mode, - bool persistent, - const base::Closure& task) override; - using MessageLoop::WatchFileDescriptor; bool CancelTask(TaskId task_id) override; bool RunOnce(bool may_block) override; void Run() override; @@ -75,12 +69,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop { // scheduled with Post*Task() of id |task_id|, even if it was canceled. void OnRanPostedTask(MessageLoop::TaskId task_id); - // Called from the message loop when the IOTask should run the scheduled - // callback. This is a simple wrapper of IOTask::OnFileReadyPostedTask() - // posted from the BaseMessageLoop so it is deleted when the BaseMessageLoop - // goes out of scope since we can't cancel the callback otherwise. - void OnFileReadyPostedTask(MessageLoop::TaskId task_id); - // Return a new unused task_id. TaskId NextTaskId(); @@ -94,68 +82,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop { base::Closure closure; }; - class IOTask : public base::MessagePumpForIO::FdWatcher { - public: - IOTask(const base::Location& location, - BaseMessageLoop* loop, - MessageLoop::TaskId task_id, - int fd, - base::MessagePumpForIO::Mode base_mode, - bool persistent, - const base::Closure& task); - - const base::Location& location() const { return location_; } - - // Used to start/stop watching the file descriptor while keeping the - // IOTask entry available. - bool StartWatching(); - void StopWatching(); - - // Called from the message loop as a PostTask() when the file descriptor is - // available, scheduled to run from OnFileReady(). - void OnFileReadyPostedTask(); - - // Cancel the IOTask and returns whether it was actually canceled, with the - // same semantics as MessageLoop::CancelTask(). - bool CancelTask(); - - // Sets the closure to be run immediately whenever the file descriptor - // becomes ready. - void RunImmediately() { immediate_run_ = true; } - - private: - base::Location location_; - BaseMessageLoop* loop_; - - // These are the arguments passed in the constructor, basically forwarding - // all the arguments passed to WatchFileDescriptor() plus the assigned - // TaskId for this task. - MessageLoop::TaskId task_id_; - int fd_; - base::MessagePumpForIO::Mode base_mode_; - bool persistent_; - base::Closure closure_; - - base::MessagePumpForIO::FdWatchController fd_watcher_; - - // Tells whether there is a pending call to OnFileReadPostedTask(). - bool posted_task_pending_{false}; - - // Whether the registered callback should be running immediately when the - // file descriptor is ready, as opposed to posting a task to the main loop - // to prevent starvation. - bool immediate_run_{false}; - - // base::MessageLoopForIO::Watcher overrides: - void OnFileCanReadWithoutBlocking(int fd) override; - void OnFileCanWriteWithoutBlocking(int fd) override; - - // Common implementation for both the read and write case. - void OnFileReady(); - - DISALLOW_COPY_AND_ASSIGN(IOTask); - }; - // The base::MessageLoopForIO instance owned by this class, if any. This // is declared first in this class so it is destroyed last. std::unique_ptr<base::MessageLoopForIO> owned_base_loop_; @@ -163,9 +89,6 @@ class BRILLO_EXPORT BaseMessageLoop : public MessageLoop { // Tasks blocked on a timeout. std::map<MessageLoop::TaskId, DelayedTask> delayed_tasks_; - // Tasks blocked on I/O. - std::map<MessageLoop::TaskId, IOTask> io_tasks_; - // Flag to mark that we should run the message loop only one iteration. bool run_once_{false}; diff --git a/brillo/message_loops/fake_message_loop.cc b/brillo/message_loops/fake_message_loop.cc index 41f5b51..9ab2aa9 100644 --- a/brillo/message_loops/fake_message_loop.cc +++ b/brillo/message_loops/fake_message_loop.cc @@ -33,20 +33,6 @@ MessageLoop::TaskId FakeMessageLoop::PostDelayedTask( return current_id; } -MessageLoop::TaskId FakeMessageLoop::WatchFileDescriptor( - const base::Location& from_here, - int fd, - WatchMode mode, - bool persistent, - const base::Closure& task) { - MessageLoop::TaskId current_id = ++last_id_; - // FakeMessageLoop is limited to only 2^64 tasks. That should be enough. - CHECK(current_id); - tasks_.emplace(current_id, ScheduledTask{from_here, persistent, task}); - fds_watched_.emplace(std::make_pair(fd, mode), current_id); - return current_id; -} - bool FakeMessageLoop::CancelTask(TaskId task_id) { if (task_id == MessageLoop::kTaskIdNull) return false; @@ -58,36 +44,6 @@ bool FakeMessageLoop::CancelTask(TaskId task_id) { bool FakeMessageLoop::RunOnce(bool may_block) { if (test_clock_) current_time_ = test_clock_->Now(); - // Try to fire ready file descriptors first. - for (const auto& fd_mode : fds_ready_) { - const auto& fd_watched = fds_watched_.find(fd_mode); - if (fd_watched == fds_watched_.end()) - continue; - // The fd_watched->second task might have been canceled and we never removed - // it from the fds_watched_, so we fix that now. - const auto& scheduled_task_ref = tasks_.find(fd_watched->second); - if (scheduled_task_ref == tasks_.end()) { - fds_watched_.erase(fd_watched); - continue; - } - VLOG_LOC(scheduled_task_ref->second.location, 1) - << "Running task_id " << fd_watched->second - << " for watching file descriptor " << fd_mode.first << " for " - << (fd_mode.second == MessageLoop::kWatchRead ? "reading" : "writing") - << (scheduled_task_ref->second.persistent ? - " persistently" : " just once") - << " scheduled from this location."; - if (scheduled_task_ref->second.persistent) { - scheduled_task_ref->second.callback.Run(); - } else { - base::Closure callback = std::move(scheduled_task_ref->second.callback); - tasks_.erase(scheduled_task_ref); - fds_watched_.erase(fd_watched); - callback.Run(); - } - return true; - } - // Try to fire time-based callbacks afterwards. while (!fire_order_.empty() && (may_block || fire_order_.top().first <= current_time_)) { @@ -120,15 +76,6 @@ bool FakeMessageLoop::RunOnce(bool may_block) { return false; } -void FakeMessageLoop::SetFileDescriptorReadiness(int fd, - WatchMode mode, - bool ready) { - if (ready) - fds_ready_.emplace(fd, mode); - else - fds_ready_.erase(std::make_pair(fd, mode)); -} - bool FakeMessageLoop::PendingTasks() { for (const auto& task : tasks_) { VLOG_LOC(task.second.location, 1) diff --git a/brillo/message_loops/fake_message_loop.h b/brillo/message_loops/fake_message_loop.h index 4b6e8ac..45bab81 100644 --- a/brillo/message_loops/fake_message_loop.h +++ b/brillo/message_loops/fake_message_loop.h @@ -40,23 +40,11 @@ class BRILLO_EXPORT FakeMessageLoop : public MessageLoop { const base::Closure& task, base::TimeDelta delay) override; using MessageLoop::PostDelayedTask; - TaskId WatchFileDescriptor(const base::Location& from_here, - int fd, - WatchMode mode, - bool persistent, - const base::Closure& task) override; - using MessageLoop::WatchFileDescriptor; bool CancelTask(TaskId task_id) override; bool RunOnce(bool may_block) override; // FakeMessageLoop methods: - // Pretend, for the purpose of the FakeMessageLoop watching for file - // descriptors, that the file descriptor |fd| readiness to perform the - // operation described by |mode| is |ready|. Initially, no file descriptor - // is ready for any operation. - void SetFileDescriptorReadiness(int fd, WatchMode mode, bool ready); - // Return whether there are peding tasks. Useful to check that no // callbacks were leaked. bool PendingTasks(); @@ -79,13 +67,6 @@ class BRILLO_EXPORT FakeMessageLoop : public MessageLoop { std::vector<std::pair<base::Time, MessageLoop::TaskId>>, std::greater<std::pair<base::Time, MessageLoop::TaskId>>> fire_order_; - // The bag of watched (fd, mode) pair associated with the TaskId that's - // watching them. - std::multimap<std::pair<int, WatchMode>, MessageLoop::TaskId> fds_watched_; - - // The set of (fd, mode) pairs that are faked as ready. - std::set<std::pair<int, WatchMode>> fds_ready_; - base::SimpleTestClock* test_clock_ = nullptr; base::Time current_time_ = base::Time::FromDoubleT(1246996800.); diff --git a/brillo/message_loops/fake_message_loop_test.cc b/brillo/message_loops/fake_message_loop_test.cc index b4b839c..07af569 100644 --- a/brillo/message_loops/fake_message_loop_test.cc +++ b/brillo/message_loops/fake_message_loop_test.cc @@ -84,34 +84,6 @@ TEST_F(FakeMessageLoopTest, PostDelayedTaskAdvancesTheTime) { EXPECT_EQ(start + TimeDelta::FromSeconds(3), clock_.Now()); } -TEST_F(FakeMessageLoopTest, WatchFileDescriptorWaits) { - int fd = 1234; - // We will simulate this situation. At the beginning, we will watch for a - // file descriptor that won't trigger for 10s. Then we will pretend it is - // ready after 10s and expect its callback to run just once. - int called = 0; - TaskId task_id = loop_->WatchFileDescriptor( - FROM_HERE, fd, MessageLoop::kWatchRead, false, - Bind([](int* called) { (*called)++; }, base::Unretained(&called))); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - - EXPECT_NE(MessageLoop::kTaskIdNull, - loop_->PostDelayedTask(Bind(&FakeMessageLoop::BreakLoop, - base::Unretained(loop_.get())), - TimeDelta::FromSeconds(10))); - EXPECT_NE(MessageLoop::kTaskIdNull, - loop_->PostDelayedTask(Bind(&FakeMessageLoop::BreakLoop, - base::Unretained(loop_.get())), - TimeDelta::FromSeconds(20))); - loop_->Run(); - EXPECT_EQ(0, called); - - loop_->SetFileDescriptorReadiness(fd, MessageLoop::kWatchRead, true); - loop_->Run(); - EXPECT_EQ(1, called); - EXPECT_FALSE(loop_->CancelTask(task_id)); -} - TEST_F(FakeMessageLoopTest, PendingTasksTest) { loop_->PostDelayedTask(base::DoNothing(), TimeDelta::FromSeconds(1)); EXPECT_TRUE(loop_->PendingTasks()); diff --git a/brillo/message_loops/message_loop.h b/brillo/message_loops/message_loop.h index e9f804e..0f9740d 100644 --- a/brillo/message_loops/message_loop.h +++ b/brillo/message_loops/message_loop.h @@ -67,33 +67,6 @@ class BRILLO_EXPORT MessageLoop { return PostDelayedTask(from_here, task, base::TimeDelta()); } - // Watch mode flag used to watch for file descriptors. - enum WatchMode { - kWatchRead, - kWatchWrite, - }; - - // Watch a file descriptor |fd| for it to be ready to perform the operation - // passed in |mode| without blocking. When that happens, the |task| closure - // will be executed. If |persistent| is true, the file descriptor will - // continue to be watched and |task| will continue to be called until the task - // is canceled with CancelTask(). - // Returns the TaskId describing this task. In case of error, returns - // kTaskIdNull. - virtual TaskId WatchFileDescriptor(const base::Location& from_here, - int fd, - WatchMode mode, - bool persistent, - const base::Closure& task) = 0; - - // Convenience function to call WatchFileDescriptor() without a location. - TaskId WatchFileDescriptor(int fd, - WatchMode mode, - bool persistent, - const base::Closure& task) { - return WatchFileDescriptor(base::Location(), fd, mode, persistent, task); - } - // Cancel a scheduled task. Returns whether the task was canceled. For // example, if the callback was already executed (or is being executed) or was // already canceled this method will fail. Note that the TaskId can be reused diff --git a/brillo/message_loops/message_loop_test.cc b/brillo/message_loops/message_loop_test.cc index 7b57015..45a3b95 100644 --- a/brillo/message_loops/message_loop_test.cc +++ b/brillo/message_loops/message_loop_test.cc @@ -36,10 +36,6 @@ bool ReturnBool(bool *b) { return *b; } -void Increment(int* i) { - (*i)++; -} - } // namespace namespace brillo { @@ -122,115 +118,6 @@ TYPED_TEST(MessageLoopTest, PostDelayedTaskWithoutLocation) { EXPECT_EQ(1, MessageLoopRunMaxIterations(this->loop_.get(), 100)); } -TYPED_TEST(MessageLoopTest, WatchForInvalidFD) { - bool called = false; - EXPECT_EQ(MessageLoop::kTaskIdNull, this->loop_->WatchFileDescriptor( - FROM_HERE, -1, MessageLoop::kWatchRead, true, - Bind(&SetToTrue, &called))); - EXPECT_EQ(MessageLoop::kTaskIdNull, this->loop_->WatchFileDescriptor( - FROM_HERE, -1, MessageLoop::kWatchWrite, true, - Bind(&SetToTrue, &called))); - EXPECT_EQ(0, MessageLoopRunMaxIterations(this->loop_.get(), 100)); - EXPECT_FALSE(called); -} - -TYPED_TEST(MessageLoopTest, CancelWatchedFileDescriptor) { - ScopedPipe pipe; - bool called = false; - TaskId task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true, - Bind(&SetToTrue, &called)); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - // The reader end is blocked because we didn't write anything to the writer - // end. - EXPECT_EQ(0, MessageLoopRunMaxIterations(this->loop_.get(), 100)); - EXPECT_FALSE(called); - EXPECT_TRUE(this->loop_->CancelTask(task_id)); -} - -// When you watch a file descriptor for reading, the guaranties are that a -// blocking call to read() on that file descriptor will not block. This should -// include the case when the other end of a pipe is closed or the file is empty. -TYPED_TEST(MessageLoopTest, WatchFileDescriptorTriggersWhenPipeClosed) { - ScopedPipe pipe; - bool called = false; - EXPECT_EQ(0, HANDLE_EINTR(close(pipe.writer))); - pipe.writer = -1; - TaskId task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true, - Bind(&SetToTrue, &called)); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - // The reader end is not blocked because we closed the writer end so a read on - // the reader end would return 0 bytes read. - EXPECT_NE(0, MessageLoopRunMaxIterations(this->loop_.get(), 10)); - EXPECT_TRUE(called); - EXPECT_TRUE(this->loop_->CancelTask(task_id)); -} - -// When a WatchFileDescriptor task is scheduled with |persistent| = true, we -// should keep getting a call whenever the file descriptor is ready. -TYPED_TEST(MessageLoopTest, WatchFileDescriptorPersistently) { - ScopedPipe pipe; - EXPECT_EQ(1, HANDLE_EINTR(write(pipe.writer, "a", 1))); - - int called = 0; - TaskId task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, pipe.reader, MessageLoop::kWatchRead, true, - Bind(&Increment, &called)); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - // We let the main loop run for 20 iterations to give it enough iterations to - // verify that our callback was called more than one. We only check that our - // callback is called more than once. - EXPECT_EQ(20, MessageLoopRunMaxIterations(this->loop_.get(), 20)); - EXPECT_LT(1, called); - EXPECT_TRUE(this->loop_->CancelTask(task_id)); -} - -TYPED_TEST(MessageLoopTest, WatchFileDescriptorNonPersistent) { - ScopedPipe pipe; - EXPECT_EQ(1, HANDLE_EINTR(write(pipe.writer, "a", 1))); - - int called = 0; - TaskId task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, pipe.reader, MessageLoop::kWatchRead, false, - Bind(&Increment, &called)); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - // We let the main loop run for 20 iterations but we just expect it to run - // at least once. The callback should be called exactly once since we - // scheduled it non-persistently. After it ran, we shouldn't be able to cancel - // this task. - EXPECT_LT(0, MessageLoopRunMaxIterations(this->loop_.get(), 20)); - EXPECT_EQ(1, called); - EXPECT_FALSE(this->loop_->CancelTask(task_id)); -} - -TYPED_TEST(MessageLoopTest, WatchFileDescriptorForReadAndWriteSimultaneously) { - ScopedSocketPair socks; - EXPECT_EQ(1, HANDLE_EINTR(write(socks.right, "a", 1))); - // socks.left should be able to read this "a" and should also be able to write - // without blocking since the kernel has some buffering for it. - - TaskId read_task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, socks.left, MessageLoop::kWatchRead, true, - Bind([] (MessageLoop* loop, TaskId* read_task_id) { - EXPECT_TRUE(loop->CancelTask(*read_task_id)) - << "task_id" << *read_task_id; - }, this->loop_.get(), &read_task_id)); - EXPECT_NE(MessageLoop::kTaskIdNull, read_task_id); - - TaskId write_task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, socks.left, MessageLoop::kWatchWrite, true, - Bind([] (MessageLoop* loop, TaskId* write_task_id) { - EXPECT_TRUE(loop->CancelTask(*write_task_id)); - }, this->loop_.get(), &write_task_id)); - EXPECT_NE(MessageLoop::kTaskIdNull, write_task_id); - - EXPECT_LT(0, MessageLoopRunMaxIterations(this->loop_.get(), 20)); - - EXPECT_FALSE(this->loop_->CancelTask(read_task_id)); - EXPECT_FALSE(this->loop_->CancelTask(write_task_id)); -} - // Test that we can cancel the task we are running, and should just fail. TYPED_TEST(MessageLoopTest, DeleteTaskFromSelf) { bool cancel_result = true; // We would expect this to be false. @@ -244,129 +131,4 @@ TYPED_TEST(MessageLoopTest, DeleteTaskFromSelf) { EXPECT_FALSE(cancel_result); } -// Test that we can cancel a non-persistent file descriptor watching callback, -// which should fail. -TYPED_TEST(MessageLoopTest, DeleteNonPersistenIOTaskFromSelf) { - ScopedPipe pipe; - TaskId task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, pipe.writer, MessageLoop::kWatchWrite, false /* persistent */, - Bind([](MessageLoop* loop, TaskId* task_id) { - EXPECT_FALSE(loop->CancelTask(*task_id)); - *task_id = MessageLoop::kTaskIdNull; - }, this->loop_.get(), &task_id)); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - EXPECT_EQ(1, MessageLoopRunMaxIterations(this->loop_.get(), 100)); - EXPECT_EQ(MessageLoop::kTaskIdNull, task_id); -} - -// Test that we can cancel a persistent file descriptor watching callback from -// the same callback. -TYPED_TEST(MessageLoopTest, DeletePersistenIOTaskFromSelf) { - ScopedPipe pipe; - TaskId task_id = this->loop_->WatchFileDescriptor( - FROM_HERE, pipe.writer, MessageLoop::kWatchWrite, true /* persistent */, - Bind([](MessageLoop* loop, TaskId* task_id) { - EXPECT_TRUE(loop->CancelTask(*task_id)); - *task_id = MessageLoop::kTaskIdNull; - }, this->loop_.get(), &task_id)); - EXPECT_NE(MessageLoop::kTaskIdNull, task_id); - EXPECT_EQ(1, MessageLoopRunMaxIterations(this->loop_.get(), 100)); - EXPECT_EQ(MessageLoop::kTaskIdNull, task_id); -} - -// Test that we can cancel several persistent file descriptor watching callbacks -// from a scheduled callback. In the BaseMessageLoop implementation, this code -// will cause us to cancel an IOTask that has a pending delayed task, but -// otherwise is a valid test case on all implementations. -TYPED_TEST(MessageLoopTest, DeleteAllPersistenIOTaskFromSelf) { - const int kNumTasks = 5; - ScopedPipe pipes[kNumTasks]; - TaskId task_ids[kNumTasks]; - - for (int i = 0; i < kNumTasks; ++i) { - task_ids[i] = this->loop_->WatchFileDescriptor( - FROM_HERE, pipes[i].writer, MessageLoop::kWatchWrite, - true /* persistent */, - Bind([] (MessageLoop* loop, TaskId* task_ids) { - for (int j = 0; j < kNumTasks; ++j) { - // Once we cancel all the tasks, none should run, so this code runs - // only once from one callback. - EXPECT_TRUE(loop->CancelTask(task_ids[j])); - task_ids[j] = MessageLoop::kTaskIdNull; - } - }, this->loop_.get(), task_ids)); - } - MessageLoopRunMaxIterations(this->loop_.get(), 100); - for (int i = 0; i < kNumTasks; ++i) { - EXPECT_EQ(MessageLoop::kTaskIdNull, task_ids[i]); - } -} - -// Test that if there are several tasks watching for file descriptors to be -// available or simply waiting in the message loop are fairly scheduled to run. -// In other words, this test ensures that having a file descriptor always -// available doesn't prevent other file descriptors watching tasks or delayed -// tasks to be dispatched, causing starvation. -TYPED_TEST(MessageLoopTest, AllTasksAreEqual) { - int total_calls = 0; - - // First, schedule a repeating timeout callback to run from the main loop. - int timeout_called = 0; - base::Closure timeout_callback; - MessageLoop::TaskId timeout_task; - timeout_callback = base::Bind( - [](MessageLoop* loop, int* timeout_called, int* total_calls, - base::Closure* timeout_callback, MessageLoop::TaskId* timeout_task) { - (*timeout_called)++; - (*total_calls)++; - *timeout_task = loop->PostTask(FROM_HERE, *timeout_callback); - if (*total_calls > 100) - loop->BreakLoop(); - }, - this->loop_.get(), &timeout_called, &total_calls, &timeout_callback, - &timeout_task); - timeout_task = this->loop_->PostTask(FROM_HERE, timeout_callback); - - // Second, schedule several file descriptor watchers. - const int kNumTasks = 3; - ScopedPipe pipes[kNumTasks]; - MessageLoop::TaskId tasks[kNumTasks]; - - int reads[kNumTasks] = {}; - base::Callback<void(int)> fd_callback = base::Bind( - [](MessageLoop* loop, ScopedPipe* pipes, int* reads, - int* total_calls, int i) { - reads[i]++; - (*total_calls)++; - char c; - EXPECT_EQ(1, HANDLE_EINTR(read(pipes[i].reader, &c, 1))); - if (*total_calls > 100) - loop->BreakLoop(); - }, this->loop_.get(), pipes, reads, &total_calls); - - for (int i = 0; i < kNumTasks; ++i) { - tasks[i] = this->loop_->WatchFileDescriptor( - FROM_HERE, pipes[i].reader, MessageLoop::kWatchRead, - true /* persistent */, - Bind(fd_callback, i)); - // Make enough bytes available on each file descriptor. This should not - // block because we set the size of the file descriptor buffer when - // creating it. - std::vector<char> blob(1000, 'a'); - EXPECT_EQ(blob.size(), - HANDLE_EINTR(write(pipes[i].writer, blob.data(), blob.size()))); - } - this->loop_->Run(); - EXPECT_GT(total_calls, 100); - // We run the loop up 100 times and expect each callback to run at least 10 - // times. A good scheduler should balance these callbacks. - EXPECT_GE(timeout_called, 10); - EXPECT_TRUE(this->loop_->CancelTask(timeout_task)); - for (int i = 0; i < kNumTasks; ++i) { - EXPECT_GE(reads[i], 10) << "Reading from pipes[" << i << "], fd " - << pipes[i].reader; - EXPECT_TRUE(this->loop_->CancelTask(tasks[i])); - } -} - } // namespace brillo diff --git a/brillo/message_loops/mock_message_loop.h b/brillo/message_loops/mock_message_loop.h index c84e585..30a19b8 100644 --- a/brillo/message_loops/mock_message_loop.h +++ b/brillo/message_loops/mock_message_loop.h @@ -40,14 +40,6 @@ class BRILLO_EXPORT MockMessageLoop : public MessageLoop { const base::Closure&, base::TimeDelta)>( &FakeMessageLoop::PostDelayedTask))); - ON_CALL(*this, WatchFileDescriptor( - ::testing::_, ::testing::_, ::testing::_, ::testing::_, ::testing::_)) - .WillByDefault(::testing::Invoke( - &fake_loop_, - static_cast<TaskId(FakeMessageLoop::*)( - const base::Location&, int, WatchMode, bool, - const base::Closure&)>( - &FakeMessageLoop::WatchFileDescriptor))); ON_CALL(*this, CancelTask(::testing::_)) .WillByDefault(::testing::Invoke(&fake_loop_, &FakeMessageLoop::CancelTask)); @@ -62,12 +54,6 @@ class BRILLO_EXPORT MockMessageLoop : public MessageLoop { (const base::Location&, const base::Closure&, base::TimeDelta), (override)); using MessageLoop::PostDelayedTask; - MOCK_METHOD( - TaskId, - WatchFileDescriptor, - (const base::Location&, int, WatchMode, bool, const base::Closure&), - (override)); - using MessageLoop::WatchFileDescriptor; MOCK_METHOD(bool, CancelTask, (TaskId), (override)); MOCK_METHOD(bool, RunOnce, (bool), (override)); diff --git a/brillo/streams/file_stream.cc b/brillo/streams/file_stream.cc index db22192..70b25dd 100644 --- a/brillo/streams/file_stream.cc +++ b/brillo/streams/file_stream.cc @@ -12,6 +12,7 @@ #include <utility> #include <base/bind.h> +#include <base/files/file_descriptor_watcher_posix.h> #include <base/files/file_util.h> #include <base/posix/eintr_wrapper.h> #include <brillo/errors/error_codes.h> @@ -86,15 +87,11 @@ class FileDescriptor : public FileStream::FileDescriptorInterface { ErrorPtr* error) override { if (stream_utils::IsReadAccessMode(mode)) { CHECK(read_data_callback_.is_null()); - MessageLoop::current()->CancelTask(read_watcher_); - read_watcher_ = MessageLoop::current()->WatchFileDescriptor( - FROM_HERE, + read_watcher_ = base::FileDescriptorWatcher::WatchReadable( fd_, - MessageLoop::WatchMode::kWatchRead, - false, // persistent - base::Bind(&FileDescriptor::OnFileCanReadWithoutBlocking, - base::Unretained(this))); - if (read_watcher_ == MessageLoop::kTaskIdNull) { + base::BindRepeating(&FileDescriptor::OnReadable, + base::Unretained(this))); + if (!read_watcher_) { Error::AddTo(error, FROM_HERE, errors::stream::kDomain, errors::stream::kInvalidParameter, "File descriptor doesn't support watching for reading."); @@ -104,15 +101,11 @@ class FileDescriptor : public FileStream::FileDescriptorInterface { } if (stream_utils::IsWriteAccessMode(mode)) { CHECK(write_data_callback_.is_null()); - MessageLoop::current()->CancelTask(write_watcher_); - write_watcher_ = MessageLoop::current()->WatchFileDescriptor( - FROM_HERE, + write_watcher_ = base::FileDescriptorWatcher::WatchWritable( fd_, - MessageLoop::WatchMode::kWatchWrite, - false, // persistent - base::Bind(&FileDescriptor::OnFileCanWriteWithoutBlocking, - base::Unretained(this))); - if (write_watcher_ == MessageLoop::kTaskIdNull) { + base::BindRepeating(&FileDescriptor::OnWritable, + base::Unretained(this))); + if (!write_watcher_) { Error::AddTo(error, FROM_HERE, errors::stream::kDomain, errors::stream::kInvalidParameter, "File descriptor doesn't support watching for writing."); @@ -157,31 +150,26 @@ class FileDescriptor : public FileStream::FileDescriptorInterface { void CancelPendingAsyncOperations() override { read_data_callback_.Reset(); - if (read_watcher_ != MessageLoop::kTaskIdNull) { - MessageLoop::current()->CancelTask(read_watcher_); - read_watcher_ = MessageLoop::kTaskIdNull; - } - + read_watcher_ = nullptr; write_data_callback_.Reset(); - if (write_watcher_ != MessageLoop::kTaskIdNull) { - MessageLoop::current()->CancelTask(write_watcher_); - write_watcher_ = MessageLoop::kTaskIdNull; - } + write_watcher_ = nullptr; } // Called from the brillo::MessageLoop when the file descriptor is available // for reading. - void OnFileCanReadWithoutBlocking() { + void OnReadable() { CHECK(!read_data_callback_.is_null()); - DataCallback cb = read_data_callback_; - read_data_callback_.Reset(); + + read_watcher_ = nullptr; + DataCallback cb = std::move(read_data_callback_); cb.Run(Stream::AccessMode::READ); } - void OnFileCanWriteWithoutBlocking() { + void OnWritable() { CHECK(!write_data_callback_.is_null()); - DataCallback cb = write_data_callback_; - write_data_callback_.Reset(); + + write_watcher_ = nullptr; + DataCallback cb = std::move(write_data_callback_); cb.Run(Stream::AccessMode::WRITE); } @@ -200,9 +188,9 @@ class FileDescriptor : public FileStream::FileDescriptorInterface { DataCallback read_data_callback_; DataCallback write_data_callback_; - // MessageLoop tasks monitoring read/write operations on the file descriptor. - MessageLoop::TaskId read_watcher_{MessageLoop::kTaskIdNull}; - MessageLoop::TaskId write_watcher_{MessageLoop::kTaskIdNull}; + // Monitoring read/write operations on the file descriptor. + std::unique_ptr<base::FileDescriptorWatcher::Controller> read_watcher_; + std::unique_ptr<base::FileDescriptorWatcher::Controller> write_watcher_; DISALLOW_COPY_AND_ASSIGN(FileDescriptor); }; |