diff options
author | Zhomart Mukhamejanov <zhomart@google.com> | 2021-04-19 14:22:59 -0700 |
---|---|---|
committer | Zhomart Mukhamejanov <zhomart@google.com> | 2021-05-06 20:23:20 +0000 |
commit | 4efd36dc596496b3076d808dcd53fd0e847e725f (patch) | |
tree | e8176f8444e51e2e50651855ed4332e3d2d4cea5 /cpp/telemetry | |
parent | 00ea235ad10a9ee88d7ec06c6b25a37b64950f76 (diff) | |
download | Car-4efd36dc596496b3076d808dcd53fd0e847e725f.tar.gz |
Add pushing CarData to the ICarDataListener
- Move business logic to TelemetryServer class.
- TelemetryServer has a single mutex for all the
operations. Operations are cheap, don't block
for long time.
- cartelemetryd sends CarDataInternal one-by-one,
it drops if ICarDataListener cannot receive data.
No need to retry if the listener cannot receive data.
Test: atest cartelemetryd_impl_test
Test: run in emulator and use sampleclient to verify
Bug: 174608802
Change-Id: Iccc7275a74b2ed2f221b3e8d28bd329d010b0765
Merged-In: Iccc7275a74b2ed2f221b3e8d28bd329d010b0765
(cherry picked from commit 383708f7cd28836ecee938b36ba780e8b69a1691)
Diffstat (limited to 'cpp/telemetry')
20 files changed, 818 insertions, 371 deletions
diff --git a/cpp/telemetry/Android.bp b/cpp/telemetry/Android.bp index 4e6c1c01a9..326956e076 100644 --- a/cpp/telemetry/Android.bp +++ b/cpp/telemetry/Android.bp @@ -31,6 +31,9 @@ cc_defaults { "liblog", "libutils", ], + header_libs: [ + "libgtest_prod_headers", // for FRIEND_TEST + ], product_variables: { debuggable: { cflags: [ @@ -48,6 +51,7 @@ cc_library { srcs: [ "src/CarTelemetryImpl.cpp", "src/CarTelemetryInternalImpl.cpp", + "src/LooperWrapper.cpp", "src/RingBuffer.cpp", "src/TelemetryServer.cpp", ], @@ -64,9 +68,7 @@ cc_test { ], test_suites: ["general-tests"], srcs: [ - "tests/CarTelemetryImplTest.cpp", - "tests/CarTelemetryInternalImplTest.cpp", - "tests/RingBufferTest.cpp", + "tests/TelemetryServerTest.cpp", ], // Statically link only in tests, for portability reason. static_libs: [ diff --git a/cpp/telemetry/README.md b/cpp/telemetry/README.md index a13116dd1c..633ea62bf1 100644 --- a/cpp/telemetry/README.md +++ b/cpp/telemetry/README.md @@ -4,15 +4,22 @@ A structured log collection service for CarTelemetryService. See ARCHITECTURE.md ## Useful Commands -**Dump service information** +**Dumping the service information** `adb shell dumpsys android.automotive.telemetry.internal.ICarTelemetryInternal/default` -**Starting emulator** +**Enabling VERBOSE logs** -`aae emulator run -selinux permissive -writable-system` +``` +adb shell setprop log.tag.android.automotive.telemetryd@1.0 V +adb shell setprop log.tag.cartelemetryd_impl_test V +``` -**Running tests** +**Starting emulator with cold boot** + +`emulator -verbose -show-kernel -selinux permissive -writable-system -no-snapshot -wipe-data` + +**Running the tests** `atest cartelemetryd_impl_test:CarTelemetryInternalImplTest#TestSetListenerReturnsOk` diff --git a/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl b/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl index 48ab6f9d03..ccdf63aace 100644 --- a/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl +++ b/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl @@ -21,15 +21,17 @@ import android.automotive.telemetry.internal.CarDataInternal; /** * Listener for {@code ICarTelemetryInternal#registerListener}. */ -oneway interface ICarDataListener { +interface ICarDataListener { /** - * Called by ICarTelemetry when the data are available to be consumed. ICarTelemetry removes - * the delivered data when the callback succeeds. + * Called by native cartelemetryd when the data are available to be consumed. The service removes + * the delivered data from its buffer when the callback succeeds. If the callback fails, it + * immediately gives up and drops the data too and sends the next chunk of the available data. * * <p>If the collected data is too large, it will send only chunk of the data, and the callback * will be fired again. * * @param dataList the pushed data. + * @throws IllegalStateException if data cannot be received. */ void onCarDataReceived(in CarDataInternal[] dataList); } diff --git a/cpp/telemetry/sampleclient/README.md b/cpp/telemetry/sampleclient/README.md index ebbaf16b8f..5b97f00568 100644 --- a/cpp/telemetry/sampleclient/README.md +++ b/cpp/telemetry/sampleclient/README.md @@ -14,7 +14,9 @@ adb push $ANDROID_PRODUCT_OUT/vendor/bin/android.automotive.telemetryd-samplecli adb shell /system/bin/android.automotive.telemetryd-sampleclient -# Then check logcat and dumpsys to verify the results. +# Then check logcat and dumpsys to verify the results. The following command enables VERBOSE logs. +adb shell setprop log.tag.android.automotive.telemetryd@1.0 V +adb logcat -v color -b all -T 1000 ``` **2. Under vendor** diff --git a/cpp/telemetry/src/CarTelemetryImpl.cpp b/cpp/telemetry/src/CarTelemetryImpl.cpp index 34a4e60a49..cf6f61145c 100644 --- a/cpp/telemetry/src/CarTelemetryImpl.cpp +++ b/cpp/telemetry/src/CarTelemetryImpl.cpp @@ -31,16 +31,11 @@ namespace telemetry { using ::aidl::android::frameworks::automotive::telemetry::CarData; -CarTelemetryImpl::CarTelemetryImpl(RingBuffer* buffer) : mRingBuffer(buffer) {} +CarTelemetryImpl::CarTelemetryImpl(TelemetryServer* server) : mTelemetryServer(server) {} -// TODO(b/174608802): Add 10kb size check for the `dataList`, see the AIDL for the limits ndk::ScopedAStatus CarTelemetryImpl::write(const std::vector<CarData>& dataList) { uid_t publisherUid = ::AIBinder_getCallingUid(); - for (auto&& data : dataList) { - mRingBuffer->push({.mId = data.id, - .mContent = std::move(data.content), - .mPublisherUid = publisherUid}); - } + mTelemetryServer->writeCarData(dataList, publisherUid); return ndk::ScopedAStatus::ok(); } diff --git a/cpp/telemetry/src/CarTelemetryImpl.h b/cpp/telemetry/src/CarTelemetryImpl.h index a3bb6c1bcd..e10dd98036 100644 --- a/cpp/telemetry/src/CarTelemetryImpl.h +++ b/cpp/telemetry/src/CarTelemetryImpl.h @@ -17,7 +17,7 @@ #ifndef CPP_TELEMETRY_SRC_CARTELEMETRYIMPL_H_ #define CPP_TELEMETRY_SRC_CARTELEMETRYIMPL_H_ -#include "RingBuffer.h" +#include "TelemetryServer.h" #include <aidl/android/frameworks/automotive/telemetry/BnCarTelemetry.h> #include <aidl/android/frameworks/automotive/telemetry/CarData.h> @@ -33,15 +33,15 @@ namespace telemetry { // Implementation of android.frameworks.automotive.telemetry.ICarTelemetry. class CarTelemetryImpl : public aidl::android::frameworks::automotive::telemetry::BnCarTelemetry { public: - // Doesn't own `buffer`. - explicit CarTelemetryImpl(RingBuffer* buffer); + // Doesn't own `server`. + explicit CarTelemetryImpl(TelemetryServer* server); ndk::ScopedAStatus write( const std::vector<aidl::android::frameworks::automotive::telemetry::CarData>& dataList) override; private: - RingBuffer* mRingBuffer; // not owned + TelemetryServer* mTelemetryServer; // not owned }; } // namespace telemetry diff --git a/cpp/telemetry/src/CarTelemetryInternalImpl.cpp b/cpp/telemetry/src/CarTelemetryInternalImpl.cpp index 7a3b141cd8..2abf4fcac4 100644 --- a/cpp/telemetry/src/CarTelemetryInternalImpl.cpp +++ b/cpp/telemetry/src/CarTelemetryInternalImpl.cpp @@ -32,68 +32,71 @@ using ::aidl::android::automotive::telemetry::internal::CarDataInternal; using ::aidl::android::automotive::telemetry::internal::ICarDataListener; using ::android::base::StringPrintf; -CarTelemetryInternalImpl::CarTelemetryInternalImpl(RingBuffer* buffer) : - mRingBuffer(buffer), +CarTelemetryInternalImpl::CarTelemetryInternalImpl(TelemetryServer* server) : + mTelemetryServer(server), mBinderDeathRecipient( ::AIBinder_DeathRecipient_new(CarTelemetryInternalImpl::listenerBinderDied)) {} ndk::ScopedAStatus CarTelemetryInternalImpl::setListener( const std::shared_ptr<ICarDataListener>& listener) { - const std::scoped_lock<std::mutex> lock(mMutex); - - if (mCarDataListener != nullptr) { - return ndk::ScopedAStatus::fromExceptionCodeWithMessage(::EX_ILLEGAL_STATE, - "ICarDataListener is already set."); + LOG(VERBOSE) << "Received a setListener call"; + auto result = mTelemetryServer->setListener(listener); + if (!result.ok()) { + LOG(WARNING) << __func__ << ": " << result.error().message(); + return ndk::ScopedAStatus::fromExceptionCodeWithMessage(result.error().code(), + result.error().message().c_str()); } // If passed a local binder, AIBinder_linkToDeath will do nothing and return // STATUS_INVALID_OPERATION. We ignore this case because we only use local binders in tests // where this is not an error. - if (listener->isRemote()) { - auto status = ndk::ScopedAStatus::fromStatus( - ::AIBinder_linkToDeath(listener->asBinder().get(), mBinderDeathRecipient.get(), - this)); - if (!status.isOk()) { - return ndk::ScopedAStatus::fromExceptionCodeWithMessage(::EX_ILLEGAL_STATE, - status.getMessage()); - } + if (!listener->isRemote()) { + return ndk::ScopedAStatus::ok(); } - mCarDataListener = listener; + auto status = ndk::ScopedAStatus::fromStatus( + ::AIBinder_linkToDeath(listener->asBinder().get(), mBinderDeathRecipient.get(), this)); + if (!status.isOk()) { + LOG(WARNING) << __func__ << ": Failed to linkToDeath: " << status.getMessage(); + mTelemetryServer->clearListener(); + return ndk::ScopedAStatus::fromExceptionCodeWithMessage(::EX_ILLEGAL_STATE, + status.getMessage()); + } return ndk::ScopedAStatus::ok(); } ndk::ScopedAStatus CarTelemetryInternalImpl::clearListener() { - const std::scoped_lock<std::mutex> lock(mMutex); - if (mCarDataListener == nullptr) { - LOG(INFO) << __func__ << ": No ICarDataListener, ignoring the call"; + auto listener = mTelemetryServer->getListener(); + mTelemetryServer->clearListener(); + if (listener == nullptr) { return ndk::ScopedAStatus::ok(); } auto status = ndk::ScopedAStatus::fromStatus( - ::AIBinder_unlinkToDeath(mCarDataListener->asBinder().get(), - mBinderDeathRecipient.get(), this)); + ::AIBinder_unlinkToDeath(listener->asBinder().get(), mBinderDeathRecipient.get(), + this)); if (!status.isOk()) { LOG(WARNING) << __func__ << ": unlinkToDeath failed, continuing anyway: " << status.getMessage(); } - mCarDataListener = nullptr; return ndk::ScopedAStatus::ok(); } binder_status_t CarTelemetryInternalImpl::dump(int fd, const char** args, uint32_t numArgs) { dprintf(fd, "ICarTelemetryInternal:\n"); - mRingBuffer->dump(fd); + mTelemetryServer->dump(fd); return ::STATUS_OK; } // Removes the listener if its binder dies. void CarTelemetryInternalImpl::listenerBinderDiedImpl() { - LOG(WARNING) << "A ICarDataListener died, removing the listener."; - const std::scoped_lock<std::mutex> lock(mMutex); - mCarDataListener = nullptr; + LOG(WARNING) << "A ICarDataListener died, clearing the listener."; + mTelemetryServer->clearListener(); } +// static void CarTelemetryInternalImpl::listenerBinderDied(void* cookie) { + // We expect the pointer to be alive as there is only a single instance of + // CarTelemetryInternalImpl and if it dies, the whole process should die too. auto thiz = static_cast<CarTelemetryInternalImpl*>(cookie); thiz->listenerBinderDiedImpl(); } diff --git a/cpp/telemetry/src/CarTelemetryInternalImpl.h b/cpp/telemetry/src/CarTelemetryInternalImpl.h index 12ad5cd5eb..599d580cb1 100644 --- a/cpp/telemetry/src/CarTelemetryInternalImpl.h +++ b/cpp/telemetry/src/CarTelemetryInternalImpl.h @@ -17,7 +17,7 @@ #ifndef CPP_TELEMETRY_SRC_CARTELEMETRYINTERNALIMPL_H_ #define CPP_TELEMETRY_SRC_CARTELEMETRYINTERNALIMPL_H_ -#include "RingBuffer.h" +#include "TelemetryServer.h" #include <aidl/android/automotive/telemetry/internal/BnCarTelemetryInternal.h> #include <aidl/android/automotive/telemetry/internal/CarDataInternal.h> @@ -35,8 +35,8 @@ namespace telemetry { class CarTelemetryInternalImpl : public aidl::android::automotive::telemetry::internal::BnCarTelemetryInternal { public: - // Doesn't own `buffer`. - explicit CarTelemetryInternalImpl(RingBuffer* buffer); + // Doesn't own `server`. + explicit CarTelemetryInternalImpl(TelemetryServer* server); ndk::ScopedAStatus setListener( const std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener>& @@ -53,12 +53,8 @@ private: void listenerBinderDiedImpl(); - RingBuffer* mRingBuffer; // not owned + TelemetryServer* mTelemetryServer; // not owned ndk::ScopedAIBinder_DeathRecipient mBinderDeathRecipient; - std::mutex mMutex; // a mutex for the whole instance - - std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener> - mCarDataListener GUARDED_BY(mMutex); }; } // namespace telemetry diff --git a/cpp/telemetry/src/LooperWrapper.cpp b/cpp/telemetry/src/LooperWrapper.cpp new file mode 100644 index 0000000000..b14f967091 --- /dev/null +++ b/cpp/telemetry/src/LooperWrapper.cpp @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2021, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LooperWrapper.h" + +namespace android { +namespace automotive { +namespace telemetry { + +using ::android::sp; + +int LooperWrapper::pollAll(int timeoutMillis) { + return mLooper->pollAll(timeoutMillis); +} + +void LooperWrapper::sendMessageDelayed(nsecs_t uptime, const sp<MessageHandler>& handler, + const Message& message) { + return mLooper->sendMessageDelayed(uptime, handler, message); +} + +void LooperWrapper::removeMessages(const android::sp<MessageHandler>& handler, int what) { + return mLooper->removeMessages(handler, what); +} + +} // namespace telemetry +} // namespace automotive +} // namespace android diff --git a/cpp/telemetry/src/LooperWrapper.h b/cpp/telemetry/src/LooperWrapper.h new file mode 100644 index 0000000000..171c57fc74 --- /dev/null +++ b/cpp/telemetry/src/LooperWrapper.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CPP_TELEMETRY_SRC_LOOPERWRAPPER_H_ +#define CPP_TELEMETRY_SRC_LOOPERWRAPPER_H_ + +#include <utils/Looper.h> + +namespace android { +namespace automotive { +namespace telemetry { + +// LooperWrapper is a wrapper around the actual looper implementation so tests can stub this wrapper +// to deterministically poll the underlying looper. +// Refer to utils/Looper.h for the class and methods descriptions. +class LooperWrapper { +public: + LooperWrapper(android::sp<Looper> looper) : mLooper(looper){}; + virtual ~LooperWrapper(){}; + + virtual int pollAll(int timeoutMillis); + virtual void sendMessageDelayed(nsecs_t uptime, const android::sp<MessageHandler>& handler, + const Message& message); + virtual void removeMessages(const android::sp<MessageHandler>& handler, int what); + +private: + android::sp<Looper> mLooper; +}; + +} // namespace telemetry +} // namespace automotive +} // namespace android + +#endif // CPP_TELEMETRY_SRC_LOOPERWRAPPER_H_ diff --git a/cpp/telemetry/src/RingBuffer.cpp b/cpp/telemetry/src/RingBuffer.cpp index 36de3f8a20..c1387f85cf 100644 --- a/cpp/telemetry/src/RingBuffer.cpp +++ b/cpp/telemetry/src/RingBuffer.cpp @@ -29,7 +29,6 @@ namespace telemetry { RingBuffer::RingBuffer(int32_t limit) : mSizeLimit(limit) {} void RingBuffer::push(BufferedCarData&& data) { - const std::scoped_lock<std::mutex> lock(mMutex); mList.push_back(std::move(data)); while (mList.size() > mSizeLimit) { mList.pop_front(); @@ -37,23 +36,20 @@ void RingBuffer::push(BufferedCarData&& data) { } } -BufferedCarData RingBuffer::popFront() { - const std::scoped_lock<std::mutex> lock(mMutex); - auto result = std::move(mList.front()); - mList.pop_front(); +BufferedCarData RingBuffer::popBack() { + auto result = std::move(mList.back()); + mList.pop_back(); return result; } void RingBuffer::dump(int fd) const { - const std::scoped_lock<std::mutex> lock(mMutex); - dprintf(fd, "RingBuffer:\n"); - dprintf(fd, " mSizeLimit=%d\n", mSizeLimit); - dprintf(fd, " mList.size=%zu\n", mList.size()); - dprintf(fd, " mTotalDroppedDataCount=%" PRIu64 "\n", mTotalDroppedDataCount); + dprintf(fd, " RingBuffer:\n"); + dprintf(fd, " mSizeLimit=%d\n", mSizeLimit); + dprintf(fd, " mList.size=%zu\n", mList.size()); + dprintf(fd, " mTotalDroppedDataCount=%" PRIu64 "\n", mTotalDroppedDataCount); } int32_t RingBuffer::size() const { - const std::scoped_lock<std::mutex> lock(mMutex); return mList.size(); } diff --git a/cpp/telemetry/src/RingBuffer.h b/cpp/telemetry/src/RingBuffer.h index 07ce709543..3247528b2a 100644 --- a/cpp/telemetry/src/RingBuffer.h +++ b/cpp/telemetry/src/RingBuffer.h @@ -20,14 +20,13 @@ #include "BufferedCarData.h" #include <list> -#include <mutex> namespace android { namespace automotive { namespace telemetry { // A ring buffer that holds BufferedCarData. It drops old data if it's full. -// Thread-safe. +// Not thread-safe. class RingBuffer { public: // RingBuffer limits the number of elements in the buffer to the given param `sizeLimit`. @@ -44,8 +43,8 @@ public: // Supports moving the data to the RingBuffer. void push(BufferedCarData&& data); - // Returns the oldest element from the ring buffer and removes it from the buffer. - BufferedCarData popFront(); + // Returns the newest element from the ring buffer and removes it from the buffer. + BufferedCarData popBack(); // Dumps the current state for dumpsys. void dump(int fd) const; @@ -54,8 +53,6 @@ public: int32_t size() const; private: - mutable std::mutex mMutex; // a mutex for the whole instance - const int32_t mSizeLimit; // TODO(b/174608802): Improve dropped CarData handling, see ag/13818937 for details. diff --git a/cpp/telemetry/src/TelemetryServer.cpp b/cpp/telemetry/src/TelemetryServer.cpp index 54cd3c4c9e..0a124e5b05 100644 --- a/cpp/telemetry/src/TelemetryServer.cpp +++ b/cpp/telemetry/src/TelemetryServer.cpp @@ -19,68 +19,140 @@ #include "CarTelemetryImpl.h" #include "RingBuffer.h" -#include <android-base/chrono_utils.h> +#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h> #include <android-base/logging.h> -#include <android-base/properties.h> -#include <android/binder_interface_utils.h> -#include <android/binder_manager.h> -#include <android/binder_process.h> #include <inttypes.h> // for PRIu64 and friends #include <memory> -#include <thread> // NOLINT(build/c++11) namespace android { namespace automotive { namespace telemetry { -using ::android::automotive::telemetry::RingBuffer; - -constexpr const char kCarTelemetryServiceName[] = - "android.frameworks.automotive.telemetry.ICarTelemetry/default"; -constexpr const char kCarTelemetryInternalServiceName[] = - "android.automotive.telemetry.internal.ICarTelemetryInternal/default"; +namespace { + +using ::aidl::android::automotive::telemetry::internal::CarDataInternal; +using ::aidl::android::automotive::telemetry::internal::ICarDataListener; +using ::aidl::android::frameworks::automotive::telemetry::CarData; +using ::android::base::Error; +using ::android::base::Result; + +enum { + MSG_PUSH_CAR_DATA_TO_LISTENER = 1, +}; + +// If ICarDataListener cannot accept data, the next push should be delayed little bit to allow +// the listener to recover. +constexpr const std::chrono::seconds kPushCarDataFailureDelaySeconds = 1s; +} // namespace + +TelemetryServer::TelemetryServer(LooperWrapper* looper, + const std::chrono::nanoseconds& pushCarDataDelayNs, + const int maxBufferSize) : + mLooper(looper), + mPushCarDataDelayNs(pushCarDataDelayNs), + mRingBuffer(maxBufferSize), + mMessageHandler(new MessageHandlerImpl(this)) {} + +Result<void> TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) { + const std::scoped_lock<std::mutex> lock(mMutex); + if (mCarDataListener != nullptr) { + return Error(::EX_ILLEGAL_STATE) << "ICarDataListener is already set"; + } + mCarDataListener = listener; + mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler, + MSG_PUSH_CAR_DATA_TO_LISTENER); + return {}; +} -// TODO(b/183444070): make it configurable using sysprop -// CarData count limit in the RingBuffer. In worst case it will use kMaxBufferSize * 10Kb memory, -// which is ~ 1MB. -const int kMaxBufferSize = 100; +void TelemetryServer::clearListener() { + const std::scoped_lock<std::mutex> lock(mMutex); + if (mCarDataListener == nullptr) { + return; + } + mCarDataListener = nullptr; + mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER); +} -TelemetryServer::TelemetryServer() : mRingBuffer(kMaxBufferSize) {} +std::shared_ptr<ICarDataListener> TelemetryServer::getListener() { + const std::scoped_lock<std::mutex> lock(mMutex); + return mCarDataListener; +} -void TelemetryServer::registerServices() { - std::shared_ptr<CarTelemetryImpl> telemetry = - ndk::SharedRefBase::make<CarTelemetryImpl>(&mRingBuffer); - std::shared_ptr<CarTelemetryInternalImpl> telemetryInternal = - ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&mRingBuffer); +void TelemetryServer::dump(int fd) { + const std::scoped_lock<std::mutex> lock(mMutex); + dprintf(fd, " TelemetryServer:\n"); + mRingBuffer.dump(fd); +} - // Wait for the service manager before starting ICarTelemetry service. - while (android::base::GetProperty("init.svc.servicemanager", "") != "running") { - // Poll frequent enough so the writer clients can connect to the service during boot. - std::this_thread::sleep_for(250ms); +// TODO(b/174608802): Add 10kb size check for the `dataList`, see the AIDL for the limits +void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) { + const std::scoped_lock<std::mutex> lock(mMutex); + bool bufferWasEmptyBefore = mRingBuffer.size() == 0; + for (auto&& data : dataList) { + mRingBuffer.push({.mId = data.id, + .mContent = std::move(data.content), + .mPublisherUid = publisherUid}); + } + // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling + // too many unnecessary idendical messages in the looper. + if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) { + mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler, + MSG_PUSH_CAR_DATA_TO_LISTENER); } +} - LOG(VERBOSE) << "Registering " << kCarTelemetryServiceName; - binder_exception_t exception = - ::AServiceManager_addService(telemetry->asBinder().get(), kCarTelemetryServiceName); - if (exception != ::EX_NONE) { - LOG(FATAL) << "Unable to register " << kCarTelemetryServiceName - << ", exception=" << exception; +// Runs on the main thread. +void TelemetryServer::pushCarDataToListeners() { + std::shared_ptr<ICarDataListener> listener; + std::vector<CarDataInternal> pendingCarDataInternals; + { + const std::scoped_lock<std::mutex> lock(mMutex); + // Remove extra messages. + mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER); + if (mCarDataListener == nullptr || mRingBuffer.size() == 0) { + return; + } + listener = mCarDataListener; + // Push elements to pendingCarDataInternals in reverse order so we can send data + // from the back of the pendingCarDataInternals vector. + while (mRingBuffer.size() > 0) { + auto carData = std::move(mRingBuffer.popBack()); + CarDataInternal data; + data.id = carData.mId; + data.content = std::move(carData.mContent); + pendingCarDataInternals.push_back(data); + } } - LOG(VERBOSE) << "Registering " << kCarTelemetryInternalServiceName; - exception = ::AServiceManager_addService(telemetryInternal->asBinder().get(), - kCarTelemetryInternalServiceName); - if (exception != ::EX_NONE) { - LOG(FATAL) << "Unable to register " << kCarTelemetryInternalServiceName - << ", exception=" << exception; + // Now the mutex is unlocked, we can do the heavy work. + + // TODO(b/186477983): send data in batch to improve performance, but careful sending too + // many data at once, as it could clog the Binder - it has <1MB limit. + while (!pendingCarDataInternals.empty()) { + auto status = listener->onCarDataReceived({pendingCarDataInternals.back()}); + if (!status.isOk()) { + LOG(WARNING) << "Failed to push CarDataInternal, will try again: " + << status.getMessage(); + sleep(kPushCarDataFailureDelaySeconds.count()); + } else { + pendingCarDataInternals.pop_back(); + } } } -void TelemetryServer::startAndJoinThreadPool() { - ::ABinderProcess_startThreadPool(); // Starts the default 15 binder threads. - ::ABinderProcess_joinThreadPool(); +TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) : + mTelemetryServer(server) {} + +void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) { + switch (message.what) { + case MSG_PUSH_CAR_DATA_TO_LISTENER: + mTelemetryServer->pushCarDataToListeners(); + break; + default: + LOG(WARNING) << "Unknown message: " << message.what; + } } } // namespace telemetry diff --git a/cpp/telemetry/src/TelemetryServer.h b/cpp/telemetry/src/TelemetryServer.h index 0b400a11df..20c38396db 100644 --- a/cpp/telemetry/src/TelemetryServer.h +++ b/cpp/telemetry/src/TelemetryServer.h @@ -17,28 +17,90 @@ #ifndef CPP_TELEMETRY_SRC_TELEMETRYSERVER_H_ #define CPP_TELEMETRY_SRC_TELEMETRYSERVER_H_ -#include "CarTelemetryImpl.h" -#include "CarTelemetryInternalImpl.h" +#include "LooperWrapper.h" +#include "RingBuffer.h" -#include <utils/Errors.h> +#include <aidl/android/automotive/telemetry/internal/ICarDataListener.h> +#include <aidl/android/frameworks/automotive/telemetry/CarData.h> +#include <android-base/chrono_utils.h> +#include <android-base/result.h> +#include <android-base/thread_annotations.h> +#include <gtest/gtest_prod.h> +#include <utils/Looper.h> + +#include <memory> namespace android { namespace automotive { namespace telemetry { +// This class contains the main logic of cartelemetryd native service. +// +// [writer clients] -> ICarTelemetry -----------. +// [reader client] --> ICarTelemetryInternal -----`-> TelemetryServer +// +// TelemetryServer starts pushing CarData to ICarDataListener when there is a data available and +// the listener is set and alive. It uses `mLooper` for periodically pushing the data. +// +// This class is thread-safe. class TelemetryServer { public: - TelemetryServer(); + explicit TelemetryServer(LooperWrapper* looper, + const std::chrono::nanoseconds& pushCarDataDelayNs, int maxBufferSize); + + // Dumps the current state for dumpsys. + // Expected to be called from a binder thread pool. + void dump(int fd); + + // Writes incoming CarData to the RingBuffer. + // Expected to be called from a binder thread pool. + void writeCarData( + const std::vector<aidl::android::frameworks::automotive::telemetry::CarData>& dataList, + uid_t publisherUid); - // Registers all the implemented AIDL services. Waits until `servicemanager` is available. - // Aborts the process if fails. - void registerServices(); + // Sets the listener. If the listener already set, it returns an error. + // Expected to be called from a binder thread pool. + android::base::Result<void> setListener( + const std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener>& + listener); - // Blocks the thread. - void startAndJoinThreadPool(); + // Clears the listener and returns it. + // Expected to be called from a binder thread pool. + void clearListener(); + + // Expected to be called from a binder thread pool. + std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener> getListener(); private: - RingBuffer mRingBuffer; + class MessageHandlerImpl : public MessageHandler { + public: + explicit MessageHandlerImpl(TelemetryServer* server); + + void handleMessage(const Message& message) override; + + private: + TelemetryServer* mTelemetryServer; // not owned + }; + +private: + // Periodically called by mLooper if there is a "push car data" messages. + void pushCarDataToListeners(); + + LooperWrapper* mLooper; // not owned + const std::chrono::nanoseconds mPushCarDataDelayNs; + + // A single mutex for all the sensitive operations. Threads must not lock it for long time, + // as clients will be writing CarData to the ring buffer under this mutex. + std::mutex mMutex; + RingBuffer mRingBuffer GUARDED_BY(mMutex); + std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener> + mCarDataListener GUARDED_BY(mMutex); + android::sp<MessageHandlerImpl> mMessageHandler; // Handler for mLooper. + + // Friends are simplest way of testing if `pushCarDataToListeners()` can handle edge cases. + friend class TelemetryServerTest; + FRIEND_TEST(TelemetryServerTest, NoListenerButMultiplePushes); + FRIEND_TEST(TelemetryServerTest, NoDataButMultiplePushes); }; } // namespace telemetry diff --git a/cpp/telemetry/src/main.cpp b/cpp/telemetry/src/main.cpp index 1bd0dde710..615e0e105a 100644 --- a/cpp/telemetry/src/main.cpp +++ b/cpp/telemetry/src/main.cpp @@ -14,23 +14,85 @@ * limitations under the License. */ +#include "CarTelemetryImpl.h" +#include "CarTelemetryInternalImpl.h" +#include "LooperWrapper.h" #include "TelemetryServer.h" +#include <android-base/chrono_utils.h> #include <android-base/logging.h> +#include <android-base/properties.h> +#include <android/binder_interface_utils.h> +#include <android/binder_manager.h> +#include <android/binder_process.h> +#include <utils/Looper.h> +#include <thread> // NOLINT(build/c++11) + +using ::android::automotive::telemetry::CarTelemetryImpl; +using ::android::automotive::telemetry::CarTelemetryInternalImpl; +using ::android::automotive::telemetry::LooperWrapper; using ::android::automotive::telemetry::TelemetryServer; // TODO(b/174608802): handle SIGQUIT/SIGTERM +constexpr const char kCarTelemetryServiceName[] = + "android.frameworks.automotive.telemetry.ICarTelemetry/default"; +constexpr const char kCarTelemetryInternalServiceName[] = + "android.automotive.telemetry.internal.ICarTelemetryInternal/default"; + +// The min delay between each ICarDataListener.onCarDataReceived() calls. It's needed to avoid +// too frequently making binder transactions. +// Binder has <1MS latency for 10KB data. +// TODO(b/186477983): improve sending car data after implementing CarTelemetryService. +// TODO(b/183444070): make it configurable using sysprop +constexpr const std::chrono::nanoseconds kPushCarDataDelayNs = 10ms; + +// TODO(b/183444070): make it configurable using sysprop +// CarData count limit in the RingBuffer. In worst case it will use kMaxBufferSize * 10Kb memory, +// which is ~ 1MB. +const int kMaxBufferSize = 100; + int main(void) { LOG(INFO) << "Starting cartelemetryd"; - TelemetryServer server; + LooperWrapper looper(android::Looper::prepare(/* opts= */ 0)); + TelemetryServer server(&looper, kPushCarDataDelayNs, kMaxBufferSize); + std::shared_ptr<CarTelemetryImpl> telemetry = + ndk::SharedRefBase::make<CarTelemetryImpl>(&server); + std::shared_ptr<CarTelemetryInternalImpl> telemetryInternal = + ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&server); + + // Wait for the service manager before starting ICarTelemetry service. + while (android::base::GetProperty("init.svc.servicemanager", "") != "running") { + // Poll frequent enough so the writer clients can connect to the service during boot. + std::this_thread::sleep_for(250ms); + } + + LOG(VERBOSE) << "Registering " << kCarTelemetryServiceName; + binder_exception_t exception = + ::AServiceManager_addService(telemetry->asBinder().get(), kCarTelemetryServiceName); + if (exception != ::EX_NONE) { + LOG(FATAL) << "Unable to register " << kCarTelemetryServiceName + << ", exception=" << exception; + } + + LOG(VERBOSE) << "Registering " << kCarTelemetryInternalServiceName; + exception = ::AServiceManager_addService(telemetryInternal->asBinder().get(), + kCarTelemetryInternalServiceName); + if (exception != ::EX_NONE) { + LOG(FATAL) << "Unable to register " << kCarTelemetryInternalServiceName + << ", exception=" << exception; + } - // Register AIDL services. Aborts the server if fails. - server.registerServices(); + LOG(VERBOSE) << "Services are registered, starting thread pool"; + ::ABinderProcess_startThreadPool(); // Starts the default 15 binder threads. - LOG(VERBOSE) << "Service is created, joining the threadpool"; - server.startAndJoinThreadPool(); + LOG(VERBOSE) << "Running the server."; + // Loop forever -- pushing data to ICarDataListener runs on this thread, and the binder calls + // remain responsive in their pool of one thread. + while (true) { + looper.pollAll(/* timeoutMillis= */ -1); + } return 1; // never reaches } diff --git a/cpp/telemetry/tests/CarTelemetryImplTest.cpp b/cpp/telemetry/tests/CarTelemetryImplTest.cpp deleted file mode 100644 index 028647754c..0000000000 --- a/cpp/telemetry/tests/CarTelemetryImplTest.cpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2021 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "CarTelemetryImpl.h" -#include "RingBuffer.h" - -#include <aidl/android/frameworks/automotive/telemetry/CarData.h> -#include <aidl/android/frameworks/automotive/telemetry/ICarTelemetry.h> -#include <gmock/gmock.h> -#include <gtest/gtest.h> - -#include <unistd.h> - -#include <memory> - -namespace android { -namespace automotive { -namespace telemetry { - -using ::aidl::android::frameworks::automotive::telemetry::CarData; -using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetry; -using ::testing::ContainerEq; - -const size_t kMaxBufferSize = 5; - -CarData buildCarData(int id, const std::vector<uint8_t>& content) { - CarData msg; - msg.id = id; - msg.content = content; - return msg; -} - -BufferedCarData buildBufferedCarData(const CarData& data, uid_t publisherUid) { - return {.mId = data.id, .mContent = std::move(data.content), .mPublisherUid = publisherUid}; -} - -class CarTelemetryImplTest : public ::testing::Test { -protected: - CarTelemetryImplTest() : - mBuffer(RingBuffer(kMaxBufferSize)), - mTelemetry(ndk::SharedRefBase::make<CarTelemetryImpl>(&mBuffer)) {} - - RingBuffer mBuffer; - std::shared_ptr<ICarTelemetry> mTelemetry; -}; - -TEST_F(CarTelemetryImplTest, WriteReturnsOkStatus) { - CarData msg = buildCarData(101, {1, 0, 1, 0}); - - auto status = mTelemetry->write({msg}); - - EXPECT_TRUE(status.isOk()) << status.getMessage(); -} - -TEST_F(CarTelemetryImplTest, WriteAddsCarDataToRingBuffer) { - CarData msg = buildCarData(101, {1, 0, 1, 0}); - - mTelemetry->write({msg}); - - EXPECT_EQ(mBuffer.popFront(), buildBufferedCarData(msg, getuid())); -} - -TEST_F(CarTelemetryImplTest, WriteBuffersOnlyLimitedAmount) { - RingBuffer buffer(/* sizeLimit= */ 3); - auto telemetry = ndk::SharedRefBase::make<CarTelemetryImpl>(&buffer); - - CarData msg101_2 = buildCarData(101, {1, 0}); - CarData msg101_4 = buildCarData(101, {1, 0, 1, 0}); - CarData msg201_3 = buildCarData(201, {3, 3, 3}); - - // Inserting 5 elements - telemetry->write({msg101_2, msg101_4, msg101_4, msg201_3}); - telemetry->write({msg201_3}); - - EXPECT_EQ(buffer.size(), 3); - std::vector<BufferedCarData> result = {buffer.popFront(), buffer.popFront(), buffer.popFront()}; - std::vector<BufferedCarData> expected = {buildBufferedCarData(msg101_4, getuid()), - buildBufferedCarData(msg201_3, getuid()), - buildBufferedCarData(msg201_3, getuid())}; - EXPECT_THAT(result, ContainerEq(expected)); - EXPECT_EQ(buffer.size(), 0); -} - -} // namespace telemetry -} // namespace automotive -} // namespace android diff --git a/cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp b/cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp deleted file mode 100644 index 22838cd2a5..0000000000 --- a/cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2021 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "CarTelemetryInternalImpl.h" -#include "RingBuffer.h" - -#include <aidl/android/automotive/telemetry/internal/BnCarDataListener.h> -#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h> -#include <aidl/android/automotive/telemetry/internal/ICarTelemetryInternal.h> -#include <gmock/gmock.h> -#include <gtest/gtest.h> - -#include <unistd.h> - -#include <memory> - -namespace android { -namespace automotive { -namespace telemetry { - -using ::aidl::android::automotive::telemetry::internal::BnCarDataListener; -using ::aidl::android::automotive::telemetry::internal::CarDataInternal; -using ::aidl::android::automotive::telemetry::internal::ICarTelemetryInternal; -using ::ndk::ScopedAStatus; - -const size_t kMaxBufferSize = 5; - -class MockCarDataListener : public BnCarDataListener { -public: - MOCK_METHOD(ScopedAStatus, onCarDataReceived, (const std::vector<CarDataInternal>& dataList), - (override)); -}; - -// The main test class. -class CarTelemetryInternalImplTest : public ::testing::Test { -protected: - CarTelemetryInternalImplTest() : - mBuffer(RingBuffer(kMaxBufferSize)), - mTelemetryInternal(ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&mBuffer)), - mMockCarDataListener(ndk::SharedRefBase::make<MockCarDataListener>()) {} - - RingBuffer mBuffer; - std::shared_ptr<ICarTelemetryInternal> mTelemetryInternal; - std::shared_ptr<MockCarDataListener> mMockCarDataListener; -}; - -TEST_F(CarTelemetryInternalImplTest, SetListenerReturnsOk) { - auto status = mTelemetryInternal->setListener(mMockCarDataListener); - - EXPECT_TRUE(status.isOk()) << status.getMessage(); -} - -TEST_F(CarTelemetryInternalImplTest, SetListenerFailsWhenAlreadySubscribed) { - mTelemetryInternal->setListener(mMockCarDataListener); - - auto status = mTelemetryInternal->setListener(ndk::SharedRefBase::make<MockCarDataListener>()); - - EXPECT_EQ(status.getExceptionCode(), ::EX_ILLEGAL_STATE) << status.getMessage(); -} - -TEST_F(CarTelemetryInternalImplTest, ClearListenerWorks) { - mTelemetryInternal->setListener(mMockCarDataListener); - - mTelemetryInternal->clearListener(); - auto status = mTelemetryInternal->setListener(mMockCarDataListener); - - EXPECT_TRUE(status.isOk()) << status.getMessage(); -} - -} // namespace telemetry -} // namespace automotive -} // namespace android diff --git a/cpp/telemetry/tests/FakeLooperWrapper.h b/cpp/telemetry/tests/FakeLooperWrapper.h new file mode 100644 index 0000000000..1914ee5624 --- /dev/null +++ b/cpp/telemetry/tests/FakeLooperWrapper.h @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021, The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CPP_TELEMETRY_SRC_TESTS_FAKELOOPERWRAPPER_H_ +#define CPP_TELEMETRY_SRC_TESTS_FAKELOOPERWRAPPER_H_ + +#include "LooperWrapper.h" + +#include <algorithm> +#include <deque> +#include <map> + +namespace android { +namespace automotive { +namespace telemetry { + +// Fake `utils/Looper.h` implementation. Explicitly use `FakeLooperWrapper::poll()` method +// to process the messages. +// +// Not thread-safe. +class FakeLooperWrapper : public LooperWrapper { +public: + static inline constexpr int kNoScheduledMessage = -1; + + FakeLooperWrapper() : LooperWrapper(nullptr){}; + + int pollAll(int timeoutMillis) override { return 0; } + + void sendMessageDelayed(nsecs_t uptime, const android::sp<MessageHandler>& handler, + const Message& message) override { + mUptimeEntries[::systemTime() + uptime].push_back( + {.mHandler = handler, .mMessage = message}); + } + + void removeMessages(const android::sp<MessageHandler>& handler, int what) override { + for (auto it = mUptimeEntries.begin(); it != mUptimeEntries.end();) { + auto [entryUptime, entries] = *it; + for (auto eit = entries.begin(); eit != entries.end();) { + if (eit->mMessage.what == what) { + eit = entries.erase(eit); + } else { + ++eit; + } + } + if (entries.empty()) { + it = mUptimeEntries.erase(it); + } else { + ++it; + } + } + } + + // Processes the next message. + void poll() { + auto it = mUptimeEntries.begin(); + if (it == mUptimeEntries.end() || it->second.empty()) { + return; + } + auto entry = std::move(it->second.front()); + it->second.pop_front(); + if (it->second.empty()) { + // if entries is empty, erase the uptime from the map. + mUptimeEntries.erase(it); + } + entry.mHandler->handleMessage(entry.mMessage); + } + + // Returns the next scheduled message uptime. kNoScheduledMessage if there is no message. + nsecs_t getNextMessageUptime() { + auto it = mUptimeEntries.begin(); + return it == mUptimeEntries.end() ? kNoScheduledMessage : it->first; + } + +private: + struct LooperEntry { + sp<MessageHandler> mHandler; + Message mMessage; + }; + +private: + using Entries = std::deque<LooperEntry>; + + // <uptimeNanos, entries> - where uptimeNanos is time since boot. + std::map<nsecs_t, Entries> mUptimeEntries; +}; + +} // namespace telemetry +} // namespace automotive +} // namespace android + +#endif // CPP_TELEMETRY_SRC_TESTS_FAKELOOPERWRAPPER_H_ diff --git a/cpp/telemetry/tests/RingBufferTest.cpp b/cpp/telemetry/tests/RingBufferTest.cpp deleted file mode 100644 index c5b5bc7aa4..0000000000 --- a/cpp/telemetry/tests/RingBufferTest.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2021 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "RingBuffer.h" - -#include <gmock/gmock.h> -#include <gtest/gtest.h> - -#include <memory> - -// NOTE: many of RingBuffer's behaviors are tested as part of CarTelemetryImpl. - -namespace android { -namespace automotive { -namespace telemetry { - -using testing::ContainerEq; - -BufferedCarData buildBufferedCarData(int32_t id, const std::vector<uint8_t>& content) { - return {.mId = id, .mContent = content, .mPublisherUid = 0}; -} - -TEST(RingBufferTest, PopFrontReturnsCorrectResults) { - RingBuffer buffer(/* sizeLimit= */ 10); - buffer.push(buildBufferedCarData(101, {7})); - buffer.push(buildBufferedCarData(102, {7})); - - BufferedCarData result = buffer.popFront(); - - EXPECT_EQ(result, buildBufferedCarData(101, {7})); -} - -TEST(RingBufferTest, PopFrontRemovesFromBuffer) { - RingBuffer buffer(/* sizeLimit= */ 10); - buffer.push(buildBufferedCarData(101, {7})); - buffer.push(buildBufferedCarData(102, {7, 8})); - - buffer.popFront(); - - EXPECT_EQ(buffer.size(), 1); // only ID=102 left -} - -} // namespace telemetry -} // namespace automotive -} // namespace android diff --git a/cpp/telemetry/tests/TelemetryServerTest.cpp b/cpp/telemetry/tests/TelemetryServerTest.cpp new file mode 100644 index 0000000000..8a6fab5b13 --- /dev/null +++ b/cpp/telemetry/tests/TelemetryServerTest.cpp @@ -0,0 +1,302 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CarTelemetryImpl.h" +#include "CarTelemetryInternalImpl.h" +#include "FakeLooperWrapper.h" +#include "LooperWrapper.h" +#include "RingBuffer.h" +#include "TelemetryServer.h" + +#include <aidl/android/automotive/telemetry/internal/BnCarDataListener.h> +#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h> +#include <aidl/android/automotive/telemetry/internal/ICarTelemetryInternal.h> +#include <aidl/android/frameworks/automotive/telemetry/CarData.h> +#include <aidl/android/frameworks/automotive/telemetry/ICarTelemetry.h> +#include <android-base/chrono_utils.h> +#include <android-base/logging.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> +#include <utils/Timers.h> // for ::systemTime() + +#include <unistd.h> + +#include <memory> + +namespace android { +namespace automotive { +namespace telemetry { + +using ::aidl::android::automotive::telemetry::internal::BnCarDataListener; +using ::aidl::android::automotive::telemetry::internal::CarDataInternal; +using ::aidl::android::automotive::telemetry::internal::ICarTelemetryInternal; +using ::aidl::android::frameworks::automotive::telemetry::CarData; +using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetry; +using ::ndk::ScopedAStatus; +using ::testing::_; +using ::testing::ByMove; +using ::testing::Return; + +constexpr const std::chrono::nanoseconds kPushCarDataDelayNs = 1000ms; +constexpr const std::chrono::nanoseconds kAllowedErrorNs = 100ms; +const int kMaxBufferSize = 3; + +// Because `ScopedAStatus` is move-only, `EXPECT_CALL().WillRepeatedly()` will not work. +inline testing::internal::ReturnAction<testing::internal::ByMoveWrapper<ScopedAStatus>> ReturnOk() { + return testing::Return(ByMove(ScopedAStatus::ok())); +} + +// Builds incoming CarData from writer clients. +CarData buildCarData(int id, const std::vector<uint8_t>& content) { + CarData msg; + msg.id = id; + msg.content = content; + return msg; +} + +// Builds outgoing CarDataInternal to the CarTelemetryService. +CarDataInternal buildCarDataInternal(int id, const std::vector<uint8_t>& content) { + CarDataInternal msg; + msg.id = id; + msg.content = content; + return msg; +} + +// Mock listener, behaves as CarTelemetryService. +class MockCarDataListener : public BnCarDataListener { +public: + MOCK_METHOD(ScopedAStatus, onCarDataReceived, (const std::vector<CarDataInternal>& dataList), + (override)); +}; + +// The main test class. Tests using `ICarTelemetry` and `ICarTelemetryInternal` interfaces. +// Pushing data to the listener is done in the looper - always call `mFakeLooper.poll()`. +class TelemetryServerTest : public ::testing::Test { +protected: + TelemetryServerTest() : + mTelemetryServer(&mFakeLooper, kPushCarDataDelayNs, kMaxBufferSize), + mMockCarDataListener(ndk::SharedRefBase::make<MockCarDataListener>()), + mTelemetry(ndk::SharedRefBase::make<CarTelemetryImpl>(&mTelemetryServer)), + mTelemetryInternal( + ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&mTelemetryServer)) {} + + // Creates an expectation. This is a nice helper that accepts a std::vector, original + // EXPECT_CALL() requires creating std::vector variable. + testing::internal::TypedExpectation<ScopedAStatus(const std::vector<CarDataInternal>&)>& + expectMockListenerToReceive(const std::vector<CarDataInternal>& expected) { + return EXPECT_CALL(*mMockCarDataListener, onCarDataReceived(expected)); + } + + FakeLooperWrapper mFakeLooper; + TelemetryServer mTelemetryServer; + std::shared_ptr<MockCarDataListener> mMockCarDataListener; + std::shared_ptr<ICarTelemetry> mTelemetry; + std::shared_ptr<ICarTelemetryInternal> mTelemetryInternal; +}; + +TEST_F(TelemetryServerTest, WriteReturnsOk) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + + auto status = mTelemetry->write(dataList); + + EXPECT_TRUE(status.isOk()) << status.getMessage(); +} + +TEST_F(TelemetryServerTest, SetListenerReturnsOk) { + auto status = mTelemetryInternal->setListener(mMockCarDataListener); + + EXPECT_TRUE(status.isOk()) << status.getMessage(); +} + +TEST_F(TelemetryServerTest, SetListenerFailsWhenAlreadySubscribed) { + mTelemetryInternal->setListener(mMockCarDataListener); + + auto status = mTelemetryInternal->setListener(ndk::SharedRefBase::make<MockCarDataListener>()); + + EXPECT_EQ(status.getExceptionCode(), ::EX_ILLEGAL_STATE) << status.getMessage(); +} + +TEST_F(TelemetryServerTest, ClearListenerWorks) { + mTelemetryInternal->setListener(mMockCarDataListener); + + mTelemetryInternal->clearListener(); + + auto status = mTelemetryInternal->setListener(mMockCarDataListener); + EXPECT_TRUE(status.isOk()) << status.getMessage(); +} + +TEST_F(TelemetryServerTest, ClearListenerRemovesPushMessagesFromLooper) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetry->write(dataList); + mTelemetryInternal->setListener(mMockCarDataListener); + EXPECT_NE(mFakeLooper.getNextMessageUptime(), FakeLooperWrapper::kNoScheduledMessage); + + mTelemetryInternal->clearListener(); + + EXPECT_EQ(mFakeLooper.getNextMessageUptime(), FakeLooperWrapper::kNoScheduledMessage); +} + +TEST_F(TelemetryServerTest, WriteSchedulesNextMessageAfterRightDelay) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetryInternal->setListener(mMockCarDataListener); + + mTelemetry->write(dataList); + + EXPECT_NEAR(mFakeLooper.getNextMessageUptime(), ::systemTime() + kPushCarDataDelayNs.count(), + kAllowedErrorNs.count()); +} + +TEST_F(TelemetryServerTest, SetListenerSchedulesNextMessageAfterRightDelay) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetry->write(dataList); + + mTelemetryInternal->setListener(mMockCarDataListener); + + EXPECT_NEAR(mFakeLooper.getNextMessageUptime(), ::systemTime() + kPushCarDataDelayNs.count(), + kAllowedErrorNs.count()); +} + +TEST_F(TelemetryServerTest, BuffersOnlyLimitedData) { + mTelemetryInternal->setListener(mMockCarDataListener); + std::vector<CarData> dataList1 = {buildCarData(10, {1, 2}), buildCarData(11, {2, 3})}; + std::vector<CarData> dataList2 = {buildCarData(101, {1, 2}), buildCarData(102, {2, 3}), + buildCarData(103, {3, 4}), buildCarData(104, {4, 5})}; + + mTelemetry->write(dataList1); + mTelemetry->write(dataList2); + + // Only the last 3 CarData should be received, because kMaxBufferSize = 3. + expectMockListenerToReceive({buildCarDataInternal(102, {2, 3})}).WillOnce(ReturnOk()); + expectMockListenerToReceive({buildCarDataInternal(103, {3, 4})}).WillOnce(ReturnOk()); + expectMockListenerToReceive({buildCarDataInternal(104, {4, 5})}).WillOnce(ReturnOk()); + + mFakeLooper.poll(); + mFakeLooper.poll(); + mFakeLooper.poll(); + mFakeLooper.poll(); +} + +// First sets the listener, then writes CarData. +TEST_F(TelemetryServerTest, WhenListenerIsAlreadyItPushesData) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + + mTelemetryInternal->setListener(mMockCarDataListener); + mTelemetry->write(dataList); + + expectMockListenerToReceive({buildCarDataInternal(101, {1})}).Times(1).WillOnce(ReturnOk()); + + mFakeLooper.poll(); +} + +// First writes CarData, only then sets the listener. +TEST_F(TelemetryServerTest, WhenListenerIsSetLaterItPushesData) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + + mTelemetry->write(dataList); + mTelemetryInternal->setListener(mMockCarDataListener); + + expectMockListenerToReceive({buildCarDataInternal(101, {1})}).Times(1).WillOnce(ReturnOk()); + + mFakeLooper.poll(); +} + +TEST_F(TelemetryServerTest, WriteDuringPushingDataToListener) { + std::vector<CarData> dataList = {buildCarData(101, {1}), buildCarData(102, {1})}; + std::vector<CarData> dataList2 = {buildCarData(103, {1})}; + mTelemetryInternal->setListener(mMockCarDataListener); + mTelemetry->write(dataList); + + expectMockListenerToReceive({buildCarDataInternal(101, {1})}).WillOnce(ReturnOk()); + expectMockListenerToReceive({buildCarDataInternal(102, {1})}).WillOnce(ReturnOk()); + expectMockListenerToReceive({buildCarDataInternal(103, {1})}).WillOnce(ReturnOk()); + + mFakeLooper.poll(); // sends only 1 CarData (or possibly 2 depenending on impl) + mTelemetry->write(dataList2); + mFakeLooper.poll(); // all the polls below send the rest of the CarData + mFakeLooper.poll(); + mFakeLooper.poll(); // extra poll to verify there was not excess push calls +} + +TEST_F(TelemetryServerTest, ClearListenerDuringPushingDataToListener) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetryInternal->setListener(mMockCarDataListener); + mTelemetry->write(dataList); + + expectMockListenerToReceive({buildCarDataInternal(101, {1})}).Times(1).WillOnce(ReturnOk()); + + mFakeLooper.poll(); + mTelemetry->write(dataList); + mTelemetryInternal->clearListener(); + mFakeLooper.poll(); +} + +TEST_F(TelemetryServerTest, RetriesPushAgainIfListenerFails) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetryInternal->setListener(mMockCarDataListener); + mTelemetry->write(dataList); + + expectMockListenerToReceive({buildCarDataInternal(101, {1})}) + .WillOnce(Return(ByMove(ScopedAStatus::fromExceptionCode(::EX_TRANSACTION_FAILED)))) + .WillOnce(ReturnOk()); + + mFakeLooper.poll(); // listener returns ::EX_TRANSACTION_FAILED + mFakeLooper.poll(); +} + +// Tests a corner case to make sure `TelemetryServer::mPendingCarDataInternals` variable +// is handled properly when transaction fails and clearListener() is called. +TEST_F(TelemetryServerTest, ClearListenerDuringPushingDataAndSetListenerAgain) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetryInternal->setListener(mMockCarDataListener); + mTelemetry->write(dataList); + + expectMockListenerToReceive({buildCarDataInternal(101, {1})}) + .WillOnce(Return(ByMove(ScopedAStatus::fromExceptionCode(::EX_TRANSACTION_FAILED)))) + .WillOnce(ReturnOk()); + + mFakeLooper.poll(); // listener returns ::EX_TRANSACTION_FAILED + mTelemetryInternal->clearListener(); + mFakeLooper.poll(); // nothing happens + mTelemetryInternal->setListener(mMockCarDataListener); + mFakeLooper.poll(); // should work +} + +// Directly calls pushCarDataToListeners() to make sure it can handle edge-cases. +TEST_F(TelemetryServerTest, NoListenerButMultiplePushes) { + std::vector<CarData> dataList = {buildCarData(101, {1})}; + mTelemetry->write(dataList); + + mTelemetryServer.pushCarDataToListeners(); + mTelemetryServer.pushCarDataToListeners(); + mTelemetryServer.pushCarDataToListeners(); + + EXPECT_CALL(*mMockCarDataListener, onCarDataReceived(_)).Times(0); +} + +// Directly calls pushCarDataToListeners() to make sure it can handle edge-cases. +TEST_F(TelemetryServerTest, NoDataButMultiplePushes) { + mTelemetryInternal->setListener(mMockCarDataListener); + + mTelemetryServer.pushCarDataToListeners(); + mTelemetryServer.pushCarDataToListeners(); + mTelemetryServer.pushCarDataToListeners(); + + EXPECT_CALL(*mMockCarDataListener, onCarDataReceived(_)).Times(0); +} + +} // namespace telemetry +} // namespace automotive +} // namespace android |