summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Hung <hunga@google.com>2024-02-29 17:39:27 -0800
committerAndy Hung <hunga@google.com>2024-03-13 12:30:46 -0700
commit982b1d517247d5bf560ae0c967be6e6dd3d64834 (patch)
tree8f81cfa09cf4b5f044515bf19ca2effb65c189c1
parentc71b866ae47a656e33d3370cd89b46141be47c19 (diff)
downloadmedia-982b1d517247d5bf560ae0c967be6e6dd3d64834.tar.gz
audio_utils: Add CommandThread for command serialization
Test: atest audio_commandthread_tests Bug: 326031258 Merged-In: I41ef7728d2bddb4a569fb44eda82a6152e01f1b3 Change-Id: I41ef7728d2bddb4a569fb44eda82a6152e01f1b3
-rw-r--r--audio_utils/include/audio_utils/CommandThread.h117
-rw-r--r--audio_utils/tests/Android.bp18
-rw-r--r--audio_utils/tests/audio_commandthread_tests.cpp78
3 files changed, 213 insertions, 0 deletions
diff --git a/audio_utils/include/audio_utils/CommandThread.h b/audio_utils/include/audio_utils/CommandThread.h
new file mode 100644
index 00000000..b82188f4
--- /dev/null
+++ b/audio_utils/include/audio_utils/CommandThread.h
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <deque>
+#include <mutex>
+#include <thread>
+#include <utils/Mutex.h> // has thread safety annotations
+
+namespace android::audio_utils {
+
+/**
+ * CommandThread is used for serial execution of commands
+ * on a single worker thread.
+ *
+ * This class is thread-safe.
+ */
+
+class CommandThread {
+public:
+ CommandThread() {
+ // threadLoop() should be started after the class is initialized.
+ mThread = std::thread([this](){this->threadLoop();});
+ }
+
+ ~CommandThread() {
+ quit();
+ mThread.join();
+ }
+
+ /**
+ * Add a command to the command queue.
+ *
+ * If the func is a closure containing references, suggest using shared_ptr
+ * instead to maintain proper lifetime.
+ *
+ * @param name for dump() purposes.
+ * @param func command to execute
+ */
+ void add(std::string_view name, std::function<void()>&& func) {
+ std::lock_guard lg(mMutex);
+ if (mQuit) return;
+ mCommands.emplace_back(name, std::move(func));
+ if (mCommands.size() == 1) mConditionVariable.notify_one();
+ }
+
+ /**
+ * Returns the string of commands, separated by newlines.
+ */
+ std::string dump() const {
+ std::string result;
+ std::lock_guard lg(mMutex);
+ for (const auto &p : mCommands) {
+ result.append(p.first).append("\n");
+ }
+ return result;
+ }
+
+ /**
+ * Quits the command thread and empties the command queue.
+ */
+ void quit() {
+ std::lock_guard lg(mMutex);
+ if (mQuit) return;
+ mQuit = true;
+ mCommands.clear();
+ mConditionVariable.notify_one();
+ }
+
+ /**
+ * Returns the number of commands on the queue.
+ */
+ size_t size() const {
+ std::lock_guard lg(mMutex);
+ return mCommands.size();
+ }
+
+private:
+ std::thread mThread;
+ mutable std::mutex mMutex;
+ std::condition_variable mConditionVariable GUARDED_BY(mMutex);
+ std::deque<std::pair<std::string, std::function<void()>>> mCommands GUARDED_BY(mMutex);
+ bool mQuit GUARDED_BY(mMutex) = false;
+
+ void threadLoop() NO_THREAD_SAFETY_ANALYSIS {
+ std::unique_lock ul(mMutex);
+ while (!mQuit) {
+ if (!mCommands.empty()) {
+ auto name = std::move(mCommands.front().first);
+ auto func = std::move(mCommands.front().second);
+ mCommands.pop_front();
+ ul.unlock();
+ // ALOGD("%s: executing %s", __func__, name.c_str());
+ func();
+ ul.lock();
+ continue;
+ }
+ mConditionVariable.wait(ul);
+ }
+ }
+};
+
+} // namespace android::audio_utils
diff --git a/audio_utils/tests/Android.bp b/audio_utils/tests/Android.bp
index fbf09617..cd063f6a 100644
--- a/audio_utils/tests/Android.bp
+++ b/audio_utils/tests/Android.bp
@@ -8,6 +8,24 @@ package {
default_applicable_licenses: ["system_media_license"],
}
+cc_test {
+ name: "audio_commandthread_tests",
+ host_supported: true,
+ srcs: ["audio_commandthread_tests.cpp"],
+ shared_libs: [
+ "libbase",
+ "libbinder",
+ ],
+ header_libs: [
+ "libaudioutils_headers",
+ ],
+ cflags: [
+ "-Wall",
+ "-Werror",
+ "-Wthread-safety",
+ ],
+}
+
cc_defaults {
name: "audio_math_test_defaults",
host_supported: true,
diff --git a/audio_utils/tests/audio_commandthread_tests.cpp b/audio_utils/tests/audio_commandthread_tests.cpp
new file mode 100644
index 00000000..5bafb143
--- /dev/null
+++ b/audio_utils/tests/audio_commandthread_tests.cpp
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <audio_utils/CommandThread.h>
+
+#include <gtest/gtest.h>
+
+TEST(commandthread, basic) {
+ android::audio_utils::CommandThread ct;
+
+ ct.add("one", [](){});
+ ct.add("two", [](){});
+ ct.quit();
+ EXPECT_EQ(0, ct.size());
+ EXPECT_EQ("", ct.dump());
+}
+
+TEST(commandthread, full) {
+ std::mutex m;
+ std::condition_variable cv;
+ int stage = 0;
+ android::audio_utils::CommandThread ct;
+
+ // load the CommandThread queue.
+ ct.add("one", [&]{
+ std::unique_lock ul(m);
+ stage = 1;
+ cv.notify_one();
+ cv.wait(ul, [&] { return stage == 2; });
+ });
+ ct.add("two", [&]{
+ std::unique_lock ul(m);
+ stage = 3;
+ cv.notify_one();
+ cv.wait(ul, [&] { return stage == 4; });
+ });
+ ct.add("three", [&]{
+ std::unique_lock ul(m);
+ stage = 5;
+ cv.notify_one();
+ cv.wait(ul, [&] { return stage == 6; });
+ });
+
+ std::unique_lock ul(m);
+
+ // step through each command in the queue.
+
+ cv.wait(ul, [&] { return stage == 1; });
+ EXPECT_EQ(2, ct.size());
+ EXPECT_EQ("two\nthree\n", ct.dump());
+ stage = 2;
+ cv.notify_one();
+
+ cv.wait(ul, [&] { return stage == 3; });
+ EXPECT_EQ(1, ct.size());
+ EXPECT_EQ("three\n", ct.dump());
+ stage = 4;
+ cv.notify_one();
+
+ cv.wait(ul, [&] { return stage == 5; });
+ EXPECT_EQ(0, ct.size());
+ EXPECT_EQ("", ct.dump());
+ stage = 6;
+ cv.notify_one();
+}