summaryrefslogtreecommitdiff
path: root/audio_proxy
diff options
context:
space:
mode:
authorYuchen Liu <yucliu@google.com>2021-08-18 07:35:06 +0000
committerAndroid (Google) Code Review <android-gerrit@google.com>2021-08-18 07:35:06 +0000
commit843bc1a59e96280e9a3e183eca5062f4d9b9e7b5 (patch)
treec48298b8e4050031870a0143245a089ac575a854 /audio_proxy
parent2a5625391aba65c399d6057ba74863edff49d7c5 (diff)
parent5ddd863d670dbd16e07197c5d4d5baf1a3c8b2e9 (diff)
downloadatv-843bc1a59e96280e9a3e183eca5062f4d9b9e7b5.tar.gz
Merge "[AudioProxy] Write data into IOutputStream"
Diffstat (limited to 'audio_proxy')
-rw-r--r--audio_proxy/service/Android.bp31
-rw-r--r--audio_proxy/service/BusOutputStream.cpp14
-rw-r--r--audio_proxy/service/BusOutputStream.h1
-rw-r--r--audio_proxy/service/BusStreamProvider.cpp20
-rw-r--r--audio_proxy/service/RemoteBusOutputStream.cpp159
-rw-r--r--audio_proxy/service/RemoteBusOutputStream.h71
-rw-r--r--audio_proxy/service/RingBufferUtil.cpp83
-rw-r--r--audio_proxy/service/RingBufferUtil.h28
-rw-r--r--audio_proxy/service/RingBufferUtilTest.cpp80
-rw-r--r--audio_proxy/service/StreamOutImpl.cpp198
-rw-r--r--audio_proxy/service/StreamOutImpl.h13
-rw-r--r--audio_proxy/service/WriteThread.cpp214
-rw-r--r--audio_proxy/service/WriteThread.h113
13 files changed, 994 insertions, 31 deletions
diff --git a/audio_proxy/service/Android.bp b/audio_proxy/service/Android.bp
index b231adf..0e63ff9 100644
--- a/audio_proxy/service/Android.bp
+++ b/audio_proxy/service/Android.bp
@@ -21,6 +21,18 @@ package {
default_applicable_licenses: ["device_google_atv_license"],
}
+cc_library_static {
+ name: "audio_proxy_service_util",
+ vendor_available: true,
+ host_supported: true,
+ srcs: [
+ "RingBufferUtil.cpp",
+ ],
+ shared_libs: [
+ "libbase",
+ ],
+}
+
cc_binary {
name: "device.google.atv.audio_proxy@5.1-service",
vendor: true,
@@ -33,7 +45,9 @@ cc_binary {
"DeviceImpl.cpp",
"DevicesFactoryImpl.cpp",
"DummyBusOutputStream.cpp",
+ "RemoteBusOutputStream.cpp",
"StreamOutImpl.cpp",
+ "WriteThread.cpp",
"main.cpp",
],
@@ -55,6 +69,10 @@ cc_binary {
"libutils",
],
+ static_libs: [
+ "audio_proxy_service_util",
+ ],
+
header_libs: [
"libaudio_system_headers",
],
@@ -70,3 +88,16 @@ cc_binary {
"-Wno-unused-parameter",
],
}
+
+cc_test {
+ name: "audio_proxy_service_util_test",
+ host_supported: true,
+
+ srcs: [
+ "RingBufferUtilTest.cpp",
+ ],
+ static_libs: [
+ "audio_proxy_service_util",
+ "libgtest",
+ ],
+} \ No newline at end of file
diff --git a/audio_proxy/service/BusOutputStream.cpp b/audio_proxy/service/BusOutputStream.cpp
index 1998e69..40ac8d4 100644
--- a/audio_proxy/service/BusOutputStream.cpp
+++ b/audio_proxy/service/BusOutputStream.cpp
@@ -15,6 +15,7 @@
#include "BusOutputStream.h"
#include <android-base/logging.h>
+#include <system/audio.h>
namespace audio_proxy::service {
@@ -27,6 +28,19 @@ const std::string& BusOutputStream::getAddress() const { return mAddress; }
const AidlAudioConfig& BusOutputStream::getConfig() const { return mConfig; }
int32_t BusOutputStream::getFlags() const { return mFlags; }
+int BusOutputStream::getFrameSize() const {
+ audio_format_t format = static_cast<audio_format_t>(mConfig.format);
+
+ if (!audio_has_proportional_frames(format)) {
+ return sizeof(int8_t);
+ }
+
+ size_t channelSampleSize = audio_bytes_per_sample(format);
+ return audio_channel_count_from_out_mask(
+ static_cast<audio_channel_mask_t>(mConfig.channelMask)) *
+ channelSampleSize;
+}
+
bool BusOutputStream::prepareForWriting(uint32_t frameSize,
uint32_t frameCount) {
DCHECK_EQ(mWritingFrameSize, 0);
diff --git a/audio_proxy/service/BusOutputStream.h b/audio_proxy/service/BusOutputStream.h
index e820ca8..f9fb930 100644
--- a/audio_proxy/service/BusOutputStream.h
+++ b/audio_proxy/service/BusOutputStream.h
@@ -30,6 +30,7 @@ class BusOutputStream {
const std::string& getAddress() const;
const AidlAudioConfig& getConfig() const;
int32_t getFlags() const;
+ int getFrameSize() const;
bool prepareForWriting(uint32_t frameSize, uint32_t frameCount);
uint32_t getWritingFrameSize() const;
diff --git a/audio_proxy/service/BusStreamProvider.cpp b/audio_proxy/service/BusStreamProvider.cpp
index 7fac909..26ae460 100644
--- a/audio_proxy/service/BusStreamProvider.cpp
+++ b/audio_proxy/service/BusStreamProvider.cpp
@@ -19,6 +19,9 @@
#include <algorithm>
#include "DummyBusOutputStream.h"
+#include "RemoteBusOutputStream.h"
+
+using aidl::device::google::atv::audio_proxy::IOutputStream;
namespace audio_proxy::service {
@@ -58,9 +61,20 @@ void BusStreamProvider::onStreamOutCreated(wp<StreamOutImpl> stream) {
std::shared_ptr<BusOutputStream> BusStreamProvider::openOutputStream_Locked(
const std::string& address, const AidlAudioConfig& config, int32_t flags) {
- // TODO(yucliu): Return AIDL interface based RemoteBusOutputStream when stream
- // provider is available.
- return std::make_shared<DummyBusOutputStream>(address, config, flags);
+ if (!mStreamProvider) {
+ return std::make_shared<DummyBusOutputStream>(address, config, flags);
+ }
+
+ std::shared_ptr<IOutputStream> stream;
+ ndk::ScopedAStatus status =
+ mStreamProvider->openOutputStream(address, config, flags, &stream);
+ if (!status.isOk() || !stream) {
+ LOG(ERROR) << "Failed to open output stream, status " << status.getStatus();
+ return std::make_shared<DummyBusOutputStream>(address, config, flags);
+ }
+
+ return std::make_shared<RemoteBusOutputStream>(std::move(stream), address,
+ config, flags);
}
void BusStreamProvider::cleanStreamOutList_Locked() {
diff --git a/audio_proxy/service/RemoteBusOutputStream.cpp b/audio_proxy/service/RemoteBusOutputStream.cpp
new file mode 100644
index 0000000..3b27b79
--- /dev/null
+++ b/audio_proxy/service/RemoteBusOutputStream.cpp
@@ -0,0 +1,159 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "RemoteBusOutputStream.h"
+
+#include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h>
+#include <android-base/logging.h>
+
+#include "RingBufferUtil.h"
+
+using aidl::device::google::atv::audio_proxy::MessageQueueFlag;
+using android::status_t;
+
+namespace audio_proxy {
+namespace service {
+namespace {
+
+// Time out for FMQ read in ns -- 1s.
+constexpr int64_t kFmqReadTimeoutNs = 1'000'000'000;
+
+void deleteEventFlag(EventFlag* obj) {
+ if (!obj) {
+ return;
+ }
+
+ status_t status = EventFlag::deleteEventFlag(&obj);
+ if (status != android::OK) {
+ LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status);
+ }
+}
+
+} // namespace
+
+RemoteBusOutputStream::RemoteBusOutputStream(
+ std::shared_ptr<IOutputStream> stream, const std::string& address,
+ const AidlAudioConfig& config, int32_t flags)
+ : BusOutputStream(address, config, flags),
+ mStream(std::move(stream)),
+ mEventFlag(nullptr, deleteEventFlag) {}
+RemoteBusOutputStream::~RemoteBusOutputStream() = default;
+
+bool RemoteBusOutputStream::standby() { return mStream->standby().isOk(); }
+
+bool RemoteBusOutputStream::pause() { return mStream->pause().isOk(); }
+
+bool RemoteBusOutputStream::resume() { return mStream->resume().isOk(); }
+
+bool RemoteBusOutputStream::drain(AidlAudioDrain drain) {
+ return mStream->drain(drain).isOk();
+}
+
+bool RemoteBusOutputStream::flush() { return mStream->flush().isOk(); }
+
+bool RemoteBusOutputStream::close() { return mStream->close().isOk(); }
+
+bool RemoteBusOutputStream::setVolume(float left, float right) {
+ return mStream->setVolume(left, right).isOk();
+}
+
+size_t RemoteBusOutputStream::availableToWrite() {
+ return mDataMQ->availableToWrite();
+}
+
+AidlWriteStatus RemoteBusOutputStream::writeRingBuffer(const uint8_t* firstMem,
+ size_t firstLength,
+ const uint8_t* secondMem,
+ size_t secondLength) {
+ DCHECK(mDataMQ);
+ DCHECK(mStatusMQ);
+ DCHECK(mEventFlag);
+ AidlWriteStatus status;
+ DataMQ::MemTransaction tx;
+ if (!mDataMQ->beginWrite(firstLength + secondLength, &tx)) {
+ LOG(ERROR) << "Failed to begin write.";
+ return status;
+ }
+
+ const DataMQ::MemRegion& firstRegion = tx.getFirstRegion();
+ const DataMQ::MemRegion& secondRegion = tx.getSecondRegion();
+
+ copyRingBuffer(firstRegion.getAddress(), firstRegion.getLength(),
+ secondRegion.getAddress(), secondRegion.getLength(),
+ reinterpret_cast<const int8_t*>(firstMem), firstLength,
+ reinterpret_cast<const int8_t*>(secondMem), secondLength);
+ if (!mDataMQ->commitWrite(firstLength + secondLength)) {
+ LOG(ERROR) << "Failed to commit write.";
+ return status;
+ }
+
+ mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY));
+
+ // readNotification is used to "wake" after successful read, hence we don't
+ // need it. writeNotification is used to "wait" for the other end to write
+ // enough data.
+ if (!mStatusMQ->readBlocking(
+ &status, 1 /* count */, 0 /* readNotification */,
+ static_cast<uint32_t>(
+ MessageQueueFlag::NOT_FULL) /* writeNotification */,
+ kFmqReadTimeoutNs, mEventFlag.get())) {
+ LOG(ERROR) << "Failed to read status!";
+ return status;
+ }
+
+ return status;
+}
+
+bool RemoteBusOutputStream::prepareForWritingImpl(uint32_t frameSize,
+ uint32_t frameCount) {
+ DataMQDesc dataMQDesc;
+ StatusMQDesc statusMQDesc;
+ ndk::ScopedAStatus status = mStream->prepareForWriting(
+ frameSize, frameCount, &dataMQDesc, &statusMQDesc);
+ if (!status.isOk()) {
+ LOG(ERROR) << "prepareForWriting fails.";
+ return false;
+ }
+
+ auto dataMQ = std::make_unique<DataMQ>(dataMQDesc);
+ if (!dataMQ->isValid()) {
+ LOG(ERROR) << "invalid data mq.";
+ return false;
+ }
+
+ EventFlag* rawEventFlag = nullptr;
+ status_t eventFlagStatus =
+ EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
+ std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
+ deleteEventFlag);
+ if (eventFlagStatus != android::OK || !eventFlag) {
+ LOG(ERROR) << "failed creating event flag for data MQ: "
+ << strerror(-eventFlagStatus);
+ return false;
+ }
+
+ auto statusMQ = std::make_unique<StatusMQ>(statusMQDesc);
+ if (!statusMQ->isValid()) {
+ LOG(ERROR) << "invalid status mq.";
+ return false;
+ }
+
+ mDataMQ = std::move(dataMQ);
+ mStatusMQ = std::move(statusMQ);
+ mEventFlag = std::move(eventFlag);
+ return true;
+}
+
+} // namespace service
+} // namespace audio_proxy \ No newline at end of file
diff --git a/audio_proxy/service/RemoteBusOutputStream.h b/audio_proxy/service/RemoteBusOutputStream.h
new file mode 100644
index 0000000..6f663cb
--- /dev/null
+++ b/audio_proxy/service/RemoteBusOutputStream.h
@@ -0,0 +1,71 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <aidl/device/google/atv/audio_proxy/IOutputStream.h>
+#include <fmq/AidlMessageQueue.h>
+#include <fmq/EventFlag.h>
+
+#include "BusOutputStream.h"
+
+namespace audio_proxy {
+namespace service {
+
+using aidl::android::hardware::common::fmq::MQDescriptor;
+using aidl::android::hardware::common::fmq::SynchronizedReadWrite;
+using aidl::device::google::atv::audio_proxy::IOutputStream;
+using android::AidlMessageQueue;
+using android::hardware::EventFlag;
+
+class RemoteBusOutputStream : public BusOutputStream {
+ public:
+ RemoteBusOutputStream(std::shared_ptr<IOutputStream> stream,
+ const std::string& address,
+ const AidlAudioConfig& config, int32_t flags);
+ ~RemoteBusOutputStream() override;
+
+ bool standby() override;
+ bool pause() override;
+ bool resume() override;
+ bool drain(AidlAudioDrain drain) override;
+ bool flush() override;
+ bool close() override;
+ bool setVolume(float left, float right) override;
+
+ size_t availableToWrite() override;
+ AidlWriteStatus writeRingBuffer(const uint8_t* firstMem, size_t firstLength,
+ const uint8_t* secondMem,
+ size_t secondLength) override;
+
+ protected:
+ bool prepareForWritingImpl(uint32_t frameSize, uint32_t frameCount) override;
+
+ private:
+ using DataMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
+ using DataMQDesc = MQDescriptor<int8_t, SynchronizedReadWrite>;
+ using StatusMQ = AidlMessageQueue<AidlWriteStatus, SynchronizedReadWrite>;
+ using StatusMQDesc = MQDescriptor<AidlWriteStatus, SynchronizedReadWrite>;
+
+ typedef void (*EventFlagDeleter)(EventFlag*);
+
+ std::shared_ptr<IOutputStream> mStream;
+
+ std::unique_ptr<DataMQ> mDataMQ;
+ std::unique_ptr<StatusMQ> mStatusMQ;
+ std::unique_ptr<EventFlag, EventFlagDeleter> mEventFlag;
+};
+
+} // namespace service
+} // namespace audio_proxy \ No newline at end of file
diff --git a/audio_proxy/service/RingBufferUtil.cpp b/audio_proxy/service/RingBufferUtil.cpp
new file mode 100644
index 0000000..dbe80f4
--- /dev/null
+++ b/audio_proxy/service/RingBufferUtil.cpp
@@ -0,0 +1,83 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "RingBufferUtil.h"
+
+#include <android-base/logging.h>
+
+namespace audio_proxy::service {
+namespace {
+struct CopyDesc {
+ int8_t* dst = nullptr;
+ const int8_t* src = nullptr;
+ size_t len = 0;
+};
+} // namespace
+
+void copyRingBuffer(int8_t* dstBuf1, size_t dstLen1, int8_t* dstBuf2,
+ size_t dstLen2, const int8_t* srcBuf1, size_t srcLen1,
+ const int8_t* srcBuf2, size_t srcLen2) {
+ // Caller should make sure the dst buffer has more space.
+ DCHECK_GE(dstLen1 + dstLen2, srcLen1 + srcLen2);
+
+ CopyDesc cp1 = {dstBuf1, srcBuf1, 0};
+ CopyDesc cp2;
+ CopyDesc cp3;
+
+ if (srcLen1 == dstLen1) {
+ cp1 = {dstBuf1, srcBuf1, srcLen1};
+
+ DCHECK_LE(srcLen2, dstLen2);
+ cp2 = {dstBuf2, srcBuf2, srcLen2};
+
+ // No need to copy more data, thus no need to update cp3.
+ } else if (srcLen1 < dstLen1) {
+ cp1 = {dstBuf1, srcBuf1, srcLen1};
+
+ if (dstLen1 <= srcLen1 + srcLen2) {
+ // Copy data into both dstBuf1 and dstBuf2.
+ cp2 = {cp1.dst + cp1.len, srcBuf2, dstLen1 - srcLen1};
+ cp3 = {dstBuf2, cp2.src + cp2.len, srcLen1 + srcLen2 - dstLen1};
+ } else {
+ // dstBuf1 is bigger enough to hold all the data from src.
+ cp2 = {cp1.dst + cp1.len, srcBuf2, srcLen2};
+
+ // No need to copy more data, thus no need to update cp3.
+ }
+ } else { // srcLen1 > dstLen1
+ cp1 = {dstBuf1, srcBuf1, dstLen1};
+ cp2 = {dstBuf2, cp1.src + cp1.len, srcLen1 - dstLen1};
+ cp3 = {cp2.dst + cp2.len, srcBuf2, srcLen2};
+ }
+
+ if (cp1.len > 0) {
+ DCHECK(cp1.dst);
+ DCHECK(cp1.src);
+ std::memcpy(cp1.dst, cp1.src, cp1.len);
+ }
+
+ if (cp2.len > 0) {
+ DCHECK(cp2.dst);
+ DCHECK(cp2.src);
+ std::memcpy(cp2.dst, cp2.src, cp2.len);
+ }
+
+ if (cp3.len > 0) {
+ DCHECK(cp3.dst);
+ DCHECK(cp3.src);
+ std::memcpy(cp3.dst, cp3.src, cp3.len);
+ }
+}
+
+} // namespace audio_proxy::service \ No newline at end of file
diff --git a/audio_proxy/service/RingBufferUtil.h b/audio_proxy/service/RingBufferUtil.h
new file mode 100644
index 0000000..a37f03e
--- /dev/null
+++ b/audio_proxy/service/RingBufferUtil.h
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+namespace audio_proxy::service {
+
+// Copy data from ring buffer "src" to ring buffer "dst". "dst" is guaranteed to
+// have more space than "src".
+void copyRingBuffer(int8_t* dstBuf1, size_t dstLen1, int8_t* dstBuf2,
+ size_t dstLen2, const int8_t* srcBuf1, size_t srcLen1,
+ const int8_t* srcBuf2, size_t srcLen2);
+
+} // namespace audio_proxy::service \ No newline at end of file
diff --git a/audio_proxy/service/RingBufferUtilTest.cpp b/audio_proxy/service/RingBufferUtilTest.cpp
new file mode 100644
index 0000000..59b431c
--- /dev/null
+++ b/audio_proxy/service/RingBufferUtilTest.cpp
@@ -0,0 +1,80 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gtest/gtest.h>
+
+#include "RingBufferUtil.h"
+
+using namespace audio_proxy::service;
+
+using Buffer = std::vector<int8_t>;
+
+class RingBufferUtilTest : public testing::TestWithParam<
+ std::tuple<Buffer, Buffer, Buffer, Buffer>> {};
+
+TEST_P(RingBufferUtilTest, DifferentBufferSize) {
+ auto [src1, src2, expectedDst1, expectedDst2] = GetParam();
+
+ Buffer dst1(expectedDst1.size());
+ Buffer dst2(expectedDst2.size());
+
+ copyRingBuffer(dst1.data(), dst1.size(), dst2.data(), dst2.size(),
+ src1.data(), src1.size(), src2.data(), src2.size());
+
+ EXPECT_EQ(dst1, expectedDst1);
+ EXPECT_EQ(dst2, expectedDst2);
+}
+
+// clang-format off
+const std::vector<std::tuple<Buffer, Buffer, Buffer, Buffer>> testParams = {
+ // The layout are the same for src and dst.
+ {
+ {0, 1, 2, 3, 4},
+ {5, 6, 7, 8, 9},
+ {0, 1, 2, 3, 4},
+ {5, 6, 7, 8, 9}
+ },
+ // src1 size is samller than dst1 size.
+ {
+ {0, 1, 2, 3},
+ {4, 5, 6, 7, 8, 9},
+ {0, 1, 2, 3, 4},
+ {5, 6, 7, 8, 9}
+ },
+ // src2 size is larger than dst1 size.
+ {
+ {0, 1, 2, 3, 4, 5},
+ {6, 7, 8, 9},
+ {0, 1, 2, 3, 4},
+ {5, 6, 7, 8, 9}
+ },
+ // dst1 size is larger enough to hold all the src data.
+ {
+ {0, 1, 2, 3, 4},
+ {5, 6, 7, 8, 9},
+ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0},
+ {0, 0, 0, 0, 0}
+ },
+ // Empty src
+ {{}, {}, {}, {}}
+};
+// clang-format off
+
+INSTANTIATE_TEST_SUITE_P(RingBufferUtilTestSuite, RingBufferUtilTest,
+ testing::ValuesIn(testParams));
+
+TEST(RingBufferUtilTest, CopyNullptr) {
+ // Test should not crash.
+ copyRingBuffer(nullptr, 0, nullptr, 0, nullptr, 0, nullptr, 0);
+}
diff --git a/audio_proxy/service/StreamOutImpl.cpp b/audio_proxy/service/StreamOutImpl.cpp
index 0b65707..e317d4e 100644
--- a/audio_proxy/service/StreamOutImpl.cpp
+++ b/audio_proxy/service/StreamOutImpl.cpp
@@ -16,7 +16,6 @@
#include <android-base/logging.h>
#include <inttypes.h>
-#include <system/audio.h>
#include <time.h>
#include <utils/Log.h>
@@ -24,6 +23,7 @@
#include "AidlTypes.h"
#include "BusOutputStream.h"
+#include "WriteThread.h"
using android::status_t;
@@ -35,17 +35,17 @@ namespace {
constexpr uint32_t kMaxBufferSize = 1 << 30;
constexpr uint32_t kDefaultLatencyMs = 40;
-uint64_t calcFrameSize(const AudioConfig& config) {
- audio_format_t format = static_cast<audio_format_t>(config.format);
+constexpr int64_t kOneSecInNs = 1'000'000'000;
- if (!audio_has_proportional_frames(format)) {
- return sizeof(int8_t);
+void deleteEventFlag(EventFlag* obj) {
+ if (!obj) {
+ return;
}
- size_t channelSampleSize = audio_bytes_per_sample(format);
- return audio_channel_count_from_out_mask(
- static_cast<audio_channel_mask_t>(config.channelMask)) *
- channelSampleSize;
+ status_t status = EventFlag::deleteEventFlag(&obj);
+ if (status) {
+ LOG(ERROR) << "Write MQ event flag deletion error: " << strerror(-status);
+ }
}
AudioConfig fromAidlAudioConfig(const AidlAudioConfig& aidlConfig) {
@@ -58,16 +58,48 @@ AudioConfig fromAidlAudioConfig(const AidlAudioConfig& aidlConfig) {
return config;
}
+uint64_t estimatePlayedFramesSince(const TimeSpec& timestamp,
+ uint32_t sampleRateHz) {
+ timespec now = {0, 0};
+ clock_gettime(CLOCK_MONOTONIC, &now);
+ int64_t deltaSec = 0;
+ int64_t deltaNSec = 0;
+ if (now.tv_nsec >= timestamp.tvNSec) {
+ deltaSec = now.tv_sec - timestamp.tvSec;
+ deltaNSec = now.tv_nsec - timestamp.tvNSec;
+ } else {
+ deltaSec = now.tv_sec - timestamp.tvSec - 1;
+ deltaNSec = kOneSecInNs + now.tv_nsec - timestamp.tvNSec;
+ }
+
+ if (deltaSec < 0 || deltaNSec < 0) {
+ return 0;
+ }
+
+ return deltaSec * sampleRateHz + deltaNSec * sampleRateHz / kOneSecInNs;
+}
+
} // namespace
StreamOutImpl::StreamOutImpl(std::shared_ptr<BusOutputStream> stream)
: mStream(std::move(stream)),
- mConfig(fromAidlAudioConfig(mStream->getConfig())) {}
+ mConfig(fromAidlAudioConfig(mStream->getConfig())),
+ mEventFlag(nullptr, deleteEventFlag) {}
+
+StreamOutImpl::~StreamOutImpl() {
+ if (mWriteThread) {
+ mWriteThread->stop();
+ status_t status = mWriteThread->join();
+ if (status) {
+ LOG(ERROR) << "write thread exit error " << strerror(-status);
+ }
+ }
-StreamOutImpl::~StreamOutImpl() = default;
+ mEventFlag.reset();
+}
Return<uint64_t> StreamOutImpl::getFrameSize() {
- return calcFrameSize(mConfig);
+ return mStream->getFrameSize();
}
Return<uint64_t> StreamOutImpl::getFrameCount() {
@@ -76,7 +108,7 @@ Return<uint64_t> StreamOutImpl::getFrameCount() {
Return<uint64_t> StreamOutImpl::getBufferSize() {
// TODO(yucliu): The buffer size should be provided by command line args.
- return 20 * mConfig.sampleRateHz * calcFrameSize(mConfig) / 1000;
+ return 20 * mConfig.sampleRateHz * mStream->getFrameSize() / 1000;
}
Return<uint32_t> StreamOutImpl::getSampleRate() { return mConfig.sampleRateHz; }
@@ -132,7 +164,13 @@ Return<Result> StreamOutImpl::removeEffect(uint64_t effectId) {
}
Return<Result> StreamOutImpl::standby() {
- return mStream->standby() ? Result::OK : Result::INVALID_STATE;
+ bool success = mStream->standby();
+ if (!success) {
+ return Result::INVALID_STATE;
+ }
+
+ mTotalPlayedFramesSinceStandby = estimateTotalPlayedFrames();
+ return Result::OK;
}
Return<void> StreamOutImpl::getDevices(getDevices_cb _hidl_cb) {
@@ -163,13 +201,14 @@ Return<Result> StreamOutImpl::setHwAvSync(uint32_t hwAvSync) {
}
Return<Result> StreamOutImpl::close() {
+ if (mWriteThread) {
+ mWriteThread->stop();
+ }
return mStream->close() ? Result::OK : Result::INVALID_STATE;
}
Return<uint32_t> StreamOutImpl::getLatency() {
- // TODO(yucliu): If no audio data is written into client, use the default
- // latency from command line args. Otherwise calculate the value from
- // AidlWriteStatus returned by mStream.writeRingBuffer.
+ // TODO(yucliu): Get the value from command line.
return kDefaultLatencyMs;
}
@@ -189,15 +228,98 @@ Return<void> StreamOutImpl::prepareForWriting(uint32_t frameSize,
return Void();
};
- // TODO(yucliu): Create a thread to read data from FMQ and write the data into
- // mStream.
- return sendError(Result::INVALID_STATE);
+ if (mDataMQ) {
+ LOG(ERROR) << "The client attempted to call prepareForWriting twice";
+ return sendError(Result::INVALID_STATE);
+ }
+
+ if (frameSize == 0 || framesCount == 0) {
+ LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount ("
+ << framesCount << ")";
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ if (frameSize > kMaxBufferSize / framesCount) {
+ LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount
+ << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")";
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ auto commandMQ = std::make_unique<CommandMQ>(1);
+ if (!commandMQ->isValid()) {
+ LOG(ERROR) << "Command MQ is invalid";
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ auto dataMQ =
+ std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */);
+ if (!dataMQ->isValid()) {
+ LOG(ERROR) << "Data MQ is invalid";
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ auto statusMQ = std::make_unique<StatusMQ>(1);
+ if (!statusMQ->isValid()) {
+ LOG(ERROR) << "Status MQ is invalid";
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ EventFlag* rawEventFlag = nullptr;
+ status_t status =
+ EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag);
+ std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag,
+ deleteEventFlag);
+ if (status != ::android::OK || !eventFlag) {
+ LOG(ERROR) << "Failed creating event flag for data MQ: "
+ << strerror(-status);
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ if (!mStream->prepareForWriting(frameSize, framesCount)) {
+ LOG(ERROR) << "Failed to prepare writing channel.";
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ sp<WriteThread> writeThread =
+ sp<WriteThread>::make(mStream, commandMQ.get(), dataMQ.get(),
+ statusMQ.get(), eventFlag.get(), kDefaultLatencyMs);
+ status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO);
+ if (status != ::android::OK) {
+ LOG(ERROR) << "Failed to start writer thread: " << strerror(-status);
+ return sendError(Result::INVALID_ARGUMENTS);
+ }
+
+ mCommandMQ = std::move(commandMQ);
+ mDataMQ = std::move(dataMQ);
+ mStatusMQ = std::move(statusMQ);
+ mEventFlag = std::move(eventFlag);
+ mWriteThread = std::move(writeThread);
+ threadInfo.pid = getpid();
+ threadInfo.tid = mWriteThread->getTid();
+ _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(),
+ *mStatusMQ->getDesc(), threadInfo);
+
+ return Void();
}
Return<void> StreamOutImpl::getRenderPosition(getRenderPosition_cb _hidl_cb) {
- // TODO(yucliu): Render position can be calculated by the AidlWriteStatus
- // returned by the mStream.writeRingBuffer.
- _hidl_cb(Result::NOT_SUPPORTED, 0);
+ uint64_t totalPlayedFrames = estimateTotalPlayedFrames();
+ if (totalPlayedFrames == 0) {
+ _hidl_cb(Result::OK, 0);
+ return Void();
+ }
+
+ // getRenderPosition returns the number of frames played since the output has
+ // exited standby.
+ DCHECK_GE(totalPlayedFrames, mTotalPlayedFramesSinceStandby);
+ uint64_t position = totalPlayedFrames - mTotalPlayedFramesSinceStandby;
+
+ if (position > std::numeric_limits<uint32_t>::max()) {
+ _hidl_cb(Result::INVALID_STATE, 0);
+ return Void();
+ }
+
+ _hidl_cb(Result::OK, position);
return Void();
}
@@ -242,9 +364,13 @@ Return<Result> StreamOutImpl::flush() {
Return<void> StreamOutImpl::getPresentationPosition(
getPresentationPosition_cb _hidl_cb) {
- // TODO(yucliu): Presentation position can be calculated by the
- // AidlWriteStatus returned by the mStream.writeRingBuffer.
- _hidl_cb(Result::NOT_SUPPORTED, 0, {});
+ if (!mWriteThread) {
+ _hidl_cb(Result::INVALID_STATE, 0, {});
+ return Void();
+ }
+
+ auto [frames, timestamp] = mWriteThread->getPresentationPosition();
+ _hidl_cb(Result::OK, frames, timestamp);
return Void();
}
@@ -286,10 +412,26 @@ void StreamOutImpl::updateOutputStream(
return;
}
- // TODO(yucliu): Call mStream.prepareForWriting if audioserver starts to write
- // data.
+ if (mWriteThread) {
+ if (!stream->prepareForWriting(mStream->getWritingFrameSize(),
+ mStream->getWritingFrameCount())) {
+ LOG(ERROR) << "Failed to prepare writing channel.";
+ return;
+ }
+
+ mWriteThread->updateOutputStream(stream);
+ }
mStream = std::move(stream);
}
+uint64_t StreamOutImpl::estimateTotalPlayedFrames() const {
+ if (!mWriteThread) {
+ return 0;
+ }
+
+ auto [frames, timestamp] = mWriteThread->getPresentationPosition();
+ return frames + estimatePlayedFramesSince(timestamp, mConfig.sampleRateHz);
+}
+
} // namespace audio_proxy::service
diff --git a/audio_proxy/service/StreamOutImpl.h b/audio_proxy/service/StreamOutImpl.h
index 8ad7c49..50b348d 100644
--- a/audio_proxy/service/StreamOutImpl.h
+++ b/audio_proxy/service/StreamOutImpl.h
@@ -40,6 +40,9 @@ using namespace android::hardware::audio::CPP_VERSION;
namespace audio_proxy::service {
class BusOutputStream;
+class WriteThread;
+
+typedef void (*EventFlagDeleter)(EventFlag*);
class StreamOutImpl : public IStreamOut {
public:
@@ -113,8 +116,18 @@ class StreamOutImpl : public IStreamOut {
int32_t programId) override;
private:
+ uint64_t estimateTotalPlayedFrames() const;
+
std::shared_ptr<BusOutputStream> mStream;
const AudioConfig mConfig;
+
+ std::unique_ptr<CommandMQ> mCommandMQ;
+ std::unique_ptr<DataMQ> mDataMQ;
+ std::unique_ptr<StatusMQ> mStatusMQ;
+ std::unique_ptr<EventFlag, EventFlagDeleter> mEventFlag;
+ sp<WriteThread> mWriteThread;
+
+ uint64_t mTotalPlayedFramesSinceStandby = 0;
};
} // namespace audio_proxy::service
diff --git a/audio_proxy/service/WriteThread.cpp b/audio_proxy/service/WriteThread.cpp
new file mode 100644
index 0000000..64ba8ee
--- /dev/null
+++ b/audio_proxy/service/WriteThread.cpp
@@ -0,0 +1,214 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "WriteThread.h"
+
+#include <android-base/logging.h>
+#include <time.h>
+
+#include <atomic>
+
+#include "AidlTypes.h"
+#include "BusOutputStream.h"
+
+namespace audio_proxy::service {
+namespace {
+// Time out for FMQ read in ns -- 1s.
+constexpr int64_t kFmqReadTimeoutNs = 1'000'000'000;
+} // namespace
+
+WriteThread::WriteThread(std::shared_ptr<BusOutputStream> stream,
+ CommandMQ* commandMQ, DataMQ* dataMQ,
+ StatusMQ* statusMQ, EventFlag* eventFlag,
+ uint32_t latencyMs)
+ : Thread(false /*canCallJava*/),
+ mStream(std::move(stream)),
+ mCommandMQ(commandMQ),
+ mDataMQ(dataMQ),
+ mStatusMQ(statusMQ),
+ mEventFlag(eventFlag),
+ mLatencyMs(latencyMs) {}
+
+WriteThread::~WriteThread() = default;
+
+void WriteThread::stop() {
+ if (mStop.load(std::memory_order_relaxed)) {
+ return;
+ }
+
+ mStop.store(true, std::memory_order_release);
+ mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
+}
+
+void WriteThread::updateOutputStream(std::shared_ptr<BusOutputStream> stream) {
+ {
+ std::scoped_lock<std::mutex> lock(mStreamLock);
+ mStream = std::move(stream);
+ }
+
+ // Assume all the written frames are already played out by the old stream.
+ std::scoped_lock<std::mutex> lock(mPositionLock);
+ mPresentationFramesOffset = mTotalWrittenFrames;
+}
+
+std::pair<uint64_t, TimeSpec> WriteThread::getPresentationPosition() {
+ std::scoped_lock<std::mutex> lock(mPositionLock);
+ return std::make_pair(mPresentationFrames, mPresentationTimestamp);
+}
+
+IStreamOut::WriteStatus WriteThread::doWrite(BusOutputStream* stream) {
+ IStreamOut::WriteStatus status;
+ status.replyTo = IStreamOut::WriteCommand::WRITE;
+ status.retval = Result::INVALID_STATE;
+ status.reply.written = 0;
+
+ const size_t availToRead = mDataMQ->availableToRead();
+ if (stream->availableToWrite() < availToRead) {
+ LOG(WARNING) << "No space to write, wait...";
+ return status;
+ }
+
+ DataMQ::MemTransaction tx;
+ if (mDataMQ->beginRead(availToRead, &tx)) {
+ status.retval = Result::OK;
+ AidlWriteStatus writeStatus = stream->writeRingBuffer(
+ tx.getFirstRegion().getAddress(), tx.getFirstRegion().getLength(),
+ tx.getSecondRegion().getAddress(), tx.getSecondRegion().getLength());
+ if (writeStatus.written < availToRead) {
+ LOG(WARNING) << "Failed to write all the bytes to client. Written "
+ << writeStatus.written << ", available " << availToRead;
+ }
+
+ if (writeStatus.written < 0) {
+ writeStatus.written = 0;
+ }
+
+ status.reply.written = writeStatus.written;
+ mDataMQ->commitRead(writeStatus.written);
+
+ if (writeStatus.position.frames < 0 ||
+ writeStatus.position.timestamp.tvSec < 0 ||
+ writeStatus.position.timestamp.tvNSec < 0) {
+ LOG(WARNING) << "Invalid latency info.";
+ return status;
+ }
+
+ updatePresentationPosition(writeStatus, stream);
+ }
+
+ return status;
+}
+
+IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() const {
+ IStreamOut::WriteStatus status;
+ status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION;
+ status.retval = Result::OK;
+ // Write always happens on the same thread, there's no need to lock.
+ status.reply.presentationPosition = {mPresentationFrames,
+ mPresentationTimestamp};
+ return status;
+}
+
+IStreamOut::WriteStatus WriteThread::doGetLatency() const {
+ IStreamOut::WriteStatus status;
+ status.replyTo = IStreamOut::WriteCommand::GET_LATENCY;
+ status.retval = Result::OK;
+ // Write always happens on the same thread, there's no need to lock.
+ status.reply.latencyMs = mLatencyMs;
+ return status;
+}
+
+bool WriteThread::threadLoop() {
+ // This implementation doesn't return control back to the Thread until the
+ // parent thread decides to stop, as the Thread uses mutexes, and this can
+ // lead to priority inversion.
+ while (!mStop.load(std::memory_order_acquire)) {
+ std::shared_ptr<BusOutputStream> stream;
+ {
+ std::scoped_lock<std::mutex> lock(mStreamLock);
+ stream = mStream;
+ }
+
+ IStreamOut::WriteCommand replyTo;
+ if (!mCommandMQ->readBlocking(
+ &replyTo, 1 /* count */, 0 /* readNoticication */,
+ static_cast<uint32_t>(
+ MessageQueueFlagBits::NOT_EMPTY) /* writeNotification */,
+ kFmqReadTimeoutNs, mEventFlag)) {
+ LOG(ERROR) << "read command timeout";
+ continue;
+ }
+
+ if (replyTo == IStreamOut::WriteCommand::WRITE) {
+ mNonWriteCommandCount = 0;
+ } else {
+ mNonWriteCommandCount++;
+ }
+
+ IStreamOut::WriteStatus status;
+ switch (replyTo) {
+ case IStreamOut::WriteCommand::WRITE:
+ status = doWrite(stream.get());
+ if (status.retval != Result::OK) {
+ LOG(ERROR) << "write status not ok";
+ continue;
+ }
+ break;
+ case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION:
+ // If we don't write data for a while, the presentation position info
+ // may not be accurate. Write 0 bytes data to the client to get the
+ // latest presentation position info.
+ if (mNonWriteCommandCount >= 3 || mNonWriteCommandCount < 0) {
+ queryPresentationPosition(stream.get());
+ }
+ status = doGetPresentationPosition();
+ break;
+ case IStreamOut::WriteCommand::GET_LATENCY:
+ status = doGetLatency();
+ break;
+ default:
+ LOG(ERROR) << "Unknown write thread command code "
+ << static_cast<int>(replyTo);
+ status.retval = Result::NOT_SUPPORTED;
+ break;
+ }
+
+ if (!mStatusMQ->write(&status)) {
+ LOG(ERROR) << "Status message queue write failed";
+ }
+ mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+ }
+
+ return false;
+}
+
+void WriteThread::queryPresentationPosition(BusOutputStream* stream) {
+ AidlWriteStatus writeStatus =
+ stream->writeRingBuffer(nullptr, 0, nullptr, 0);
+ updatePresentationPosition(writeStatus, stream);
+}
+
+void WriteThread::updatePresentationPosition(const AidlWriteStatus& writeStatus,
+ BusOutputStream* stream) {
+ std::scoped_lock<std::mutex> lock(mPositionLock);
+ mPresentationFrames = mPresentationFramesOffset + writeStatus.position.frames;
+ mPresentationTimestamp = {
+ .tvSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvSec),
+ .tvNSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvNSec),
+ };
+
+ mTotalWrittenFrames += writeStatus.written / stream->getFrameSize();
+}
+
+} // namespace audio_proxy::service \ No newline at end of file
diff --git a/audio_proxy/service/WriteThread.h b/audio_proxy/service/WriteThread.h
new file mode 100644
index 0000000..d313686
--- /dev/null
+++ b/audio_proxy/service/WriteThread.h
@@ -0,0 +1,113 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <atomic>
+#include <mutex>
+
+// clang-format off
+#include PATH(android/hardware/audio/FILE_VERSION/IStreamOut.h)
+// clang-format on
+
+#include <android-base/thread_annotations.h>
+#include <fmq/EventFlag.h>
+#include <fmq/MessageQueue.h>
+#include <hidl/MQDescriptor.h>
+#include <inttypes.h>
+#include <utils/Thread.h>
+
+#include "AidlTypes.h"
+
+using android::sp;
+using android::Thread;
+using android::hardware::EventFlag;
+using android::hardware::kSynchronizedReadWrite;
+using android::hardware::MessageQueue;
+using namespace android::hardware::audio::common::CPP_VERSION;
+using namespace android::hardware::audio::CPP_VERSION;
+
+namespace audio_proxy::service {
+
+class BusOutputStream;
+
+class WriteThread : public Thread {
+ public:
+ using CommandMQ =
+ MessageQueue<IStreamOut::WriteCommand, kSynchronizedReadWrite>;
+ using DataMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>;
+ using StatusMQ =
+ MessageQueue<IStreamOut::WriteStatus, kSynchronizedReadWrite>;
+
+ // WriteThread's lifespan never exceeds StreamOut's lifespan.
+ WriteThread(std::shared_ptr<BusOutputStream> stream, CommandMQ* commandMQ,
+ DataMQ* dataMQ, StatusMQ* statusMQ, EventFlag* eventFlag,
+ uint32_t latencyMs);
+
+ ~WriteThread() override;
+
+ void stop();
+
+ void updateOutputStream(std::shared_ptr<BusOutputStream> stream);
+
+ std::pair<uint64_t, TimeSpec> getPresentationPosition();
+
+ private:
+ bool threadLoop() override;
+
+ // The following function is called on the thread and it will modify the
+ // variables which may be read from another thread.
+ IStreamOut::WriteStatus doWrite(BusOutputStream* stream);
+
+ // The following function is called on the thread and only read variable
+ // that is written on the same thread, so there's no need to lock the
+ // resources.
+ IStreamOut::WriteStatus doGetPresentationPosition() const
+ NO_THREAD_SAFETY_ANALYSIS;
+
+ IStreamOut::WriteStatus doGetLatency() const;
+
+ // Write 0 buffer to {@param stream} for latest presentation info.
+ void queryPresentationPosition(BusOutputStream* stream);
+
+ // Update presentation position info after writing to {@param stream}. Caller
+ // should validate the {@param status}.
+ void updatePresentationPosition(const AidlWriteStatus& status,
+ BusOutputStream* stream);
+
+ std::atomic<bool> mStop = false;
+
+ std::mutex mStreamLock;
+ std::shared_ptr<BusOutputStream> mStream GUARDED_BY(mStreamLock);
+
+ CommandMQ* const mCommandMQ;
+ DataMQ* const mDataMQ;
+ StatusMQ* const mStatusMQ;
+ EventFlag* const mEventFlag;
+
+ // Latency in ms, used in HIDL API getLatency.
+ const uint32_t mLatencyMs;
+
+ // Count for consecutive FMQ command that is not WRITE.
+ int64_t mNonWriteCommandCount = 0;
+
+ // Presentation position information.
+ std::mutex mPositionLock;
+ uint64_t mPresentationFramesOffset GUARDED_BY(mPositionLock) = 0;
+ uint64_t mPresentationFrames GUARDED_BY(mPositionLock) = 0;
+ TimeSpec mPresentationTimestamp GUARDED_BY(mPositionLock) = {0, 0};
+ uint64_t mTotalWrittenFrames GUARDED_BY(mPositionLock) = 0;
+};
+
+} // namespace audio_proxy::service \ No newline at end of file