diff options
author | Andy Hung <hunga@google.com> | 2024-02-29 17:39:27 -0800 |
---|---|---|
committer | Andy Hung <hunga@google.com> | 2024-03-13 12:30:46 -0700 |
commit | 982b1d517247d5bf560ae0c967be6e6dd3d64834 (patch) | |
tree | 8f81cfa09cf4b5f044515bf19ca2effb65c189c1 | |
parent | c71b866ae47a656e33d3370cd89b46141be47c19 (diff) | |
download | media-982b1d517247d5bf560ae0c967be6e6dd3d64834.tar.gz |
audio_utils: Add CommandThread for command serialization
Test: atest audio_commandthread_tests
Bug: 326031258
Merged-In: I41ef7728d2bddb4a569fb44eda82a6152e01f1b3
Change-Id: I41ef7728d2bddb4a569fb44eda82a6152e01f1b3
-rw-r--r-- | audio_utils/include/audio_utils/CommandThread.h | 117 | ||||
-rw-r--r-- | audio_utils/tests/Android.bp | 18 | ||||
-rw-r--r-- | audio_utils/tests/audio_commandthread_tests.cpp | 78 |
3 files changed, 213 insertions, 0 deletions
diff --git a/audio_utils/include/audio_utils/CommandThread.h b/audio_utils/include/audio_utils/CommandThread.h new file mode 100644 index 00000000..b82188f4 --- /dev/null +++ b/audio_utils/include/audio_utils/CommandThread.h @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <deque> +#include <mutex> +#include <thread> +#include <utils/Mutex.h> // has thread safety annotations + +namespace android::audio_utils { + +/** + * CommandThread is used for serial execution of commands + * on a single worker thread. + * + * This class is thread-safe. + */ + +class CommandThread { +public: + CommandThread() { + // threadLoop() should be started after the class is initialized. + mThread = std::thread([this](){this->threadLoop();}); + } + + ~CommandThread() { + quit(); + mThread.join(); + } + + /** + * Add a command to the command queue. + * + * If the func is a closure containing references, suggest using shared_ptr + * instead to maintain proper lifetime. + * + * @param name for dump() purposes. + * @param func command to execute + */ + void add(std::string_view name, std::function<void()>&& func) { + std::lock_guard lg(mMutex); + if (mQuit) return; + mCommands.emplace_back(name, std::move(func)); + if (mCommands.size() == 1) mConditionVariable.notify_one(); + } + + /** + * Returns the string of commands, separated by newlines. + */ + std::string dump() const { + std::string result; + std::lock_guard lg(mMutex); + for (const auto &p : mCommands) { + result.append(p.first).append("\n"); + } + return result; + } + + /** + * Quits the command thread and empties the command queue. + */ + void quit() { + std::lock_guard lg(mMutex); + if (mQuit) return; + mQuit = true; + mCommands.clear(); + mConditionVariable.notify_one(); + } + + /** + * Returns the number of commands on the queue. + */ + size_t size() const { + std::lock_guard lg(mMutex); + return mCommands.size(); + } + +private: + std::thread mThread; + mutable std::mutex mMutex; + std::condition_variable mConditionVariable GUARDED_BY(mMutex); + std::deque<std::pair<std::string, std::function<void()>>> mCommands GUARDED_BY(mMutex); + bool mQuit GUARDED_BY(mMutex) = false; + + void threadLoop() NO_THREAD_SAFETY_ANALYSIS { + std::unique_lock ul(mMutex); + while (!mQuit) { + if (!mCommands.empty()) { + auto name = std::move(mCommands.front().first); + auto func = std::move(mCommands.front().second); + mCommands.pop_front(); + ul.unlock(); + // ALOGD("%s: executing %s", __func__, name.c_str()); + func(); + ul.lock(); + continue; + } + mConditionVariable.wait(ul); + } + } +}; + +} // namespace android::audio_utils diff --git a/audio_utils/tests/Android.bp b/audio_utils/tests/Android.bp index fbf09617..cd063f6a 100644 --- a/audio_utils/tests/Android.bp +++ b/audio_utils/tests/Android.bp @@ -8,6 +8,24 @@ package { default_applicable_licenses: ["system_media_license"], } +cc_test { + name: "audio_commandthread_tests", + host_supported: true, + srcs: ["audio_commandthread_tests.cpp"], + shared_libs: [ + "libbase", + "libbinder", + ], + header_libs: [ + "libaudioutils_headers", + ], + cflags: [ + "-Wall", + "-Werror", + "-Wthread-safety", + ], +} + cc_defaults { name: "audio_math_test_defaults", host_supported: true, diff --git a/audio_utils/tests/audio_commandthread_tests.cpp b/audio_utils/tests/audio_commandthread_tests.cpp new file mode 100644 index 00000000..5bafb143 --- /dev/null +++ b/audio_utils/tests/audio_commandthread_tests.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2024 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <audio_utils/CommandThread.h> + +#include <gtest/gtest.h> + +TEST(commandthread, basic) { + android::audio_utils::CommandThread ct; + + ct.add("one", [](){}); + ct.add("two", [](){}); + ct.quit(); + EXPECT_EQ(0, ct.size()); + EXPECT_EQ("", ct.dump()); +} + +TEST(commandthread, full) { + std::mutex m; + std::condition_variable cv; + int stage = 0; + android::audio_utils::CommandThread ct; + + // load the CommandThread queue. + ct.add("one", [&]{ + std::unique_lock ul(m); + stage = 1; + cv.notify_one(); + cv.wait(ul, [&] { return stage == 2; }); + }); + ct.add("two", [&]{ + std::unique_lock ul(m); + stage = 3; + cv.notify_one(); + cv.wait(ul, [&] { return stage == 4; }); + }); + ct.add("three", [&]{ + std::unique_lock ul(m); + stage = 5; + cv.notify_one(); + cv.wait(ul, [&] { return stage == 6; }); + }); + + std::unique_lock ul(m); + + // step through each command in the queue. + + cv.wait(ul, [&] { return stage == 1; }); + EXPECT_EQ(2, ct.size()); + EXPECT_EQ("two\nthree\n", ct.dump()); + stage = 2; + cv.notify_one(); + + cv.wait(ul, [&] { return stage == 3; }); + EXPECT_EQ(1, ct.size()); + EXPECT_EQ("three\n", ct.dump()); + stage = 4; + cv.notify_one(); + + cv.wait(ul, [&] { return stage == 5; }); + EXPECT_EQ(0, ct.size()); + EXPECT_EQ("", ct.dump()); + stage = 6; + cv.notify_one(); +} |