diff options
author | Yuchen Liu <yucliu@google.com> | 2021-08-18 07:35:06 +0000 |
---|---|---|
committer | Android (Google) Code Review <android-gerrit@google.com> | 2021-08-18 07:35:06 +0000 |
commit | 843bc1a59e96280e9a3e183eca5062f4d9b9e7b5 (patch) | |
tree | c48298b8e4050031870a0143245a089ac575a854 /audio_proxy | |
parent | 2a5625391aba65c399d6057ba74863edff49d7c5 (diff) | |
parent | 5ddd863d670dbd16e07197c5d4d5baf1a3c8b2e9 (diff) | |
download | atv-843bc1a59e96280e9a3e183eca5062f4d9b9e7b5.tar.gz |
Merge "[AudioProxy] Write data into IOutputStream"
Diffstat (limited to 'audio_proxy')
-rw-r--r-- | audio_proxy/service/Android.bp | 31 | ||||
-rw-r--r-- | audio_proxy/service/BusOutputStream.cpp | 14 | ||||
-rw-r--r-- | audio_proxy/service/BusOutputStream.h | 1 | ||||
-rw-r--r-- | audio_proxy/service/BusStreamProvider.cpp | 20 | ||||
-rw-r--r-- | audio_proxy/service/RemoteBusOutputStream.cpp | 159 | ||||
-rw-r--r-- | audio_proxy/service/RemoteBusOutputStream.h | 71 | ||||
-rw-r--r-- | audio_proxy/service/RingBufferUtil.cpp | 83 | ||||
-rw-r--r-- | audio_proxy/service/RingBufferUtil.h | 28 | ||||
-rw-r--r-- | audio_proxy/service/RingBufferUtilTest.cpp | 80 | ||||
-rw-r--r-- | audio_proxy/service/StreamOutImpl.cpp | 198 | ||||
-rw-r--r-- | audio_proxy/service/StreamOutImpl.h | 13 | ||||
-rw-r--r-- | audio_proxy/service/WriteThread.cpp | 214 | ||||
-rw-r--r-- | audio_proxy/service/WriteThread.h | 113 |
13 files changed, 994 insertions, 31 deletions
diff --git a/audio_proxy/service/Android.bp b/audio_proxy/service/Android.bp index b231adf..0e63ff9 100644 --- a/audio_proxy/service/Android.bp +++ b/audio_proxy/service/Android.bp @@ -21,6 +21,18 @@ package { default_applicable_licenses: ["device_google_atv_license"], } +cc_library_static { + name: "audio_proxy_service_util", + vendor_available: true, + host_supported: true, + srcs: [ + "RingBufferUtil.cpp", + ], + shared_libs: [ + "libbase", + ], +} + cc_binary { name: "device.google.atv.audio_proxy@5.1-service", vendor: true, @@ -33,7 +45,9 @@ cc_binary { "DeviceImpl.cpp", "DevicesFactoryImpl.cpp", "DummyBusOutputStream.cpp", + "RemoteBusOutputStream.cpp", "StreamOutImpl.cpp", + "WriteThread.cpp", "main.cpp", ], @@ -55,6 +69,10 @@ cc_binary { "libutils", ], + static_libs: [ + "audio_proxy_service_util", + ], + header_libs: [ "libaudio_system_headers", ], @@ -70,3 +88,16 @@ cc_binary { "-Wno-unused-parameter", ], } + +cc_test { + name: "audio_proxy_service_util_test", + host_supported: true, + + srcs: [ + "RingBufferUtilTest.cpp", + ], + static_libs: [ + "audio_proxy_service_util", + "libgtest", + ], +}
\ No newline at end of file diff --git a/audio_proxy/service/BusOutputStream.cpp b/audio_proxy/service/BusOutputStream.cpp index 1998e69..40ac8d4 100644 --- a/audio_proxy/service/BusOutputStream.cpp +++ b/audio_proxy/service/BusOutputStream.cpp @@ -15,6 +15,7 @@ #include "BusOutputStream.h" #include <android-base/logging.h> +#include <system/audio.h> namespace audio_proxy::service { @@ -27,6 +28,19 @@ const std::string& BusOutputStream::getAddress() const { return mAddress; } const AidlAudioConfig& BusOutputStream::getConfig() const { return mConfig; } int32_t BusOutputStream::getFlags() const { return mFlags; } +int BusOutputStream::getFrameSize() const { + audio_format_t format = static_cast<audio_format_t>(mConfig.format); + + if (!audio_has_proportional_frames(format)) { + return sizeof(int8_t); + } + + size_t channelSampleSize = audio_bytes_per_sample(format); + return audio_channel_count_from_out_mask( + static_cast<audio_channel_mask_t>(mConfig.channelMask)) * + channelSampleSize; +} + bool BusOutputStream::prepareForWriting(uint32_t frameSize, uint32_t frameCount) { DCHECK_EQ(mWritingFrameSize, 0); diff --git a/audio_proxy/service/BusOutputStream.h b/audio_proxy/service/BusOutputStream.h index e820ca8..f9fb930 100644 --- a/audio_proxy/service/BusOutputStream.h +++ b/audio_proxy/service/BusOutputStream.h @@ -30,6 +30,7 @@ class BusOutputStream { const std::string& getAddress() const; const AidlAudioConfig& getConfig() const; int32_t getFlags() const; + int getFrameSize() const; bool prepareForWriting(uint32_t frameSize, uint32_t frameCount); uint32_t getWritingFrameSize() const; diff --git a/audio_proxy/service/BusStreamProvider.cpp b/audio_proxy/service/BusStreamProvider.cpp index 7fac909..26ae460 100644 --- a/audio_proxy/service/BusStreamProvider.cpp +++ b/audio_proxy/service/BusStreamProvider.cpp @@ -19,6 +19,9 @@ #include <algorithm> #include "DummyBusOutputStream.h" +#include "RemoteBusOutputStream.h" + +using aidl::device::google::atv::audio_proxy::IOutputStream; namespace audio_proxy::service { @@ -58,9 +61,20 @@ void BusStreamProvider::onStreamOutCreated(wp<StreamOutImpl> stream) { std::shared_ptr<BusOutputStream> BusStreamProvider::openOutputStream_Locked( const std::string& address, const AidlAudioConfig& config, int32_t flags) { - // TODO(yucliu): Return AIDL interface based RemoteBusOutputStream when stream - // provider is available. - return std::make_shared<DummyBusOutputStream>(address, config, flags); + if (!mStreamProvider) { + return std::make_shared<DummyBusOutputStream>(address, config, flags); + } + + std::shared_ptr<IOutputStream> stream; + ndk::ScopedAStatus status = + mStreamProvider->openOutputStream(address, config, flags, &stream); + if (!status.isOk() || !stream) { + LOG(ERROR) << "Failed to open output stream, status " << status.getStatus(); + return std::make_shared<DummyBusOutputStream>(address, config, flags); + } + + return std::make_shared<RemoteBusOutputStream>(std::move(stream), address, + config, flags); } void BusStreamProvider::cleanStreamOutList_Locked() { diff --git a/audio_proxy/service/RemoteBusOutputStream.cpp b/audio_proxy/service/RemoteBusOutputStream.cpp new file mode 100644 index 0000000..3b27b79 --- /dev/null +++ b/audio_proxy/service/RemoteBusOutputStream.cpp @@ -0,0 +1,159 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "RemoteBusOutputStream.h" + +#include <aidl/device/google/atv/audio_proxy/MessageQueueFlag.h> +#include <android-base/logging.h> + +#include "RingBufferUtil.h" + +using aidl::device::google::atv::audio_proxy::MessageQueueFlag; +using android::status_t; + +namespace audio_proxy { +namespace service { +namespace { + +// Time out for FMQ read in ns -- 1s. +constexpr int64_t kFmqReadTimeoutNs = 1'000'000'000; + +void deleteEventFlag(EventFlag* obj) { + if (!obj) { + return; + } + + status_t status = EventFlag::deleteEventFlag(&obj); + if (status != android::OK) { + LOG(ERROR) << "write MQ event flag deletion error: " << strerror(-status); + } +} + +} // namespace + +RemoteBusOutputStream::RemoteBusOutputStream( + std::shared_ptr<IOutputStream> stream, const std::string& address, + const AidlAudioConfig& config, int32_t flags) + : BusOutputStream(address, config, flags), + mStream(std::move(stream)), + mEventFlag(nullptr, deleteEventFlag) {} +RemoteBusOutputStream::~RemoteBusOutputStream() = default; + +bool RemoteBusOutputStream::standby() { return mStream->standby().isOk(); } + +bool RemoteBusOutputStream::pause() { return mStream->pause().isOk(); } + +bool RemoteBusOutputStream::resume() { return mStream->resume().isOk(); } + +bool RemoteBusOutputStream::drain(AidlAudioDrain drain) { + return mStream->drain(drain).isOk(); +} + +bool RemoteBusOutputStream::flush() { return mStream->flush().isOk(); } + +bool RemoteBusOutputStream::close() { return mStream->close().isOk(); } + +bool RemoteBusOutputStream::setVolume(float left, float right) { + return mStream->setVolume(left, right).isOk(); +} + +size_t RemoteBusOutputStream::availableToWrite() { + return mDataMQ->availableToWrite(); +} + +AidlWriteStatus RemoteBusOutputStream::writeRingBuffer(const uint8_t* firstMem, + size_t firstLength, + const uint8_t* secondMem, + size_t secondLength) { + DCHECK(mDataMQ); + DCHECK(mStatusMQ); + DCHECK(mEventFlag); + AidlWriteStatus status; + DataMQ::MemTransaction tx; + if (!mDataMQ->beginWrite(firstLength + secondLength, &tx)) { + LOG(ERROR) << "Failed to begin write."; + return status; + } + + const DataMQ::MemRegion& firstRegion = tx.getFirstRegion(); + const DataMQ::MemRegion& secondRegion = tx.getSecondRegion(); + + copyRingBuffer(firstRegion.getAddress(), firstRegion.getLength(), + secondRegion.getAddress(), secondRegion.getLength(), + reinterpret_cast<const int8_t*>(firstMem), firstLength, + reinterpret_cast<const int8_t*>(secondMem), secondLength); + if (!mDataMQ->commitWrite(firstLength + secondLength)) { + LOG(ERROR) << "Failed to commit write."; + return status; + } + + mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlag::NOT_EMPTY)); + + // readNotification is used to "wake" after successful read, hence we don't + // need it. writeNotification is used to "wait" for the other end to write + // enough data. + if (!mStatusMQ->readBlocking( + &status, 1 /* count */, 0 /* readNotification */, + static_cast<uint32_t>( + MessageQueueFlag::NOT_FULL) /* writeNotification */, + kFmqReadTimeoutNs, mEventFlag.get())) { + LOG(ERROR) << "Failed to read status!"; + return status; + } + + return status; +} + +bool RemoteBusOutputStream::prepareForWritingImpl(uint32_t frameSize, + uint32_t frameCount) { + DataMQDesc dataMQDesc; + StatusMQDesc statusMQDesc; + ndk::ScopedAStatus status = mStream->prepareForWriting( + frameSize, frameCount, &dataMQDesc, &statusMQDesc); + if (!status.isOk()) { + LOG(ERROR) << "prepareForWriting fails."; + return false; + } + + auto dataMQ = std::make_unique<DataMQ>(dataMQDesc); + if (!dataMQ->isValid()) { + LOG(ERROR) << "invalid data mq."; + return false; + } + + EventFlag* rawEventFlag = nullptr; + status_t eventFlagStatus = + EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag); + std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag, + deleteEventFlag); + if (eventFlagStatus != android::OK || !eventFlag) { + LOG(ERROR) << "failed creating event flag for data MQ: " + << strerror(-eventFlagStatus); + return false; + } + + auto statusMQ = std::make_unique<StatusMQ>(statusMQDesc); + if (!statusMQ->isValid()) { + LOG(ERROR) << "invalid status mq."; + return false; + } + + mDataMQ = std::move(dataMQ); + mStatusMQ = std::move(statusMQ); + mEventFlag = std::move(eventFlag); + return true; +} + +} // namespace service +} // namespace audio_proxy
\ No newline at end of file diff --git a/audio_proxy/service/RemoteBusOutputStream.h b/audio_proxy/service/RemoteBusOutputStream.h new file mode 100644 index 0000000..6f663cb --- /dev/null +++ b/audio_proxy/service/RemoteBusOutputStream.h @@ -0,0 +1,71 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include <aidl/device/google/atv/audio_proxy/IOutputStream.h> +#include <fmq/AidlMessageQueue.h> +#include <fmq/EventFlag.h> + +#include "BusOutputStream.h" + +namespace audio_proxy { +namespace service { + +using aidl::android::hardware::common::fmq::MQDescriptor; +using aidl::android::hardware::common::fmq::SynchronizedReadWrite; +using aidl::device::google::atv::audio_proxy::IOutputStream; +using android::AidlMessageQueue; +using android::hardware::EventFlag; + +class RemoteBusOutputStream : public BusOutputStream { + public: + RemoteBusOutputStream(std::shared_ptr<IOutputStream> stream, + const std::string& address, + const AidlAudioConfig& config, int32_t flags); + ~RemoteBusOutputStream() override; + + bool standby() override; + bool pause() override; + bool resume() override; + bool drain(AidlAudioDrain drain) override; + bool flush() override; + bool close() override; + bool setVolume(float left, float right) override; + + size_t availableToWrite() override; + AidlWriteStatus writeRingBuffer(const uint8_t* firstMem, size_t firstLength, + const uint8_t* secondMem, + size_t secondLength) override; + + protected: + bool prepareForWritingImpl(uint32_t frameSize, uint32_t frameCount) override; + + private: + using DataMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>; + using DataMQDesc = MQDescriptor<int8_t, SynchronizedReadWrite>; + using StatusMQ = AidlMessageQueue<AidlWriteStatus, SynchronizedReadWrite>; + using StatusMQDesc = MQDescriptor<AidlWriteStatus, SynchronizedReadWrite>; + + typedef void (*EventFlagDeleter)(EventFlag*); + + std::shared_ptr<IOutputStream> mStream; + + std::unique_ptr<DataMQ> mDataMQ; + std::unique_ptr<StatusMQ> mStatusMQ; + std::unique_ptr<EventFlag, EventFlagDeleter> mEventFlag; +}; + +} // namespace service +} // namespace audio_proxy
\ No newline at end of file diff --git a/audio_proxy/service/RingBufferUtil.cpp b/audio_proxy/service/RingBufferUtil.cpp new file mode 100644 index 0000000..dbe80f4 --- /dev/null +++ b/audio_proxy/service/RingBufferUtil.cpp @@ -0,0 +1,83 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "RingBufferUtil.h" + +#include <android-base/logging.h> + +namespace audio_proxy::service { +namespace { +struct CopyDesc { + int8_t* dst = nullptr; + const int8_t* src = nullptr; + size_t len = 0; +}; +} // namespace + +void copyRingBuffer(int8_t* dstBuf1, size_t dstLen1, int8_t* dstBuf2, + size_t dstLen2, const int8_t* srcBuf1, size_t srcLen1, + const int8_t* srcBuf2, size_t srcLen2) { + // Caller should make sure the dst buffer has more space. + DCHECK_GE(dstLen1 + dstLen2, srcLen1 + srcLen2); + + CopyDesc cp1 = {dstBuf1, srcBuf1, 0}; + CopyDesc cp2; + CopyDesc cp3; + + if (srcLen1 == dstLen1) { + cp1 = {dstBuf1, srcBuf1, srcLen1}; + + DCHECK_LE(srcLen2, dstLen2); + cp2 = {dstBuf2, srcBuf2, srcLen2}; + + // No need to copy more data, thus no need to update cp3. + } else if (srcLen1 < dstLen1) { + cp1 = {dstBuf1, srcBuf1, srcLen1}; + + if (dstLen1 <= srcLen1 + srcLen2) { + // Copy data into both dstBuf1 and dstBuf2. + cp2 = {cp1.dst + cp1.len, srcBuf2, dstLen1 - srcLen1}; + cp3 = {dstBuf2, cp2.src + cp2.len, srcLen1 + srcLen2 - dstLen1}; + } else { + // dstBuf1 is bigger enough to hold all the data from src. + cp2 = {cp1.dst + cp1.len, srcBuf2, srcLen2}; + + // No need to copy more data, thus no need to update cp3. + } + } else { // srcLen1 > dstLen1 + cp1 = {dstBuf1, srcBuf1, dstLen1}; + cp2 = {dstBuf2, cp1.src + cp1.len, srcLen1 - dstLen1}; + cp3 = {cp2.dst + cp2.len, srcBuf2, srcLen2}; + } + + if (cp1.len > 0) { + DCHECK(cp1.dst); + DCHECK(cp1.src); + std::memcpy(cp1.dst, cp1.src, cp1.len); + } + + if (cp2.len > 0) { + DCHECK(cp2.dst); + DCHECK(cp2.src); + std::memcpy(cp2.dst, cp2.src, cp2.len); + } + + if (cp3.len > 0) { + DCHECK(cp3.dst); + DCHECK(cp3.src); + std::memcpy(cp3.dst, cp3.src, cp3.len); + } +} + +} // namespace audio_proxy::service
\ No newline at end of file diff --git a/audio_proxy/service/RingBufferUtil.h b/audio_proxy/service/RingBufferUtil.h new file mode 100644 index 0000000..a37f03e --- /dev/null +++ b/audio_proxy/service/RingBufferUtil.h @@ -0,0 +1,28 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +namespace audio_proxy::service { + +// Copy data from ring buffer "src" to ring buffer "dst". "dst" is guaranteed to +// have more space than "src". +void copyRingBuffer(int8_t* dstBuf1, size_t dstLen1, int8_t* dstBuf2, + size_t dstLen2, const int8_t* srcBuf1, size_t srcLen1, + const int8_t* srcBuf2, size_t srcLen2); + +} // namespace audio_proxy::service
\ No newline at end of file diff --git a/audio_proxy/service/RingBufferUtilTest.cpp b/audio_proxy/service/RingBufferUtilTest.cpp new file mode 100644 index 0000000..59b431c --- /dev/null +++ b/audio_proxy/service/RingBufferUtilTest.cpp @@ -0,0 +1,80 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <gtest/gtest.h> + +#include "RingBufferUtil.h" + +using namespace audio_proxy::service; + +using Buffer = std::vector<int8_t>; + +class RingBufferUtilTest : public testing::TestWithParam< + std::tuple<Buffer, Buffer, Buffer, Buffer>> {}; + +TEST_P(RingBufferUtilTest, DifferentBufferSize) { + auto [src1, src2, expectedDst1, expectedDst2] = GetParam(); + + Buffer dst1(expectedDst1.size()); + Buffer dst2(expectedDst2.size()); + + copyRingBuffer(dst1.data(), dst1.size(), dst2.data(), dst2.size(), + src1.data(), src1.size(), src2.data(), src2.size()); + + EXPECT_EQ(dst1, expectedDst1); + EXPECT_EQ(dst2, expectedDst2); +} + +// clang-format off +const std::vector<std::tuple<Buffer, Buffer, Buffer, Buffer>> testParams = { + // The layout are the same for src and dst. + { + {0, 1, 2, 3, 4}, + {5, 6, 7, 8, 9}, + {0, 1, 2, 3, 4}, + {5, 6, 7, 8, 9} + }, + // src1 size is samller than dst1 size. + { + {0, 1, 2, 3}, + {4, 5, 6, 7, 8, 9}, + {0, 1, 2, 3, 4}, + {5, 6, 7, 8, 9} + }, + // src2 size is larger than dst1 size. + { + {0, 1, 2, 3, 4, 5}, + {6, 7, 8, 9}, + {0, 1, 2, 3, 4}, + {5, 6, 7, 8, 9} + }, + // dst1 size is larger enough to hold all the src data. + { + {0, 1, 2, 3, 4}, + {5, 6, 7, 8, 9}, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0}, + {0, 0, 0, 0, 0} + }, + // Empty src + {{}, {}, {}, {}} +}; +// clang-format off + +INSTANTIATE_TEST_SUITE_P(RingBufferUtilTestSuite, RingBufferUtilTest, + testing::ValuesIn(testParams)); + +TEST(RingBufferUtilTest, CopyNullptr) { + // Test should not crash. + copyRingBuffer(nullptr, 0, nullptr, 0, nullptr, 0, nullptr, 0); +} diff --git a/audio_proxy/service/StreamOutImpl.cpp b/audio_proxy/service/StreamOutImpl.cpp index 0b65707..e317d4e 100644 --- a/audio_proxy/service/StreamOutImpl.cpp +++ b/audio_proxy/service/StreamOutImpl.cpp @@ -16,7 +16,6 @@ #include <android-base/logging.h> #include <inttypes.h> -#include <system/audio.h> #include <time.h> #include <utils/Log.h> @@ -24,6 +23,7 @@ #include "AidlTypes.h" #include "BusOutputStream.h" +#include "WriteThread.h" using android::status_t; @@ -35,17 +35,17 @@ namespace { constexpr uint32_t kMaxBufferSize = 1 << 30; constexpr uint32_t kDefaultLatencyMs = 40; -uint64_t calcFrameSize(const AudioConfig& config) { - audio_format_t format = static_cast<audio_format_t>(config.format); +constexpr int64_t kOneSecInNs = 1'000'000'000; - if (!audio_has_proportional_frames(format)) { - return sizeof(int8_t); +void deleteEventFlag(EventFlag* obj) { + if (!obj) { + return; } - size_t channelSampleSize = audio_bytes_per_sample(format); - return audio_channel_count_from_out_mask( - static_cast<audio_channel_mask_t>(config.channelMask)) * - channelSampleSize; + status_t status = EventFlag::deleteEventFlag(&obj); + if (status) { + LOG(ERROR) << "Write MQ event flag deletion error: " << strerror(-status); + } } AudioConfig fromAidlAudioConfig(const AidlAudioConfig& aidlConfig) { @@ -58,16 +58,48 @@ AudioConfig fromAidlAudioConfig(const AidlAudioConfig& aidlConfig) { return config; } +uint64_t estimatePlayedFramesSince(const TimeSpec& timestamp, + uint32_t sampleRateHz) { + timespec now = {0, 0}; + clock_gettime(CLOCK_MONOTONIC, &now); + int64_t deltaSec = 0; + int64_t deltaNSec = 0; + if (now.tv_nsec >= timestamp.tvNSec) { + deltaSec = now.tv_sec - timestamp.tvSec; + deltaNSec = now.tv_nsec - timestamp.tvNSec; + } else { + deltaSec = now.tv_sec - timestamp.tvSec - 1; + deltaNSec = kOneSecInNs + now.tv_nsec - timestamp.tvNSec; + } + + if (deltaSec < 0 || deltaNSec < 0) { + return 0; + } + + return deltaSec * sampleRateHz + deltaNSec * sampleRateHz / kOneSecInNs; +} + } // namespace StreamOutImpl::StreamOutImpl(std::shared_ptr<BusOutputStream> stream) : mStream(std::move(stream)), - mConfig(fromAidlAudioConfig(mStream->getConfig())) {} + mConfig(fromAidlAudioConfig(mStream->getConfig())), + mEventFlag(nullptr, deleteEventFlag) {} + +StreamOutImpl::~StreamOutImpl() { + if (mWriteThread) { + mWriteThread->stop(); + status_t status = mWriteThread->join(); + if (status) { + LOG(ERROR) << "write thread exit error " << strerror(-status); + } + } -StreamOutImpl::~StreamOutImpl() = default; + mEventFlag.reset(); +} Return<uint64_t> StreamOutImpl::getFrameSize() { - return calcFrameSize(mConfig); + return mStream->getFrameSize(); } Return<uint64_t> StreamOutImpl::getFrameCount() { @@ -76,7 +108,7 @@ Return<uint64_t> StreamOutImpl::getFrameCount() { Return<uint64_t> StreamOutImpl::getBufferSize() { // TODO(yucliu): The buffer size should be provided by command line args. - return 20 * mConfig.sampleRateHz * calcFrameSize(mConfig) / 1000; + return 20 * mConfig.sampleRateHz * mStream->getFrameSize() / 1000; } Return<uint32_t> StreamOutImpl::getSampleRate() { return mConfig.sampleRateHz; } @@ -132,7 +164,13 @@ Return<Result> StreamOutImpl::removeEffect(uint64_t effectId) { } Return<Result> StreamOutImpl::standby() { - return mStream->standby() ? Result::OK : Result::INVALID_STATE; + bool success = mStream->standby(); + if (!success) { + return Result::INVALID_STATE; + } + + mTotalPlayedFramesSinceStandby = estimateTotalPlayedFrames(); + return Result::OK; } Return<void> StreamOutImpl::getDevices(getDevices_cb _hidl_cb) { @@ -163,13 +201,14 @@ Return<Result> StreamOutImpl::setHwAvSync(uint32_t hwAvSync) { } Return<Result> StreamOutImpl::close() { + if (mWriteThread) { + mWriteThread->stop(); + } return mStream->close() ? Result::OK : Result::INVALID_STATE; } Return<uint32_t> StreamOutImpl::getLatency() { - // TODO(yucliu): If no audio data is written into client, use the default - // latency from command line args. Otherwise calculate the value from - // AidlWriteStatus returned by mStream.writeRingBuffer. + // TODO(yucliu): Get the value from command line. return kDefaultLatencyMs; } @@ -189,15 +228,98 @@ Return<void> StreamOutImpl::prepareForWriting(uint32_t frameSize, return Void(); }; - // TODO(yucliu): Create a thread to read data from FMQ and write the data into - // mStream. - return sendError(Result::INVALID_STATE); + if (mDataMQ) { + LOG(ERROR) << "The client attempted to call prepareForWriting twice"; + return sendError(Result::INVALID_STATE); + } + + if (frameSize == 0 || framesCount == 0) { + LOG(ERROR) << "Invalid frameSize (" << frameSize << ") or framesCount (" + << framesCount << ")"; + return sendError(Result::INVALID_ARGUMENTS); + } + + if (frameSize > kMaxBufferSize / framesCount) { + LOG(ERROR) << "Buffer too big: " << frameSize << "*" << framesCount + << " bytes > MAX_BUFFER_SIZE (" << kMaxBufferSize << ")"; + return sendError(Result::INVALID_ARGUMENTS); + } + + auto commandMQ = std::make_unique<CommandMQ>(1); + if (!commandMQ->isValid()) { + LOG(ERROR) << "Command MQ is invalid"; + return sendError(Result::INVALID_ARGUMENTS); + } + + auto dataMQ = + std::make_unique<DataMQ>(frameSize * framesCount, true /* EventFlag */); + if (!dataMQ->isValid()) { + LOG(ERROR) << "Data MQ is invalid"; + return sendError(Result::INVALID_ARGUMENTS); + } + + auto statusMQ = std::make_unique<StatusMQ>(1); + if (!statusMQ->isValid()) { + LOG(ERROR) << "Status MQ is invalid"; + return sendError(Result::INVALID_ARGUMENTS); + } + + EventFlag* rawEventFlag = nullptr; + status_t status = + EventFlag::createEventFlag(dataMQ->getEventFlagWord(), &rawEventFlag); + std::unique_ptr<EventFlag, EventFlagDeleter> eventFlag(rawEventFlag, + deleteEventFlag); + if (status != ::android::OK || !eventFlag) { + LOG(ERROR) << "Failed creating event flag for data MQ: " + << strerror(-status); + return sendError(Result::INVALID_ARGUMENTS); + } + + if (!mStream->prepareForWriting(frameSize, framesCount)) { + LOG(ERROR) << "Failed to prepare writing channel."; + return sendError(Result::INVALID_ARGUMENTS); + } + + sp<WriteThread> writeThread = + sp<WriteThread>::make(mStream, commandMQ.get(), dataMQ.get(), + statusMQ.get(), eventFlag.get(), kDefaultLatencyMs); + status = writeThread->run("writer", ::android::PRIORITY_URGENT_AUDIO); + if (status != ::android::OK) { + LOG(ERROR) << "Failed to start writer thread: " << strerror(-status); + return sendError(Result::INVALID_ARGUMENTS); + } + + mCommandMQ = std::move(commandMQ); + mDataMQ = std::move(dataMQ); + mStatusMQ = std::move(statusMQ); + mEventFlag = std::move(eventFlag); + mWriteThread = std::move(writeThread); + threadInfo.pid = getpid(); + threadInfo.tid = mWriteThread->getTid(); + _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), + *mStatusMQ->getDesc(), threadInfo); + + return Void(); } Return<void> StreamOutImpl::getRenderPosition(getRenderPosition_cb _hidl_cb) { - // TODO(yucliu): Render position can be calculated by the AidlWriteStatus - // returned by the mStream.writeRingBuffer. - _hidl_cb(Result::NOT_SUPPORTED, 0); + uint64_t totalPlayedFrames = estimateTotalPlayedFrames(); + if (totalPlayedFrames == 0) { + _hidl_cb(Result::OK, 0); + return Void(); + } + + // getRenderPosition returns the number of frames played since the output has + // exited standby. + DCHECK_GE(totalPlayedFrames, mTotalPlayedFramesSinceStandby); + uint64_t position = totalPlayedFrames - mTotalPlayedFramesSinceStandby; + + if (position > std::numeric_limits<uint32_t>::max()) { + _hidl_cb(Result::INVALID_STATE, 0); + return Void(); + } + + _hidl_cb(Result::OK, position); return Void(); } @@ -242,9 +364,13 @@ Return<Result> StreamOutImpl::flush() { Return<void> StreamOutImpl::getPresentationPosition( getPresentationPosition_cb _hidl_cb) { - // TODO(yucliu): Presentation position can be calculated by the - // AidlWriteStatus returned by the mStream.writeRingBuffer. - _hidl_cb(Result::NOT_SUPPORTED, 0, {}); + if (!mWriteThread) { + _hidl_cb(Result::INVALID_STATE, 0, {}); + return Void(); + } + + auto [frames, timestamp] = mWriteThread->getPresentationPosition(); + _hidl_cb(Result::OK, frames, timestamp); return Void(); } @@ -286,10 +412,26 @@ void StreamOutImpl::updateOutputStream( return; } - // TODO(yucliu): Call mStream.prepareForWriting if audioserver starts to write - // data. + if (mWriteThread) { + if (!stream->prepareForWriting(mStream->getWritingFrameSize(), + mStream->getWritingFrameCount())) { + LOG(ERROR) << "Failed to prepare writing channel."; + return; + } + + mWriteThread->updateOutputStream(stream); + } mStream = std::move(stream); } +uint64_t StreamOutImpl::estimateTotalPlayedFrames() const { + if (!mWriteThread) { + return 0; + } + + auto [frames, timestamp] = mWriteThread->getPresentationPosition(); + return frames + estimatePlayedFramesSince(timestamp, mConfig.sampleRateHz); +} + } // namespace audio_proxy::service diff --git a/audio_proxy/service/StreamOutImpl.h b/audio_proxy/service/StreamOutImpl.h index 8ad7c49..50b348d 100644 --- a/audio_proxy/service/StreamOutImpl.h +++ b/audio_proxy/service/StreamOutImpl.h @@ -40,6 +40,9 @@ using namespace android::hardware::audio::CPP_VERSION; namespace audio_proxy::service { class BusOutputStream; +class WriteThread; + +typedef void (*EventFlagDeleter)(EventFlag*); class StreamOutImpl : public IStreamOut { public: @@ -113,8 +116,18 @@ class StreamOutImpl : public IStreamOut { int32_t programId) override; private: + uint64_t estimateTotalPlayedFrames() const; + std::shared_ptr<BusOutputStream> mStream; const AudioConfig mConfig; + + std::unique_ptr<CommandMQ> mCommandMQ; + std::unique_ptr<DataMQ> mDataMQ; + std::unique_ptr<StatusMQ> mStatusMQ; + std::unique_ptr<EventFlag, EventFlagDeleter> mEventFlag; + sp<WriteThread> mWriteThread; + + uint64_t mTotalPlayedFramesSinceStandby = 0; }; } // namespace audio_proxy::service diff --git a/audio_proxy/service/WriteThread.cpp b/audio_proxy/service/WriteThread.cpp new file mode 100644 index 0000000..64ba8ee --- /dev/null +++ b/audio_proxy/service/WriteThread.cpp @@ -0,0 +1,214 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "WriteThread.h" + +#include <android-base/logging.h> +#include <time.h> + +#include <atomic> + +#include "AidlTypes.h" +#include "BusOutputStream.h" + +namespace audio_proxy::service { +namespace { +// Time out for FMQ read in ns -- 1s. +constexpr int64_t kFmqReadTimeoutNs = 1'000'000'000; +} // namespace + +WriteThread::WriteThread(std::shared_ptr<BusOutputStream> stream, + CommandMQ* commandMQ, DataMQ* dataMQ, + StatusMQ* statusMQ, EventFlag* eventFlag, + uint32_t latencyMs) + : Thread(false /*canCallJava*/), + mStream(std::move(stream)), + mCommandMQ(commandMQ), + mDataMQ(dataMQ), + mStatusMQ(statusMQ), + mEventFlag(eventFlag), + mLatencyMs(latencyMs) {} + +WriteThread::~WriteThread() = default; + +void WriteThread::stop() { + if (mStop.load(std::memory_order_relaxed)) { + return; + } + + mStop.store(true, std::memory_order_release); + mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)); +} + +void WriteThread::updateOutputStream(std::shared_ptr<BusOutputStream> stream) { + { + std::scoped_lock<std::mutex> lock(mStreamLock); + mStream = std::move(stream); + } + + // Assume all the written frames are already played out by the old stream. + std::scoped_lock<std::mutex> lock(mPositionLock); + mPresentationFramesOffset = mTotalWrittenFrames; +} + +std::pair<uint64_t, TimeSpec> WriteThread::getPresentationPosition() { + std::scoped_lock<std::mutex> lock(mPositionLock); + return std::make_pair(mPresentationFrames, mPresentationTimestamp); +} + +IStreamOut::WriteStatus WriteThread::doWrite(BusOutputStream* stream) { + IStreamOut::WriteStatus status; + status.replyTo = IStreamOut::WriteCommand::WRITE; + status.retval = Result::INVALID_STATE; + status.reply.written = 0; + + const size_t availToRead = mDataMQ->availableToRead(); + if (stream->availableToWrite() < availToRead) { + LOG(WARNING) << "No space to write, wait..."; + return status; + } + + DataMQ::MemTransaction tx; + if (mDataMQ->beginRead(availToRead, &tx)) { + status.retval = Result::OK; + AidlWriteStatus writeStatus = stream->writeRingBuffer( + tx.getFirstRegion().getAddress(), tx.getFirstRegion().getLength(), + tx.getSecondRegion().getAddress(), tx.getSecondRegion().getLength()); + if (writeStatus.written < availToRead) { + LOG(WARNING) << "Failed to write all the bytes to client. Written " + << writeStatus.written << ", available " << availToRead; + } + + if (writeStatus.written < 0) { + writeStatus.written = 0; + } + + status.reply.written = writeStatus.written; + mDataMQ->commitRead(writeStatus.written); + + if (writeStatus.position.frames < 0 || + writeStatus.position.timestamp.tvSec < 0 || + writeStatus.position.timestamp.tvNSec < 0) { + LOG(WARNING) << "Invalid latency info."; + return status; + } + + updatePresentationPosition(writeStatus, stream); + } + + return status; +} + +IStreamOut::WriteStatus WriteThread::doGetPresentationPosition() const { + IStreamOut::WriteStatus status; + status.replyTo = IStreamOut::WriteCommand::GET_PRESENTATION_POSITION; + status.retval = Result::OK; + // Write always happens on the same thread, there's no need to lock. + status.reply.presentationPosition = {mPresentationFrames, + mPresentationTimestamp}; + return status; +} + +IStreamOut::WriteStatus WriteThread::doGetLatency() const { + IStreamOut::WriteStatus status; + status.replyTo = IStreamOut::WriteCommand::GET_LATENCY; + status.retval = Result::OK; + // Write always happens on the same thread, there's no need to lock. + status.reply.latencyMs = mLatencyMs; + return status; +} + +bool WriteThread::threadLoop() { + // This implementation doesn't return control back to the Thread until the + // parent thread decides to stop, as the Thread uses mutexes, and this can + // lead to priority inversion. + while (!mStop.load(std::memory_order_acquire)) { + std::shared_ptr<BusOutputStream> stream; + { + std::scoped_lock<std::mutex> lock(mStreamLock); + stream = mStream; + } + + IStreamOut::WriteCommand replyTo; + if (!mCommandMQ->readBlocking( + &replyTo, 1 /* count */, 0 /* readNoticication */, + static_cast<uint32_t>( + MessageQueueFlagBits::NOT_EMPTY) /* writeNotification */, + kFmqReadTimeoutNs, mEventFlag)) { + LOG(ERROR) << "read command timeout"; + continue; + } + + if (replyTo == IStreamOut::WriteCommand::WRITE) { + mNonWriteCommandCount = 0; + } else { + mNonWriteCommandCount++; + } + + IStreamOut::WriteStatus status; + switch (replyTo) { + case IStreamOut::WriteCommand::WRITE: + status = doWrite(stream.get()); + if (status.retval != Result::OK) { + LOG(ERROR) << "write status not ok"; + continue; + } + break; + case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION: + // If we don't write data for a while, the presentation position info + // may not be accurate. Write 0 bytes data to the client to get the + // latest presentation position info. + if (mNonWriteCommandCount >= 3 || mNonWriteCommandCount < 0) { + queryPresentationPosition(stream.get()); + } + status = doGetPresentationPosition(); + break; + case IStreamOut::WriteCommand::GET_LATENCY: + status = doGetLatency(); + break; + default: + LOG(ERROR) << "Unknown write thread command code " + << static_cast<int>(replyTo); + status.retval = Result::NOT_SUPPORTED; + break; + } + + if (!mStatusMQ->write(&status)) { + LOG(ERROR) << "Status message queue write failed"; + } + mEventFlag->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)); + } + + return false; +} + +void WriteThread::queryPresentationPosition(BusOutputStream* stream) { + AidlWriteStatus writeStatus = + stream->writeRingBuffer(nullptr, 0, nullptr, 0); + updatePresentationPosition(writeStatus, stream); +} + +void WriteThread::updatePresentationPosition(const AidlWriteStatus& writeStatus, + BusOutputStream* stream) { + std::scoped_lock<std::mutex> lock(mPositionLock); + mPresentationFrames = mPresentationFramesOffset + writeStatus.position.frames; + mPresentationTimestamp = { + .tvSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvSec), + .tvNSec = static_cast<uint64_t>(writeStatus.position.timestamp.tvNSec), + }; + + mTotalWrittenFrames += writeStatus.written / stream->getFrameSize(); +} + +} // namespace audio_proxy::service
\ No newline at end of file diff --git a/audio_proxy/service/WriteThread.h b/audio_proxy/service/WriteThread.h new file mode 100644 index 0000000..d313686 --- /dev/null +++ b/audio_proxy/service/WriteThread.h @@ -0,0 +1,113 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include <atomic> +#include <mutex> + +// clang-format off +#include PATH(android/hardware/audio/FILE_VERSION/IStreamOut.h) +// clang-format on + +#include <android-base/thread_annotations.h> +#include <fmq/EventFlag.h> +#include <fmq/MessageQueue.h> +#include <hidl/MQDescriptor.h> +#include <inttypes.h> +#include <utils/Thread.h> + +#include "AidlTypes.h" + +using android::sp; +using android::Thread; +using android::hardware::EventFlag; +using android::hardware::kSynchronizedReadWrite; +using android::hardware::MessageQueue; +using namespace android::hardware::audio::common::CPP_VERSION; +using namespace android::hardware::audio::CPP_VERSION; + +namespace audio_proxy::service { + +class BusOutputStream; + +class WriteThread : public Thread { + public: + using CommandMQ = + MessageQueue<IStreamOut::WriteCommand, kSynchronizedReadWrite>; + using DataMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>; + using StatusMQ = + MessageQueue<IStreamOut::WriteStatus, kSynchronizedReadWrite>; + + // WriteThread's lifespan never exceeds StreamOut's lifespan. + WriteThread(std::shared_ptr<BusOutputStream> stream, CommandMQ* commandMQ, + DataMQ* dataMQ, StatusMQ* statusMQ, EventFlag* eventFlag, + uint32_t latencyMs); + + ~WriteThread() override; + + void stop(); + + void updateOutputStream(std::shared_ptr<BusOutputStream> stream); + + std::pair<uint64_t, TimeSpec> getPresentationPosition(); + + private: + bool threadLoop() override; + + // The following function is called on the thread and it will modify the + // variables which may be read from another thread. + IStreamOut::WriteStatus doWrite(BusOutputStream* stream); + + // The following function is called on the thread and only read variable + // that is written on the same thread, so there's no need to lock the + // resources. + IStreamOut::WriteStatus doGetPresentationPosition() const + NO_THREAD_SAFETY_ANALYSIS; + + IStreamOut::WriteStatus doGetLatency() const; + + // Write 0 buffer to {@param stream} for latest presentation info. + void queryPresentationPosition(BusOutputStream* stream); + + // Update presentation position info after writing to {@param stream}. Caller + // should validate the {@param status}. + void updatePresentationPosition(const AidlWriteStatus& status, + BusOutputStream* stream); + + std::atomic<bool> mStop = false; + + std::mutex mStreamLock; + std::shared_ptr<BusOutputStream> mStream GUARDED_BY(mStreamLock); + + CommandMQ* const mCommandMQ; + DataMQ* const mDataMQ; + StatusMQ* const mStatusMQ; + EventFlag* const mEventFlag; + + // Latency in ms, used in HIDL API getLatency. + const uint32_t mLatencyMs; + + // Count for consecutive FMQ command that is not WRITE. + int64_t mNonWriteCommandCount = 0; + + // Presentation position information. + std::mutex mPositionLock; + uint64_t mPresentationFramesOffset GUARDED_BY(mPositionLock) = 0; + uint64_t mPresentationFrames GUARDED_BY(mPositionLock) = 0; + TimeSpec mPresentationTimestamp GUARDED_BY(mPositionLock) = {0, 0}; + uint64_t mTotalWrittenFrames GUARDED_BY(mPositionLock) = 0; +}; + +} // namespace audio_proxy::service
\ No newline at end of file |