diff options
author | Henri Chataing <henrichataing@google.com> | 2023-04-19 17:00:19 +0000 |
---|---|---|
committer | Gerrit Code Review <noreply-gerritcodereview@google.com> | 2023-04-19 17:00:19 +0000 |
commit | 937b4f2e335ba0e2a3fb983330a1aa1a59fb5c6b (patch) | |
tree | 6250e1d5f80169c5b8604914d21bce34d0f69b32 | |
parent | c275d5ee4d19454990d230aeac77af1d3d1ba8eb (diff) | |
parent | 525343bf893831a12cf3999284a646a8c6ba0af7 (diff) | |
download | netsim-937b4f2e335ba0e2a3fb983330a1aa1a59fb5c6b.tar.gz |
Merge "Remove dependency on libbt-rootcanal-qemu in CMakeLists.txt"
-rw-r--r-- | src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/hci/async_manager.cc | 577 |
2 files changed, 578 insertions, 1 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 28ccd34f..7605b6c1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -108,6 +108,7 @@ if(TARGET Rust::Rustc) frontend/frontend_server.cc frontend/frontend_server.h frontend/server_response_writable.h + hci/async_manager.cc hci/bluetooth_facade.cc hci/bluetooth_facade.h hci/hci_debug.cc @@ -125,7 +126,6 @@ if(TARGET Rust::Rustc) android-emu-base-headers grpc++ libbt-rootcanal - libbt-rootcanal-qemu netsim-cxx netsimd-proto-lib protobuf::libprotobuf diff --git a/src/hci/async_manager.cc b/src/hci/async_manager.cc new file mode 100644 index 00000000..271d87ab --- /dev/null +++ b/src/hci/async_manager.cc @@ -0,0 +1,577 @@ +/* + * Copyright 2016 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "model/setup/async_manager.h" // for AsyncManager + +#include <errno.h> // for errno +#include <atomic> // for atomic_bool, atomic_e... +#include <condition_variable> // for condition_variable +#include <cstring> // for strerror +#include <limits> // for numeric_limits +#include <map> // for map<>::value_type, map +#include <mutex> // for unique_lock, mutex +#include <ratio> // for ratio +#include <set> // for set +#include <thread> // for thread +#include <type_traits> // for remove_extent_t +#include <utility> // for pair, make_pair, oper... +#include <vector> // for vector + +#include "aemu/base/EintrWrapper.h" // for HANDLE_EINTR +#include "aemu/base/Log.h" // for LogStreamVoidify, Log... +#include "aemu/base/sockets/SocketUtils.h" // for socketRecv, socketSet... +#include "aemu/base/sockets/SocketWaiter.h" // for SocketWaiter, SocketW... +#include "aemu/base/logging/CLog.h" + +namespace rootcanal { +// Implementation of AsyncManager is divided between two classes, three if +// AsyncManager itself is taken into account, but its only responsability +// besides being a proxy for the other two classes is to provide a global +// synchronization mechanism for callbacks and client code to use. + +// The watching of file descriptors is done through AsyncFdWatcher. Several +// objects of this class may coexist simultaneosly as they share no state. +// After construction of this objects nothing happens beyond some very simple +// member initialization. When the first FD is set up for watching the object +// starts a new thread which watches the given (and later provided) FDs using +// select() inside a loop. A special FD (a pipe) is also watched which is +// used to notify the thread of internal changes on the object state (like +// the addition of new FDs to watch on). Every access to internal state is +// synchronized using a single internal mutex. The thread is only stopped on +// destruction of the object, by modifying a flag, which is the only member +// variable accessed without acquiring the lock (because the notification to +// the thread is done later by writing to a pipe which means the thread will +// be notified regardless of what phase of the loop it is in that moment) + +// The scheduling of asynchronous tasks, periodic or not, is handled by the +// AsyncTaskManager class. Like the one for FDs, this class shares no internal +// state between different instances so it is safe to use several objects of +// this class, also nothing interesting happens upon construction, but only +// after a Task has been scheduled and access to internal state is synchronized +// using a single internal mutex. When the first task is scheduled a thread +// is started which monitors a queue of tasks. The queue is peeked to see +// when the next task should be carried out and then the thread performs a +// (absolute) timed wait on a condition variable. The wait ends because of a +// time out or a notify on the cond var, the former means a task is due +// for execution while the later means there has been a change in internal +// state, like a task has been scheduled/canceled or the flag to stop has +// been set. Setting and querying the stop flag or modifying the task queue +// and subsequent notification on the cond var is done atomically (e.g while +// holding the lock on the internal mutex) to ensure that the thread never +// misses the notification, since notifying a cond var is not persistent as +// writing on a pipe (if not done this way, the thread could query the +// stopping flag and be put aside by the OS scheduler right after, then the +// 'stop thread' procedure could run, setting the flag, notifying a cond +// var that no one is waiting on and joining the thread, the thread then +// resumes execution believing that it needs to continue and waits on the +// cond var possibly forever if there are no tasks scheduled, efectively +// causing a deadlock). + +// This number also states the maximum number of scheduled tasks we can handle +// at a given time +static const uint16_t kMaxTaskId = -1; /* 2^16 - 1, permisible ids are {1..2^16-1}*/ +static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) { + return (id == kMaxTaskId) ? 1 : id + 1; +} +// The buffer is only 10 bytes because the expected number of bytes +// written on this socket is 1. It is possible that the thread is notified +// more than once but highly unlikely, so a buffer of size 10 seems enough +// and the reads are performed inside a while just in case it isn't. From +// the thread routine's point of view it is the same to have been notified +// just once or 100 times so it just tries to consume the entire buffer. +// In the cases where an interrupt would cause read to return without +// having read everything that was available a new iteration of the thread +// loop will bring execution to this point almost immediately, so there is +// no need to treat that case. +static const int kNotificationBufferSize = 10; + + +using android::base::SocketWaiter; + +// Async File Descriptor Watcher Implementation: +class AsyncManager::AsyncFdWatcher { + + public: + int WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) { + // add file descriptor and callback + { + std::unique_lock<std::recursive_mutex> guard(internal_mutex_); + watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback; + } + + // start the thread if not started yet + int started = tryStartThread(); + if (started != 0) { + derror("%s: Unable to start thread", __func__); + return started; + } + + // notify the thread so that it knows of the new FD + notifyThread(); + + return 0; + } + + void StopWatchingFileDescriptor(int file_descriptor) { + std::unique_lock<std::recursive_mutex> guard(internal_mutex_); + watched_shared_fds_.erase(file_descriptor); + } + + AsyncFdWatcher() = default; + AsyncFdWatcher(const AsyncFdWatcher&) = delete; + AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete; + + ~AsyncFdWatcher() = default; + + int stopThread() { + if (!std::atomic_exchange(&running_, false)) { + return 0; // if not running already + } + + notifyThread(); + + if (std::this_thread::get_id() != thread_.get_id()) { + thread_.join(); + } else { + dwarning("%s: Starting thread stop from inside the reading thread itself", __func__); + } + + { + std::unique_lock<std::recursive_mutex> guard(internal_mutex_); + watched_shared_fds_.clear(); + } + + return 0; + } + + private: + // Make sure to call this with at least one file descriptor ready to be + // watched upon or the thread routine will return immediately + int tryStartThread() { + if (std::atomic_exchange(&running_, true)) { + return 0; // if already running + } + // set up the communication channel + if (android::base::socketCreatePair(¬ification_listen_fd_, ¬ification_write_fd_)) { + derror( + "%s:Unable to establish a communication channel to the reading " + "thread", + __func__); + return -1; + } + android::base::socketSetNonBlocking(notification_listen_fd_); + android::base::socketSetNonBlocking(notification_write_fd_); + + thread_ = std::thread([this]() { ThreadRoutine(); }); + if (!thread_.joinable()) { + derror("%s: Unable to start reading thread", __func__); + return -1; + } + return 0; + } + + int notifyThread() { + char buffer = '0'; + if (android::base::socketSend(notification_write_fd_, &buffer, 1) < 0) { + derror("%s: Unable to send message to reading thread", __func__); + return -1; + } + return 0; + } + + void setUpFileDescriptorSet(SocketWaiter* read_fds) { + // add comm channel to the set + read_fds->update(notification_listen_fd_, SocketWaiter::Event::kEventRead); + + // add watched FDs to the set + { + std::unique_lock<std::recursive_mutex> guard(internal_mutex_); + for (auto& fdp : watched_shared_fds_) { + read_fds->update(fdp.first, SocketWaiter::Event::kEventRead); + } + } + } + + // check the comm channel and read everything there + bool consumeThreadNotifications(SocketWaiter* read_fds) { + if (read_fds->pendingEventsFor(notification_listen_fd_)) { + char buffer[kNotificationBufferSize]; + while (HANDLE_EINTR(android::base::socketRecv(notification_listen_fd_, buffer, kNotificationBufferSize)) == + kNotificationBufferSize) { + } + return true; + } + return false; + } + + // check all file descriptors and call callbacks if necesary + void runAppropriateCallbacks(SocketWaiter* read_fds) { + // not a good idea to call a callback while holding the FD lock, + // nor to release the lock while traversing the map + std::vector<decltype(watched_shared_fds_)::value_type> fds; + std::unique_lock<std::recursive_mutex> guard(internal_mutex_); + for (auto& fdc : watched_shared_fds_) { + auto pending = read_fds->pendingEventsFor(fdc.first); + if (pending == SocketWaiter::kEventRead) { + fds.push_back(fdc); + } + } + + for (auto& p : fds) { + p.second(p.first); + } + + } + + void ThreadRoutine() { + auto read_fds = std::unique_ptr<SocketWaiter>(SocketWaiter::create()); + while (running_) { + read_fds->reset(); + setUpFileDescriptorSet(read_fds.get()); + + // wait until there is data available to read on some FD + int retval = read_fds->wait(std::numeric_limits<int64_t>::max()); + if (retval <= 0) { // there was some error or a timeout + derror( + "%s: There was an error while waiting for data on the file " + "descriptors: %s", + __func__, strerror(errno)); + continue; + } + + consumeThreadNotifications(read_fds.get()); + + // Do not read if there was a call to stop running + if (!running_) { + break; + } + + runAppropriateCallbacks(read_fds.get()); + } + } + + std::atomic_bool running_{false}; + std::thread thread_; + std::recursive_mutex internal_mutex_; + + + //android::base::SocketWaiter socket_waiter_; + std::map<int, ReadCallback> watched_shared_fds_; + + // A pair of FD to send information to the reading thread + int notification_listen_fd_{}; + int notification_write_fd_{}; +}; + +// Async task manager implementation +class AsyncManager::AsyncTaskManager { + public: + AsyncUserId GetNextUserId() { return lastUserId_++; } + + AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay, + const TaskCallback& callback) { + return scheduleTask(std::make_shared<Task>( + std::chrono::steady_clock::now() + delay, callback, user_id)); + } + + AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id, + std::chrono::milliseconds delay, + std::chrono::milliseconds period, + const TaskCallback& callback) { + return scheduleTask(std::make_shared<Task>( + std::chrono::steady_clock::now() + delay, period, callback, user_id)); + } + + bool CancelAsyncTask(AsyncTaskId async_task_id) { + // remove task from queue (and task id association) while holding lock + std::unique_lock<std::mutex> guard(internal_mutex_); + return cancel_task_with_lock_held(async_task_id); + } + + bool CancelAsyncTasksFromUser(AsyncUserId user_id) { + // remove task from queue (and task id association) while holding lock + std::unique_lock<std::mutex> guard(internal_mutex_); + if (tasks_by_user_id_.count(user_id) == 0) { + return false; + } + for (auto task : tasks_by_user_id_[user_id]) { + cancel_task_with_lock_held(task); + } + tasks_by_user_id_.erase(user_id); + return true; + } + + void Synchronize(const CriticalCallback& critical) { + std::unique_lock<std::mutex> guard(synchronization_mutex_); + critical(); + } + + AsyncTaskManager() = default; + AsyncTaskManager(const AsyncTaskManager&) = delete; + AsyncTaskManager& operator=(const AsyncTaskManager&) = delete; + + ~AsyncTaskManager() = default; + + int stopThread() { + { + std::unique_lock<std::mutex> guard(internal_mutex_); + tasks_by_id_.clear(); + task_queue_.clear(); + if (!running_) { + return 0; + } + running_ = false; + // notify the thread + internal_cond_var_.notify_one(); + } // release the lock before joining a thread that is likely waiting for it + if (std::this_thread::get_id() != thread_.get_id()) { + thread_.join(); + } else { + dwarning("%s: Starting thread stop from inside the task thread itself", __func__); + } + return 0; + } + + private: + // Holds the data for each task + class Task { + public: + Task(std::chrono::steady_clock::time_point time, + std::chrono::milliseconds period, const TaskCallback& callback, + AsyncUserId user) + : time(time), + periodic(true), + period(period), + callback(callback), + task_id(kInvalidTaskId), + user_id(user) {} + Task(std::chrono::steady_clock::time_point time, + const TaskCallback& callback, AsyncUserId user) + : time(time), + periodic(false), + callback(callback), + task_id(kInvalidTaskId), + user_id(user) {} + + // Operators needed to be in a collection + bool operator<(const Task& another) const { + return std::make_pair(time, task_id) < std::make_pair(another.time, another.task_id); + } + + bool isPeriodic() const { + return periodic; + } + + // These fields should no longer be public if the class ever becomes + // public or gets more complex + std::chrono::steady_clock::time_point time; + bool periodic; + std::chrono::milliseconds period{}; + std::mutex in_callback; // Taken when the callback is active + TaskCallback callback; + AsyncTaskId task_id; + AsyncUserId user_id; + }; + + // A comparator class to put shared pointers to tasks in an ordered set + struct task_p_comparator { + bool operator()(const std::shared_ptr<Task>& t1, const std::shared_ptr<Task>& t2) const { + return *t1 < *t2; + } + }; + + bool cancel_task_with_lock_held(AsyncTaskId async_task_id) { + if (tasks_by_id_.count(async_task_id) == 0) { + return false; + } + + // Now make sure we are not running this task. + // 2 cases + // - This is called from thread_, this means a scheduled task is actually + // unregistering. + // - Another thread is calling us, let's make sure the task is not active. + if (thread_.get_id() != std::this_thread::get_id()) { + auto task = tasks_by_id_[async_task_id]; + const std::lock_guard<std::mutex> lock(task->in_callback); + task_queue_.erase(task); + tasks_by_id_.erase(async_task_id); + } else { + task_queue_.erase(tasks_by_id_[async_task_id]); + tasks_by_id_.erase(async_task_id); + } + + return true; + } + + AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) { + { + std::unique_lock<std::mutex> guard(internal_mutex_); + // no more room for new tasks, we need a larger type for IDs + if (tasks_by_id_.size() == kMaxTaskId) // TODO potentially type unsafe + return kInvalidTaskId; + do { + lastTaskId_ = NextAsyncTaskId(lastTaskId_); + } while (isTaskIdInUse(lastTaskId_)); + task->task_id = lastTaskId_; + // add task to the queue and map + tasks_by_id_[lastTaskId_] = task; + tasks_by_user_id_[task->user_id].insert(task->task_id); + task_queue_.insert(task); + } + // start thread if necessary + int started = tryStartThread(); + if (started != 0) { + derror("%s: Unable to start thread", __func__); + return kInvalidTaskId; + } + // notify the thread so that it knows of the new task + internal_cond_var_.notify_one(); + // return task id + return task->task_id; + } + + bool isTaskIdInUse(const AsyncTaskId& task_id) const { + return tasks_by_id_.count(task_id) != 0; + } + + int tryStartThread() { + // need the lock because of the running flag and the cond var + std::unique_lock<std::mutex> guard(internal_mutex_); + // check that the thread is not yet running + if (running_) { + return 0; + } + // start the thread + running_ = true; + thread_ = std::thread([this]() { ThreadRoutine(); }); + if (!thread_.joinable()) { + derror("%s: Unable to start task thread", __func__); + return -1; + } + return 0; + } + + void ThreadRoutine() { + while (running_) { + TaskCallback callback; + std::shared_ptr<Task> task_p; + bool run_it = false; + { + std::unique_lock<std::mutex> guard(internal_mutex_); + if (!task_queue_.empty()) { + task_p = *(task_queue_.begin()); + if (task_p->time < std::chrono::steady_clock::now()) { + run_it = true; + callback = task_p->callback; + task_queue_.erase(task_p); // need to remove and add again if + // periodic to update order + if (task_p->isPeriodic()) { + task_p->time += task_p->period; + task_queue_.insert(task_p); + } else { + tasks_by_user_id_[task_p->user_id].erase(task_p->task_id); + tasks_by_id_.erase(task_p->task_id); + } + } + } + } + if (run_it) { + const std::lock_guard<std::mutex> lock(task_p->in_callback); + Synchronize(callback); + } + { + std::unique_lock<std::mutex> guard(internal_mutex_); + // check for termination right before waiting + if (!running_) break; + // wait until time for the next task (if any) + if (task_queue_.size() > 0) { + // Make a copy of the time_point because wait_until takes a reference + // to it and may read it after waiting, by which time the task may + // have been freed (e.g. via CancelAsyncTask). + std::chrono::steady_clock::time_point time = + (*task_queue_.begin())->time; + internal_cond_var_.wait_until(guard, time); + } else { + internal_cond_var_.wait(guard); + } + } + } + } + + bool running_ = false; + std::thread thread_; + std::mutex internal_mutex_; + std::mutex synchronization_mutex_; + std::condition_variable internal_cond_var_; + + AsyncTaskId lastTaskId_ = kInvalidTaskId; + AsyncUserId lastUserId_{1}; + std::map<AsyncTaskId, std::shared_ptr<Task> > tasks_by_id_; + std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_; + std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_; +}; + +// Async Manager Implementation: +AsyncManager::AsyncManager() : fdWatcher_p_(new AsyncFdWatcher()), taskManager_p_(new AsyncTaskManager()) {} + +AsyncManager::~AsyncManager() { + // Make sure the threads are stopped before destroying the object. + // The threads need to be stopped here and not in each internal class' + // destructor because unique_ptr's reset() first assigns nullptr to the + // pointer and only then calls the destructor, so any callback running + // on these threads would dereference a null pointer if they called a member + // function of this class. + fdWatcher_p_->stopThread(); + taskManager_p_->stopThread(); +} + +int AsyncManager::WatchFdForNonBlockingReads(int file_descriptor, const ReadCallback& on_read_fd_ready_callback) { + return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor, on_read_fd_ready_callback); +} + +void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) { + fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor); +} + +AsyncUserId AsyncManager::GetNextUserId() { + return taskManager_p_->GetNextUserId(); +} + +AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id, + std::chrono::milliseconds delay, + const TaskCallback& callback) { + return taskManager_p_->ExecAsync(user_id, delay, callback); +} + +AsyncTaskId AsyncManager::ExecAsyncPeriodically( + AsyncUserId user_id, std::chrono::milliseconds delay, + std::chrono::milliseconds period, const TaskCallback& callback) { + return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period, + callback); +} + +bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) { + return taskManager_p_->CancelAsyncTask(async_task_id); +} + +bool AsyncManager::CancelAsyncTasksFromUser( + rootcanal::AsyncUserId user_id) { + return taskManager_p_->CancelAsyncTasksFromUser(user_id); +} + +void AsyncManager::Synchronize(const CriticalCallback& critical) { + taskManager_p_->Synchronize(critical); +} +} // namespace rootcanal |