aboutsummaryrefslogtreecommitdiff
path: root/cpp/telemetry
diff options
context:
space:
mode:
authorZhomart Mukhamejanov <zhomart@google.com>2021-04-19 14:22:59 -0700
committerZhomart Mukhamejanov <zhomart@google.com>2021-05-06 20:23:20 +0000
commit4efd36dc596496b3076d808dcd53fd0e847e725f (patch)
treee8176f8444e51e2e50651855ed4332e3d2d4cea5 /cpp/telemetry
parent00ea235ad10a9ee88d7ec06c6b25a37b64950f76 (diff)
downloadCar-4efd36dc596496b3076d808dcd53fd0e847e725f.tar.gz
Add pushing CarData to the ICarDataListener
- Move business logic to TelemetryServer class. - TelemetryServer has a single mutex for all the operations. Operations are cheap, don't block for long time. - cartelemetryd sends CarDataInternal one-by-one, it drops if ICarDataListener cannot receive data. No need to retry if the listener cannot receive data. Test: atest cartelemetryd_impl_test Test: run in emulator and use sampleclient to verify Bug: 174608802 Change-Id: Iccc7275a74b2ed2f221b3e8d28bd329d010b0765 Merged-In: Iccc7275a74b2ed2f221b3e8d28bd329d010b0765 (cherry picked from commit 383708f7cd28836ecee938b36ba780e8b69a1691)
Diffstat (limited to 'cpp/telemetry')
-rw-r--r--cpp/telemetry/Android.bp8
-rw-r--r--cpp/telemetry/README.md15
-rw-r--r--cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl8
-rw-r--r--cpp/telemetry/sampleclient/README.md4
-rw-r--r--cpp/telemetry/src/CarTelemetryImpl.cpp9
-rw-r--r--cpp/telemetry/src/CarTelemetryImpl.h8
-rw-r--r--cpp/telemetry/src/CarTelemetryInternalImpl.cpp55
-rw-r--r--cpp/telemetry/src/CarTelemetryInternalImpl.h12
-rw-r--r--cpp/telemetry/src/LooperWrapper.cpp40
-rw-r--r--cpp/telemetry/src/LooperWrapper.h47
-rw-r--r--cpp/telemetry/src/RingBuffer.cpp18
-rw-r--r--cpp/telemetry/src/RingBuffer.h9
-rw-r--r--cpp/telemetry/src/TelemetryServer.cpp154
-rw-r--r--cpp/telemetry/src/TelemetryServer.h82
-rw-r--r--cpp/telemetry/src/main.cpp72
-rw-r--r--cpp/telemetry/tests/CarTelemetryImplTest.cpp99
-rw-r--r--cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp85
-rw-r--r--cpp/telemetry/tests/FakeLooperWrapper.h104
-rw-r--r--cpp/telemetry/tests/RingBufferTest.cpp58
-rw-r--r--cpp/telemetry/tests/TelemetryServerTest.cpp302
20 files changed, 818 insertions, 371 deletions
diff --git a/cpp/telemetry/Android.bp b/cpp/telemetry/Android.bp
index 4e6c1c01a9..326956e076 100644
--- a/cpp/telemetry/Android.bp
+++ b/cpp/telemetry/Android.bp
@@ -31,6 +31,9 @@ cc_defaults {
"liblog",
"libutils",
],
+ header_libs: [
+ "libgtest_prod_headers", // for FRIEND_TEST
+ ],
product_variables: {
debuggable: {
cflags: [
@@ -48,6 +51,7 @@ cc_library {
srcs: [
"src/CarTelemetryImpl.cpp",
"src/CarTelemetryInternalImpl.cpp",
+ "src/LooperWrapper.cpp",
"src/RingBuffer.cpp",
"src/TelemetryServer.cpp",
],
@@ -64,9 +68,7 @@ cc_test {
],
test_suites: ["general-tests"],
srcs: [
- "tests/CarTelemetryImplTest.cpp",
- "tests/CarTelemetryInternalImplTest.cpp",
- "tests/RingBufferTest.cpp",
+ "tests/TelemetryServerTest.cpp",
],
// Statically link only in tests, for portability reason.
static_libs: [
diff --git a/cpp/telemetry/README.md b/cpp/telemetry/README.md
index a13116dd1c..633ea62bf1 100644
--- a/cpp/telemetry/README.md
+++ b/cpp/telemetry/README.md
@@ -4,15 +4,22 @@ A structured log collection service for CarTelemetryService. See ARCHITECTURE.md
## Useful Commands
-**Dump service information**
+**Dumping the service information**
`adb shell dumpsys android.automotive.telemetry.internal.ICarTelemetryInternal/default`
-**Starting emulator**
+**Enabling VERBOSE logs**
-`aae emulator run -selinux permissive -writable-system`
+```
+adb shell setprop log.tag.android.automotive.telemetryd@1.0 V
+adb shell setprop log.tag.cartelemetryd_impl_test V
+```
-**Running tests**
+**Starting emulator with cold boot**
+
+`emulator -verbose -show-kernel -selinux permissive -writable-system -no-snapshot -wipe-data`
+
+**Running the tests**
`atest cartelemetryd_impl_test:CarTelemetryInternalImplTest#TestSetListenerReturnsOk`
diff --git a/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl b/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl
index 48ab6f9d03..ccdf63aace 100644
--- a/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl
+++ b/cpp/telemetry/aidl/android/automotive/telemetry/internal/ICarDataListener.aidl
@@ -21,15 +21,17 @@ import android.automotive.telemetry.internal.CarDataInternal;
/**
* Listener for {@code ICarTelemetryInternal#registerListener}.
*/
-oneway interface ICarDataListener {
+interface ICarDataListener {
/**
- * Called by ICarTelemetry when the data are available to be consumed. ICarTelemetry removes
- * the delivered data when the callback succeeds.
+ * Called by native cartelemetryd when the data are available to be consumed. The service removes
+ * the delivered data from its buffer when the callback succeeds. If the callback fails, it
+ * immediately gives up and drops the data too and sends the next chunk of the available data.
*
* <p>If the collected data is too large, it will send only chunk of the data, and the callback
* will be fired again.
*
* @param dataList the pushed data.
+ * @throws IllegalStateException if data cannot be received.
*/
void onCarDataReceived(in CarDataInternal[] dataList);
}
diff --git a/cpp/telemetry/sampleclient/README.md b/cpp/telemetry/sampleclient/README.md
index ebbaf16b8f..5b97f00568 100644
--- a/cpp/telemetry/sampleclient/README.md
+++ b/cpp/telemetry/sampleclient/README.md
@@ -14,7 +14,9 @@ adb push $ANDROID_PRODUCT_OUT/vendor/bin/android.automotive.telemetryd-samplecli
adb shell /system/bin/android.automotive.telemetryd-sampleclient
-# Then check logcat and dumpsys to verify the results.
+# Then check logcat and dumpsys to verify the results. The following command enables VERBOSE logs.
+adb shell setprop log.tag.android.automotive.telemetryd@1.0 V
+adb logcat -v color -b all -T 1000
```
**2. Under vendor**
diff --git a/cpp/telemetry/src/CarTelemetryImpl.cpp b/cpp/telemetry/src/CarTelemetryImpl.cpp
index 34a4e60a49..cf6f61145c 100644
--- a/cpp/telemetry/src/CarTelemetryImpl.cpp
+++ b/cpp/telemetry/src/CarTelemetryImpl.cpp
@@ -31,16 +31,11 @@ namespace telemetry {
using ::aidl::android::frameworks::automotive::telemetry::CarData;
-CarTelemetryImpl::CarTelemetryImpl(RingBuffer* buffer) : mRingBuffer(buffer) {}
+CarTelemetryImpl::CarTelemetryImpl(TelemetryServer* server) : mTelemetryServer(server) {}
-// TODO(b/174608802): Add 10kb size check for the `dataList`, see the AIDL for the limits
ndk::ScopedAStatus CarTelemetryImpl::write(const std::vector<CarData>& dataList) {
uid_t publisherUid = ::AIBinder_getCallingUid();
- for (auto&& data : dataList) {
- mRingBuffer->push({.mId = data.id,
- .mContent = std::move(data.content),
- .mPublisherUid = publisherUid});
- }
+ mTelemetryServer->writeCarData(dataList, publisherUid);
return ndk::ScopedAStatus::ok();
}
diff --git a/cpp/telemetry/src/CarTelemetryImpl.h b/cpp/telemetry/src/CarTelemetryImpl.h
index a3bb6c1bcd..e10dd98036 100644
--- a/cpp/telemetry/src/CarTelemetryImpl.h
+++ b/cpp/telemetry/src/CarTelemetryImpl.h
@@ -17,7 +17,7 @@
#ifndef CPP_TELEMETRY_SRC_CARTELEMETRYIMPL_H_
#define CPP_TELEMETRY_SRC_CARTELEMETRYIMPL_H_
-#include "RingBuffer.h"
+#include "TelemetryServer.h"
#include <aidl/android/frameworks/automotive/telemetry/BnCarTelemetry.h>
#include <aidl/android/frameworks/automotive/telemetry/CarData.h>
@@ -33,15 +33,15 @@ namespace telemetry {
// Implementation of android.frameworks.automotive.telemetry.ICarTelemetry.
class CarTelemetryImpl : public aidl::android::frameworks::automotive::telemetry::BnCarTelemetry {
public:
- // Doesn't own `buffer`.
- explicit CarTelemetryImpl(RingBuffer* buffer);
+ // Doesn't own `server`.
+ explicit CarTelemetryImpl(TelemetryServer* server);
ndk::ScopedAStatus write(
const std::vector<aidl::android::frameworks::automotive::telemetry::CarData>& dataList)
override;
private:
- RingBuffer* mRingBuffer; // not owned
+ TelemetryServer* mTelemetryServer; // not owned
};
} // namespace telemetry
diff --git a/cpp/telemetry/src/CarTelemetryInternalImpl.cpp b/cpp/telemetry/src/CarTelemetryInternalImpl.cpp
index 7a3b141cd8..2abf4fcac4 100644
--- a/cpp/telemetry/src/CarTelemetryInternalImpl.cpp
+++ b/cpp/telemetry/src/CarTelemetryInternalImpl.cpp
@@ -32,68 +32,71 @@ using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
using ::android::base::StringPrintf;
-CarTelemetryInternalImpl::CarTelemetryInternalImpl(RingBuffer* buffer) :
- mRingBuffer(buffer),
+CarTelemetryInternalImpl::CarTelemetryInternalImpl(TelemetryServer* server) :
+ mTelemetryServer(server),
mBinderDeathRecipient(
::AIBinder_DeathRecipient_new(CarTelemetryInternalImpl::listenerBinderDied)) {}
ndk::ScopedAStatus CarTelemetryInternalImpl::setListener(
const std::shared_ptr<ICarDataListener>& listener) {
- const std::scoped_lock<std::mutex> lock(mMutex);
-
- if (mCarDataListener != nullptr) {
- return ndk::ScopedAStatus::fromExceptionCodeWithMessage(::EX_ILLEGAL_STATE,
- "ICarDataListener is already set.");
+ LOG(VERBOSE) << "Received a setListener call";
+ auto result = mTelemetryServer->setListener(listener);
+ if (!result.ok()) {
+ LOG(WARNING) << __func__ << ": " << result.error().message();
+ return ndk::ScopedAStatus::fromExceptionCodeWithMessage(result.error().code(),
+ result.error().message().c_str());
}
// If passed a local binder, AIBinder_linkToDeath will do nothing and return
// STATUS_INVALID_OPERATION. We ignore this case because we only use local binders in tests
// where this is not an error.
- if (listener->isRemote()) {
- auto status = ndk::ScopedAStatus::fromStatus(
- ::AIBinder_linkToDeath(listener->asBinder().get(), mBinderDeathRecipient.get(),
- this));
- if (!status.isOk()) {
- return ndk::ScopedAStatus::fromExceptionCodeWithMessage(::EX_ILLEGAL_STATE,
- status.getMessage());
- }
+ if (!listener->isRemote()) {
+ return ndk::ScopedAStatus::ok();
}
- mCarDataListener = listener;
+ auto status = ndk::ScopedAStatus::fromStatus(
+ ::AIBinder_linkToDeath(listener->asBinder().get(), mBinderDeathRecipient.get(), this));
+ if (!status.isOk()) {
+ LOG(WARNING) << __func__ << ": Failed to linkToDeath: " << status.getMessage();
+ mTelemetryServer->clearListener();
+ return ndk::ScopedAStatus::fromExceptionCodeWithMessage(::EX_ILLEGAL_STATE,
+ status.getMessage());
+ }
return ndk::ScopedAStatus::ok();
}
ndk::ScopedAStatus CarTelemetryInternalImpl::clearListener() {
- const std::scoped_lock<std::mutex> lock(mMutex);
- if (mCarDataListener == nullptr) {
- LOG(INFO) << __func__ << ": No ICarDataListener, ignoring the call";
+ auto listener = mTelemetryServer->getListener();
+ mTelemetryServer->clearListener();
+ if (listener == nullptr) {
return ndk::ScopedAStatus::ok();
}
auto status = ndk::ScopedAStatus::fromStatus(
- ::AIBinder_unlinkToDeath(mCarDataListener->asBinder().get(),
- mBinderDeathRecipient.get(), this));
+ ::AIBinder_unlinkToDeath(listener->asBinder().get(), mBinderDeathRecipient.get(),
+ this));
if (!status.isOk()) {
LOG(WARNING) << __func__
<< ": unlinkToDeath failed, continuing anyway: " << status.getMessage();
}
- mCarDataListener = nullptr;
return ndk::ScopedAStatus::ok();
}
binder_status_t CarTelemetryInternalImpl::dump(int fd, const char** args, uint32_t numArgs) {
dprintf(fd, "ICarTelemetryInternal:\n");
- mRingBuffer->dump(fd);
+ mTelemetryServer->dump(fd);
return ::STATUS_OK;
}
// Removes the listener if its binder dies.
void CarTelemetryInternalImpl::listenerBinderDiedImpl() {
- LOG(WARNING) << "A ICarDataListener died, removing the listener.";
- const std::scoped_lock<std::mutex> lock(mMutex);
- mCarDataListener = nullptr;
+ LOG(WARNING) << "A ICarDataListener died, clearing the listener.";
+ mTelemetryServer->clearListener();
}
+// static
void CarTelemetryInternalImpl::listenerBinderDied(void* cookie) {
+ // We expect the pointer to be alive as there is only a single instance of
+ // CarTelemetryInternalImpl and if it dies, the whole process should die too.
auto thiz = static_cast<CarTelemetryInternalImpl*>(cookie);
thiz->listenerBinderDiedImpl();
}
diff --git a/cpp/telemetry/src/CarTelemetryInternalImpl.h b/cpp/telemetry/src/CarTelemetryInternalImpl.h
index 12ad5cd5eb..599d580cb1 100644
--- a/cpp/telemetry/src/CarTelemetryInternalImpl.h
+++ b/cpp/telemetry/src/CarTelemetryInternalImpl.h
@@ -17,7 +17,7 @@
#ifndef CPP_TELEMETRY_SRC_CARTELEMETRYINTERNALIMPL_H_
#define CPP_TELEMETRY_SRC_CARTELEMETRYINTERNALIMPL_H_
-#include "RingBuffer.h"
+#include "TelemetryServer.h"
#include <aidl/android/automotive/telemetry/internal/BnCarTelemetryInternal.h>
#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
@@ -35,8 +35,8 @@ namespace telemetry {
class CarTelemetryInternalImpl :
public aidl::android::automotive::telemetry::internal::BnCarTelemetryInternal {
public:
- // Doesn't own `buffer`.
- explicit CarTelemetryInternalImpl(RingBuffer* buffer);
+ // Doesn't own `server`.
+ explicit CarTelemetryInternalImpl(TelemetryServer* server);
ndk::ScopedAStatus setListener(
const std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener>&
@@ -53,12 +53,8 @@ private:
void listenerBinderDiedImpl();
- RingBuffer* mRingBuffer; // not owned
+ TelemetryServer* mTelemetryServer; // not owned
ndk::ScopedAIBinder_DeathRecipient mBinderDeathRecipient;
- std::mutex mMutex; // a mutex for the whole instance
-
- std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener>
- mCarDataListener GUARDED_BY(mMutex);
};
} // namespace telemetry
diff --git a/cpp/telemetry/src/LooperWrapper.cpp b/cpp/telemetry/src/LooperWrapper.cpp
new file mode 100644
index 0000000000..b14f967091
--- /dev/null
+++ b/cpp/telemetry/src/LooperWrapper.cpp
@@ -0,0 +1,40 @@
+/**
+ * Copyright (c) 2021, The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LooperWrapper.h"
+
+namespace android {
+namespace automotive {
+namespace telemetry {
+
+using ::android::sp;
+
+int LooperWrapper::pollAll(int timeoutMillis) {
+ return mLooper->pollAll(timeoutMillis);
+}
+
+void LooperWrapper::sendMessageDelayed(nsecs_t uptime, const sp<MessageHandler>& handler,
+ const Message& message) {
+ return mLooper->sendMessageDelayed(uptime, handler, message);
+}
+
+void LooperWrapper::removeMessages(const android::sp<MessageHandler>& handler, int what) {
+ return mLooper->removeMessages(handler, what);
+}
+
+} // namespace telemetry
+} // namespace automotive
+} // namespace android
diff --git a/cpp/telemetry/src/LooperWrapper.h b/cpp/telemetry/src/LooperWrapper.h
new file mode 100644
index 0000000000..171c57fc74
--- /dev/null
+++ b/cpp/telemetry/src/LooperWrapper.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021, The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CPP_TELEMETRY_SRC_LOOPERWRAPPER_H_
+#define CPP_TELEMETRY_SRC_LOOPERWRAPPER_H_
+
+#include <utils/Looper.h>
+
+namespace android {
+namespace automotive {
+namespace telemetry {
+
+// LooperWrapper is a wrapper around the actual looper implementation so tests can stub this wrapper
+// to deterministically poll the underlying looper.
+// Refer to utils/Looper.h for the class and methods descriptions.
+class LooperWrapper {
+public:
+ LooperWrapper(android::sp<Looper> looper) : mLooper(looper){};
+ virtual ~LooperWrapper(){};
+
+ virtual int pollAll(int timeoutMillis);
+ virtual void sendMessageDelayed(nsecs_t uptime, const android::sp<MessageHandler>& handler,
+ const Message& message);
+ virtual void removeMessages(const android::sp<MessageHandler>& handler, int what);
+
+private:
+ android::sp<Looper> mLooper;
+};
+
+} // namespace telemetry
+} // namespace automotive
+} // namespace android
+
+#endif // CPP_TELEMETRY_SRC_LOOPERWRAPPER_H_
diff --git a/cpp/telemetry/src/RingBuffer.cpp b/cpp/telemetry/src/RingBuffer.cpp
index 36de3f8a20..c1387f85cf 100644
--- a/cpp/telemetry/src/RingBuffer.cpp
+++ b/cpp/telemetry/src/RingBuffer.cpp
@@ -29,7 +29,6 @@ namespace telemetry {
RingBuffer::RingBuffer(int32_t limit) : mSizeLimit(limit) {}
void RingBuffer::push(BufferedCarData&& data) {
- const std::scoped_lock<std::mutex> lock(mMutex);
mList.push_back(std::move(data));
while (mList.size() > mSizeLimit) {
mList.pop_front();
@@ -37,23 +36,20 @@ void RingBuffer::push(BufferedCarData&& data) {
}
}
-BufferedCarData RingBuffer::popFront() {
- const std::scoped_lock<std::mutex> lock(mMutex);
- auto result = std::move(mList.front());
- mList.pop_front();
+BufferedCarData RingBuffer::popBack() {
+ auto result = std::move(mList.back());
+ mList.pop_back();
return result;
}
void RingBuffer::dump(int fd) const {
- const std::scoped_lock<std::mutex> lock(mMutex);
- dprintf(fd, "RingBuffer:\n");
- dprintf(fd, " mSizeLimit=%d\n", mSizeLimit);
- dprintf(fd, " mList.size=%zu\n", mList.size());
- dprintf(fd, " mTotalDroppedDataCount=%" PRIu64 "\n", mTotalDroppedDataCount);
+ dprintf(fd, " RingBuffer:\n");
+ dprintf(fd, " mSizeLimit=%d\n", mSizeLimit);
+ dprintf(fd, " mList.size=%zu\n", mList.size());
+ dprintf(fd, " mTotalDroppedDataCount=%" PRIu64 "\n", mTotalDroppedDataCount);
}
int32_t RingBuffer::size() const {
- const std::scoped_lock<std::mutex> lock(mMutex);
return mList.size();
}
diff --git a/cpp/telemetry/src/RingBuffer.h b/cpp/telemetry/src/RingBuffer.h
index 07ce709543..3247528b2a 100644
--- a/cpp/telemetry/src/RingBuffer.h
+++ b/cpp/telemetry/src/RingBuffer.h
@@ -20,14 +20,13 @@
#include "BufferedCarData.h"
#include <list>
-#include <mutex>
namespace android {
namespace automotive {
namespace telemetry {
// A ring buffer that holds BufferedCarData. It drops old data if it's full.
-// Thread-safe.
+// Not thread-safe.
class RingBuffer {
public:
// RingBuffer limits the number of elements in the buffer to the given param `sizeLimit`.
@@ -44,8 +43,8 @@ public:
// Supports moving the data to the RingBuffer.
void push(BufferedCarData&& data);
- // Returns the oldest element from the ring buffer and removes it from the buffer.
- BufferedCarData popFront();
+ // Returns the newest element from the ring buffer and removes it from the buffer.
+ BufferedCarData popBack();
// Dumps the current state for dumpsys.
void dump(int fd) const;
@@ -54,8 +53,6 @@ public:
int32_t size() const;
private:
- mutable std::mutex mMutex; // a mutex for the whole instance
-
const int32_t mSizeLimit;
// TODO(b/174608802): Improve dropped CarData handling, see ag/13818937 for details.
diff --git a/cpp/telemetry/src/TelemetryServer.cpp b/cpp/telemetry/src/TelemetryServer.cpp
index 54cd3c4c9e..0a124e5b05 100644
--- a/cpp/telemetry/src/TelemetryServer.cpp
+++ b/cpp/telemetry/src/TelemetryServer.cpp
@@ -19,68 +19,140 @@
#include "CarTelemetryImpl.h"
#include "RingBuffer.h"
-#include <android-base/chrono_utils.h>
+#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
#include <android-base/logging.h>
-#include <android-base/properties.h>
-#include <android/binder_interface_utils.h>
-#include <android/binder_manager.h>
-#include <android/binder_process.h>
#include <inttypes.h> // for PRIu64 and friends
#include <memory>
-#include <thread> // NOLINT(build/c++11)
namespace android {
namespace automotive {
namespace telemetry {
-using ::android::automotive::telemetry::RingBuffer;
-
-constexpr const char kCarTelemetryServiceName[] =
- "android.frameworks.automotive.telemetry.ICarTelemetry/default";
-constexpr const char kCarTelemetryInternalServiceName[] =
- "android.automotive.telemetry.internal.ICarTelemetryInternal/default";
+namespace {
+
+using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
+using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
+using ::aidl::android::frameworks::automotive::telemetry::CarData;
+using ::android::base::Error;
+using ::android::base::Result;
+
+enum {
+ MSG_PUSH_CAR_DATA_TO_LISTENER = 1,
+};
+
+// If ICarDataListener cannot accept data, the next push should be delayed little bit to allow
+// the listener to recover.
+constexpr const std::chrono::seconds kPushCarDataFailureDelaySeconds = 1s;
+} // namespace
+
+TelemetryServer::TelemetryServer(LooperWrapper* looper,
+ const std::chrono::nanoseconds& pushCarDataDelayNs,
+ const int maxBufferSize) :
+ mLooper(looper),
+ mPushCarDataDelayNs(pushCarDataDelayNs),
+ mRingBuffer(maxBufferSize),
+ mMessageHandler(new MessageHandlerImpl(this)) {}
+
+Result<void> TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) {
+ const std::scoped_lock<std::mutex> lock(mMutex);
+ if (mCarDataListener != nullptr) {
+ return Error(::EX_ILLEGAL_STATE) << "ICarDataListener is already set";
+ }
+ mCarDataListener = listener;
+ mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
+ MSG_PUSH_CAR_DATA_TO_LISTENER);
+ return {};
+}
-// TODO(b/183444070): make it configurable using sysprop
-// CarData count limit in the RingBuffer. In worst case it will use kMaxBufferSize * 10Kb memory,
-// which is ~ 1MB.
-const int kMaxBufferSize = 100;
+void TelemetryServer::clearListener() {
+ const std::scoped_lock<std::mutex> lock(mMutex);
+ if (mCarDataListener == nullptr) {
+ return;
+ }
+ mCarDataListener = nullptr;
+ mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER);
+}
-TelemetryServer::TelemetryServer() : mRingBuffer(kMaxBufferSize) {}
+std::shared_ptr<ICarDataListener> TelemetryServer::getListener() {
+ const std::scoped_lock<std::mutex> lock(mMutex);
+ return mCarDataListener;
+}
-void TelemetryServer::registerServices() {
- std::shared_ptr<CarTelemetryImpl> telemetry =
- ndk::SharedRefBase::make<CarTelemetryImpl>(&mRingBuffer);
- std::shared_ptr<CarTelemetryInternalImpl> telemetryInternal =
- ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&mRingBuffer);
+void TelemetryServer::dump(int fd) {
+ const std::scoped_lock<std::mutex> lock(mMutex);
+ dprintf(fd, " TelemetryServer:\n");
+ mRingBuffer.dump(fd);
+}
- // Wait for the service manager before starting ICarTelemetry service.
- while (android::base::GetProperty("init.svc.servicemanager", "") != "running") {
- // Poll frequent enough so the writer clients can connect to the service during boot.
- std::this_thread::sleep_for(250ms);
+// TODO(b/174608802): Add 10kb size check for the `dataList`, see the AIDL for the limits
+void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) {
+ const std::scoped_lock<std::mutex> lock(mMutex);
+ bool bufferWasEmptyBefore = mRingBuffer.size() == 0;
+ for (auto&& data : dataList) {
+ mRingBuffer.push({.mId = data.id,
+ .mContent = std::move(data.content),
+ .mPublisherUid = publisherUid});
+ }
+ // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling
+ // too many unnecessary idendical messages in the looper.
+ if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) {
+ mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
+ MSG_PUSH_CAR_DATA_TO_LISTENER);
}
+}
- LOG(VERBOSE) << "Registering " << kCarTelemetryServiceName;
- binder_exception_t exception =
- ::AServiceManager_addService(telemetry->asBinder().get(), kCarTelemetryServiceName);
- if (exception != ::EX_NONE) {
- LOG(FATAL) << "Unable to register " << kCarTelemetryServiceName
- << ", exception=" << exception;
+// Runs on the main thread.
+void TelemetryServer::pushCarDataToListeners() {
+ std::shared_ptr<ICarDataListener> listener;
+ std::vector<CarDataInternal> pendingCarDataInternals;
+ {
+ const std::scoped_lock<std::mutex> lock(mMutex);
+ // Remove extra messages.
+ mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER);
+ if (mCarDataListener == nullptr || mRingBuffer.size() == 0) {
+ return;
+ }
+ listener = mCarDataListener;
+ // Push elements to pendingCarDataInternals in reverse order so we can send data
+ // from the back of the pendingCarDataInternals vector.
+ while (mRingBuffer.size() > 0) {
+ auto carData = std::move(mRingBuffer.popBack());
+ CarDataInternal data;
+ data.id = carData.mId;
+ data.content = std::move(carData.mContent);
+ pendingCarDataInternals.push_back(data);
+ }
}
- LOG(VERBOSE) << "Registering " << kCarTelemetryInternalServiceName;
- exception = ::AServiceManager_addService(telemetryInternal->asBinder().get(),
- kCarTelemetryInternalServiceName);
- if (exception != ::EX_NONE) {
- LOG(FATAL) << "Unable to register " << kCarTelemetryInternalServiceName
- << ", exception=" << exception;
+ // Now the mutex is unlocked, we can do the heavy work.
+
+ // TODO(b/186477983): send data in batch to improve performance, but careful sending too
+ // many data at once, as it could clog the Binder - it has <1MB limit.
+ while (!pendingCarDataInternals.empty()) {
+ auto status = listener->onCarDataReceived({pendingCarDataInternals.back()});
+ if (!status.isOk()) {
+ LOG(WARNING) << "Failed to push CarDataInternal, will try again: "
+ << status.getMessage();
+ sleep(kPushCarDataFailureDelaySeconds.count());
+ } else {
+ pendingCarDataInternals.pop_back();
+ }
}
}
-void TelemetryServer::startAndJoinThreadPool() {
- ::ABinderProcess_startThreadPool(); // Starts the default 15 binder threads.
- ::ABinderProcess_joinThreadPool();
+TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) :
+ mTelemetryServer(server) {}
+
+void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) {
+ switch (message.what) {
+ case MSG_PUSH_CAR_DATA_TO_LISTENER:
+ mTelemetryServer->pushCarDataToListeners();
+ break;
+ default:
+ LOG(WARNING) << "Unknown message: " << message.what;
+ }
}
} // namespace telemetry
diff --git a/cpp/telemetry/src/TelemetryServer.h b/cpp/telemetry/src/TelemetryServer.h
index 0b400a11df..20c38396db 100644
--- a/cpp/telemetry/src/TelemetryServer.h
+++ b/cpp/telemetry/src/TelemetryServer.h
@@ -17,28 +17,90 @@
#ifndef CPP_TELEMETRY_SRC_TELEMETRYSERVER_H_
#define CPP_TELEMETRY_SRC_TELEMETRYSERVER_H_
-#include "CarTelemetryImpl.h"
-#include "CarTelemetryInternalImpl.h"
+#include "LooperWrapper.h"
+#include "RingBuffer.h"
-#include <utils/Errors.h>
+#include <aidl/android/automotive/telemetry/internal/ICarDataListener.h>
+#include <aidl/android/frameworks/automotive/telemetry/CarData.h>
+#include <android-base/chrono_utils.h>
+#include <android-base/result.h>
+#include <android-base/thread_annotations.h>
+#include <gtest/gtest_prod.h>
+#include <utils/Looper.h>
+
+#include <memory>
namespace android {
namespace automotive {
namespace telemetry {
+// This class contains the main logic of cartelemetryd native service.
+//
+// [writer clients] -> ICarTelemetry -----------.
+// [reader client] --> ICarTelemetryInternal -----`-> TelemetryServer
+//
+// TelemetryServer starts pushing CarData to ICarDataListener when there is a data available and
+// the listener is set and alive. It uses `mLooper` for periodically pushing the data.
+//
+// This class is thread-safe.
class TelemetryServer {
public:
- TelemetryServer();
+ explicit TelemetryServer(LooperWrapper* looper,
+ const std::chrono::nanoseconds& pushCarDataDelayNs, int maxBufferSize);
+
+ // Dumps the current state for dumpsys.
+ // Expected to be called from a binder thread pool.
+ void dump(int fd);
+
+ // Writes incoming CarData to the RingBuffer.
+ // Expected to be called from a binder thread pool.
+ void writeCarData(
+ const std::vector<aidl::android::frameworks::automotive::telemetry::CarData>& dataList,
+ uid_t publisherUid);
- // Registers all the implemented AIDL services. Waits until `servicemanager` is available.
- // Aborts the process if fails.
- void registerServices();
+ // Sets the listener. If the listener already set, it returns an error.
+ // Expected to be called from a binder thread pool.
+ android::base::Result<void> setListener(
+ const std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener>&
+ listener);
- // Blocks the thread.
- void startAndJoinThreadPool();
+ // Clears the listener and returns it.
+ // Expected to be called from a binder thread pool.
+ void clearListener();
+
+ // Expected to be called from a binder thread pool.
+ std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener> getListener();
private:
- RingBuffer mRingBuffer;
+ class MessageHandlerImpl : public MessageHandler {
+ public:
+ explicit MessageHandlerImpl(TelemetryServer* server);
+
+ void handleMessage(const Message& message) override;
+
+ private:
+ TelemetryServer* mTelemetryServer; // not owned
+ };
+
+private:
+ // Periodically called by mLooper if there is a "push car data" messages.
+ void pushCarDataToListeners();
+
+ LooperWrapper* mLooper; // not owned
+ const std::chrono::nanoseconds mPushCarDataDelayNs;
+
+ // A single mutex for all the sensitive operations. Threads must not lock it for long time,
+ // as clients will be writing CarData to the ring buffer under this mutex.
+ std::mutex mMutex;
+ RingBuffer mRingBuffer GUARDED_BY(mMutex);
+ std::shared_ptr<aidl::android::automotive::telemetry::internal::ICarDataListener>
+ mCarDataListener GUARDED_BY(mMutex);
+ android::sp<MessageHandlerImpl> mMessageHandler; // Handler for mLooper.
+
+ // Friends are simplest way of testing if `pushCarDataToListeners()` can handle edge cases.
+ friend class TelemetryServerTest;
+ FRIEND_TEST(TelemetryServerTest, NoListenerButMultiplePushes);
+ FRIEND_TEST(TelemetryServerTest, NoDataButMultiplePushes);
};
} // namespace telemetry
diff --git a/cpp/telemetry/src/main.cpp b/cpp/telemetry/src/main.cpp
index 1bd0dde710..615e0e105a 100644
--- a/cpp/telemetry/src/main.cpp
+++ b/cpp/telemetry/src/main.cpp
@@ -14,23 +14,85 @@
* limitations under the License.
*/
+#include "CarTelemetryImpl.h"
+#include "CarTelemetryInternalImpl.h"
+#include "LooperWrapper.h"
#include "TelemetryServer.h"
+#include <android-base/chrono_utils.h>
#include <android-base/logging.h>
+#include <android-base/properties.h>
+#include <android/binder_interface_utils.h>
+#include <android/binder_manager.h>
+#include <android/binder_process.h>
+#include <utils/Looper.h>
+#include <thread> // NOLINT(build/c++11)
+
+using ::android::automotive::telemetry::CarTelemetryImpl;
+using ::android::automotive::telemetry::CarTelemetryInternalImpl;
+using ::android::automotive::telemetry::LooperWrapper;
using ::android::automotive::telemetry::TelemetryServer;
// TODO(b/174608802): handle SIGQUIT/SIGTERM
+constexpr const char kCarTelemetryServiceName[] =
+ "android.frameworks.automotive.telemetry.ICarTelemetry/default";
+constexpr const char kCarTelemetryInternalServiceName[] =
+ "android.automotive.telemetry.internal.ICarTelemetryInternal/default";
+
+// The min delay between each ICarDataListener.onCarDataReceived() calls. It's needed to avoid
+// too frequently making binder transactions.
+// Binder has <1MS latency for 10KB data.
+// TODO(b/186477983): improve sending car data after implementing CarTelemetryService.
+// TODO(b/183444070): make it configurable using sysprop
+constexpr const std::chrono::nanoseconds kPushCarDataDelayNs = 10ms;
+
+// TODO(b/183444070): make it configurable using sysprop
+// CarData count limit in the RingBuffer. In worst case it will use kMaxBufferSize * 10Kb memory,
+// which is ~ 1MB.
+const int kMaxBufferSize = 100;
+
int main(void) {
LOG(INFO) << "Starting cartelemetryd";
- TelemetryServer server;
+ LooperWrapper looper(android::Looper::prepare(/* opts= */ 0));
+ TelemetryServer server(&looper, kPushCarDataDelayNs, kMaxBufferSize);
+ std::shared_ptr<CarTelemetryImpl> telemetry =
+ ndk::SharedRefBase::make<CarTelemetryImpl>(&server);
+ std::shared_ptr<CarTelemetryInternalImpl> telemetryInternal =
+ ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&server);
+
+ // Wait for the service manager before starting ICarTelemetry service.
+ while (android::base::GetProperty("init.svc.servicemanager", "") != "running") {
+ // Poll frequent enough so the writer clients can connect to the service during boot.
+ std::this_thread::sleep_for(250ms);
+ }
+
+ LOG(VERBOSE) << "Registering " << kCarTelemetryServiceName;
+ binder_exception_t exception =
+ ::AServiceManager_addService(telemetry->asBinder().get(), kCarTelemetryServiceName);
+ if (exception != ::EX_NONE) {
+ LOG(FATAL) << "Unable to register " << kCarTelemetryServiceName
+ << ", exception=" << exception;
+ }
+
+ LOG(VERBOSE) << "Registering " << kCarTelemetryInternalServiceName;
+ exception = ::AServiceManager_addService(telemetryInternal->asBinder().get(),
+ kCarTelemetryInternalServiceName);
+ if (exception != ::EX_NONE) {
+ LOG(FATAL) << "Unable to register " << kCarTelemetryInternalServiceName
+ << ", exception=" << exception;
+ }
- // Register AIDL services. Aborts the server if fails.
- server.registerServices();
+ LOG(VERBOSE) << "Services are registered, starting thread pool";
+ ::ABinderProcess_startThreadPool(); // Starts the default 15 binder threads.
- LOG(VERBOSE) << "Service is created, joining the threadpool";
- server.startAndJoinThreadPool();
+ LOG(VERBOSE) << "Running the server.";
+ // Loop forever -- pushing data to ICarDataListener runs on this thread, and the binder calls
+ // remain responsive in their pool of one thread.
+ while (true) {
+ looper.pollAll(/* timeoutMillis= */ -1);
+ }
return 1; // never reaches
}
diff --git a/cpp/telemetry/tests/CarTelemetryImplTest.cpp b/cpp/telemetry/tests/CarTelemetryImplTest.cpp
deleted file mode 100644
index 028647754c..0000000000
--- a/cpp/telemetry/tests/CarTelemetryImplTest.cpp
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2021 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "CarTelemetryImpl.h"
-#include "RingBuffer.h"
-
-#include <aidl/android/frameworks/automotive/telemetry/CarData.h>
-#include <aidl/android/frameworks/automotive/telemetry/ICarTelemetry.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <unistd.h>
-
-#include <memory>
-
-namespace android {
-namespace automotive {
-namespace telemetry {
-
-using ::aidl::android::frameworks::automotive::telemetry::CarData;
-using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetry;
-using ::testing::ContainerEq;
-
-const size_t kMaxBufferSize = 5;
-
-CarData buildCarData(int id, const std::vector<uint8_t>& content) {
- CarData msg;
- msg.id = id;
- msg.content = content;
- return msg;
-}
-
-BufferedCarData buildBufferedCarData(const CarData& data, uid_t publisherUid) {
- return {.mId = data.id, .mContent = std::move(data.content), .mPublisherUid = publisherUid};
-}
-
-class CarTelemetryImplTest : public ::testing::Test {
-protected:
- CarTelemetryImplTest() :
- mBuffer(RingBuffer(kMaxBufferSize)),
- mTelemetry(ndk::SharedRefBase::make<CarTelemetryImpl>(&mBuffer)) {}
-
- RingBuffer mBuffer;
- std::shared_ptr<ICarTelemetry> mTelemetry;
-};
-
-TEST_F(CarTelemetryImplTest, WriteReturnsOkStatus) {
- CarData msg = buildCarData(101, {1, 0, 1, 0});
-
- auto status = mTelemetry->write({msg});
-
- EXPECT_TRUE(status.isOk()) << status.getMessage();
-}
-
-TEST_F(CarTelemetryImplTest, WriteAddsCarDataToRingBuffer) {
- CarData msg = buildCarData(101, {1, 0, 1, 0});
-
- mTelemetry->write({msg});
-
- EXPECT_EQ(mBuffer.popFront(), buildBufferedCarData(msg, getuid()));
-}
-
-TEST_F(CarTelemetryImplTest, WriteBuffersOnlyLimitedAmount) {
- RingBuffer buffer(/* sizeLimit= */ 3);
- auto telemetry = ndk::SharedRefBase::make<CarTelemetryImpl>(&buffer);
-
- CarData msg101_2 = buildCarData(101, {1, 0});
- CarData msg101_4 = buildCarData(101, {1, 0, 1, 0});
- CarData msg201_3 = buildCarData(201, {3, 3, 3});
-
- // Inserting 5 elements
- telemetry->write({msg101_2, msg101_4, msg101_4, msg201_3});
- telemetry->write({msg201_3});
-
- EXPECT_EQ(buffer.size(), 3);
- std::vector<BufferedCarData> result = {buffer.popFront(), buffer.popFront(), buffer.popFront()};
- std::vector<BufferedCarData> expected = {buildBufferedCarData(msg101_4, getuid()),
- buildBufferedCarData(msg201_3, getuid()),
- buildBufferedCarData(msg201_3, getuid())};
- EXPECT_THAT(result, ContainerEq(expected));
- EXPECT_EQ(buffer.size(), 0);
-}
-
-} // namespace telemetry
-} // namespace automotive
-} // namespace android
diff --git a/cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp b/cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp
deleted file mode 100644
index 22838cd2a5..0000000000
--- a/cpp/telemetry/tests/CarTelemetryInternalImplTest.cpp
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2021 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "CarTelemetryInternalImpl.h"
-#include "RingBuffer.h"
-
-#include <aidl/android/automotive/telemetry/internal/BnCarDataListener.h>
-#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
-#include <aidl/android/automotive/telemetry/internal/ICarTelemetryInternal.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <unistd.h>
-
-#include <memory>
-
-namespace android {
-namespace automotive {
-namespace telemetry {
-
-using ::aidl::android::automotive::telemetry::internal::BnCarDataListener;
-using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
-using ::aidl::android::automotive::telemetry::internal::ICarTelemetryInternal;
-using ::ndk::ScopedAStatus;
-
-const size_t kMaxBufferSize = 5;
-
-class MockCarDataListener : public BnCarDataListener {
-public:
- MOCK_METHOD(ScopedAStatus, onCarDataReceived, (const std::vector<CarDataInternal>& dataList),
- (override));
-};
-
-// The main test class.
-class CarTelemetryInternalImplTest : public ::testing::Test {
-protected:
- CarTelemetryInternalImplTest() :
- mBuffer(RingBuffer(kMaxBufferSize)),
- mTelemetryInternal(ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&mBuffer)),
- mMockCarDataListener(ndk::SharedRefBase::make<MockCarDataListener>()) {}
-
- RingBuffer mBuffer;
- std::shared_ptr<ICarTelemetryInternal> mTelemetryInternal;
- std::shared_ptr<MockCarDataListener> mMockCarDataListener;
-};
-
-TEST_F(CarTelemetryInternalImplTest, SetListenerReturnsOk) {
- auto status = mTelemetryInternal->setListener(mMockCarDataListener);
-
- EXPECT_TRUE(status.isOk()) << status.getMessage();
-}
-
-TEST_F(CarTelemetryInternalImplTest, SetListenerFailsWhenAlreadySubscribed) {
- mTelemetryInternal->setListener(mMockCarDataListener);
-
- auto status = mTelemetryInternal->setListener(ndk::SharedRefBase::make<MockCarDataListener>());
-
- EXPECT_EQ(status.getExceptionCode(), ::EX_ILLEGAL_STATE) << status.getMessage();
-}
-
-TEST_F(CarTelemetryInternalImplTest, ClearListenerWorks) {
- mTelemetryInternal->setListener(mMockCarDataListener);
-
- mTelemetryInternal->clearListener();
- auto status = mTelemetryInternal->setListener(mMockCarDataListener);
-
- EXPECT_TRUE(status.isOk()) << status.getMessage();
-}
-
-} // namespace telemetry
-} // namespace automotive
-} // namespace android
diff --git a/cpp/telemetry/tests/FakeLooperWrapper.h b/cpp/telemetry/tests/FakeLooperWrapper.h
new file mode 100644
index 0000000000..1914ee5624
--- /dev/null
+++ b/cpp/telemetry/tests/FakeLooperWrapper.h
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021, The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CPP_TELEMETRY_SRC_TESTS_FAKELOOPERWRAPPER_H_
+#define CPP_TELEMETRY_SRC_TESTS_FAKELOOPERWRAPPER_H_
+
+#include "LooperWrapper.h"
+
+#include <algorithm>
+#include <deque>
+#include <map>
+
+namespace android {
+namespace automotive {
+namespace telemetry {
+
+// Fake `utils/Looper.h` implementation. Explicitly use `FakeLooperWrapper::poll()` method
+// to process the messages.
+//
+// Not thread-safe.
+class FakeLooperWrapper : public LooperWrapper {
+public:
+ static inline constexpr int kNoScheduledMessage = -1;
+
+ FakeLooperWrapper() : LooperWrapper(nullptr){};
+
+ int pollAll(int timeoutMillis) override { return 0; }
+
+ void sendMessageDelayed(nsecs_t uptime, const android::sp<MessageHandler>& handler,
+ const Message& message) override {
+ mUptimeEntries[::systemTime() + uptime].push_back(
+ {.mHandler = handler, .mMessage = message});
+ }
+
+ void removeMessages(const android::sp<MessageHandler>& handler, int what) override {
+ for (auto it = mUptimeEntries.begin(); it != mUptimeEntries.end();) {
+ auto [entryUptime, entries] = *it;
+ for (auto eit = entries.begin(); eit != entries.end();) {
+ if (eit->mMessage.what == what) {
+ eit = entries.erase(eit);
+ } else {
+ ++eit;
+ }
+ }
+ if (entries.empty()) {
+ it = mUptimeEntries.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
+
+ // Processes the next message.
+ void poll() {
+ auto it = mUptimeEntries.begin();
+ if (it == mUptimeEntries.end() || it->second.empty()) {
+ return;
+ }
+ auto entry = std::move(it->second.front());
+ it->second.pop_front();
+ if (it->second.empty()) {
+ // if entries is empty, erase the uptime from the map.
+ mUptimeEntries.erase(it);
+ }
+ entry.mHandler->handleMessage(entry.mMessage);
+ }
+
+ // Returns the next scheduled message uptime. kNoScheduledMessage if there is no message.
+ nsecs_t getNextMessageUptime() {
+ auto it = mUptimeEntries.begin();
+ return it == mUptimeEntries.end() ? kNoScheduledMessage : it->first;
+ }
+
+private:
+ struct LooperEntry {
+ sp<MessageHandler> mHandler;
+ Message mMessage;
+ };
+
+private:
+ using Entries = std::deque<LooperEntry>;
+
+ // <uptimeNanos, entries> - where uptimeNanos is time since boot.
+ std::map<nsecs_t, Entries> mUptimeEntries;
+};
+
+} // namespace telemetry
+} // namespace automotive
+} // namespace android
+
+#endif // CPP_TELEMETRY_SRC_TESTS_FAKELOOPERWRAPPER_H_
diff --git a/cpp/telemetry/tests/RingBufferTest.cpp b/cpp/telemetry/tests/RingBufferTest.cpp
deleted file mode 100644
index c5b5bc7aa4..0000000000
--- a/cpp/telemetry/tests/RingBufferTest.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2021 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RingBuffer.h"
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <memory>
-
-// NOTE: many of RingBuffer's behaviors are tested as part of CarTelemetryImpl.
-
-namespace android {
-namespace automotive {
-namespace telemetry {
-
-using testing::ContainerEq;
-
-BufferedCarData buildBufferedCarData(int32_t id, const std::vector<uint8_t>& content) {
- return {.mId = id, .mContent = content, .mPublisherUid = 0};
-}
-
-TEST(RingBufferTest, PopFrontReturnsCorrectResults) {
- RingBuffer buffer(/* sizeLimit= */ 10);
- buffer.push(buildBufferedCarData(101, {7}));
- buffer.push(buildBufferedCarData(102, {7}));
-
- BufferedCarData result = buffer.popFront();
-
- EXPECT_EQ(result, buildBufferedCarData(101, {7}));
-}
-
-TEST(RingBufferTest, PopFrontRemovesFromBuffer) {
- RingBuffer buffer(/* sizeLimit= */ 10);
- buffer.push(buildBufferedCarData(101, {7}));
- buffer.push(buildBufferedCarData(102, {7, 8}));
-
- buffer.popFront();
-
- EXPECT_EQ(buffer.size(), 1); // only ID=102 left
-}
-
-} // namespace telemetry
-} // namespace automotive
-} // namespace android
diff --git a/cpp/telemetry/tests/TelemetryServerTest.cpp b/cpp/telemetry/tests/TelemetryServerTest.cpp
new file mode 100644
index 0000000000..8a6fab5b13
--- /dev/null
+++ b/cpp/telemetry/tests/TelemetryServerTest.cpp
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CarTelemetryImpl.h"
+#include "CarTelemetryInternalImpl.h"
+#include "FakeLooperWrapper.h"
+#include "LooperWrapper.h"
+#include "RingBuffer.h"
+#include "TelemetryServer.h"
+
+#include <aidl/android/automotive/telemetry/internal/BnCarDataListener.h>
+#include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
+#include <aidl/android/automotive/telemetry/internal/ICarTelemetryInternal.h>
+#include <aidl/android/frameworks/automotive/telemetry/CarData.h>
+#include <aidl/android/frameworks/automotive/telemetry/ICarTelemetry.h>
+#include <android-base/chrono_utils.h>
+#include <android-base/logging.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <utils/Timers.h> // for ::systemTime()
+
+#include <unistd.h>
+
+#include <memory>
+
+namespace android {
+namespace automotive {
+namespace telemetry {
+
+using ::aidl::android::automotive::telemetry::internal::BnCarDataListener;
+using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
+using ::aidl::android::automotive::telemetry::internal::ICarTelemetryInternal;
+using ::aidl::android::frameworks::automotive::telemetry::CarData;
+using ::aidl::android::frameworks::automotive::telemetry::ICarTelemetry;
+using ::ndk::ScopedAStatus;
+using ::testing::_;
+using ::testing::ByMove;
+using ::testing::Return;
+
+constexpr const std::chrono::nanoseconds kPushCarDataDelayNs = 1000ms;
+constexpr const std::chrono::nanoseconds kAllowedErrorNs = 100ms;
+const int kMaxBufferSize = 3;
+
+// Because `ScopedAStatus` is move-only, `EXPECT_CALL().WillRepeatedly()` will not work.
+inline testing::internal::ReturnAction<testing::internal::ByMoveWrapper<ScopedAStatus>> ReturnOk() {
+ return testing::Return(ByMove(ScopedAStatus::ok()));
+}
+
+// Builds incoming CarData from writer clients.
+CarData buildCarData(int id, const std::vector<uint8_t>& content) {
+ CarData msg;
+ msg.id = id;
+ msg.content = content;
+ return msg;
+}
+
+// Builds outgoing CarDataInternal to the CarTelemetryService.
+CarDataInternal buildCarDataInternal(int id, const std::vector<uint8_t>& content) {
+ CarDataInternal msg;
+ msg.id = id;
+ msg.content = content;
+ return msg;
+}
+
+// Mock listener, behaves as CarTelemetryService.
+class MockCarDataListener : public BnCarDataListener {
+public:
+ MOCK_METHOD(ScopedAStatus, onCarDataReceived, (const std::vector<CarDataInternal>& dataList),
+ (override));
+};
+
+// The main test class. Tests using `ICarTelemetry` and `ICarTelemetryInternal` interfaces.
+// Pushing data to the listener is done in the looper - always call `mFakeLooper.poll()`.
+class TelemetryServerTest : public ::testing::Test {
+protected:
+ TelemetryServerTest() :
+ mTelemetryServer(&mFakeLooper, kPushCarDataDelayNs, kMaxBufferSize),
+ mMockCarDataListener(ndk::SharedRefBase::make<MockCarDataListener>()),
+ mTelemetry(ndk::SharedRefBase::make<CarTelemetryImpl>(&mTelemetryServer)),
+ mTelemetryInternal(
+ ndk::SharedRefBase::make<CarTelemetryInternalImpl>(&mTelemetryServer)) {}
+
+ // Creates an expectation. This is a nice helper that accepts a std::vector, original
+ // EXPECT_CALL() requires creating std::vector variable.
+ testing::internal::TypedExpectation<ScopedAStatus(const std::vector<CarDataInternal>&)>&
+ expectMockListenerToReceive(const std::vector<CarDataInternal>& expected) {
+ return EXPECT_CALL(*mMockCarDataListener, onCarDataReceived(expected));
+ }
+
+ FakeLooperWrapper mFakeLooper;
+ TelemetryServer mTelemetryServer;
+ std::shared_ptr<MockCarDataListener> mMockCarDataListener;
+ std::shared_ptr<ICarTelemetry> mTelemetry;
+ std::shared_ptr<ICarTelemetryInternal> mTelemetryInternal;
+};
+
+TEST_F(TelemetryServerTest, WriteReturnsOk) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+
+ auto status = mTelemetry->write(dataList);
+
+ EXPECT_TRUE(status.isOk()) << status.getMessage();
+}
+
+TEST_F(TelemetryServerTest, SetListenerReturnsOk) {
+ auto status = mTelemetryInternal->setListener(mMockCarDataListener);
+
+ EXPECT_TRUE(status.isOk()) << status.getMessage();
+}
+
+TEST_F(TelemetryServerTest, SetListenerFailsWhenAlreadySubscribed) {
+ mTelemetryInternal->setListener(mMockCarDataListener);
+
+ auto status = mTelemetryInternal->setListener(ndk::SharedRefBase::make<MockCarDataListener>());
+
+ EXPECT_EQ(status.getExceptionCode(), ::EX_ILLEGAL_STATE) << status.getMessage();
+}
+
+TEST_F(TelemetryServerTest, ClearListenerWorks) {
+ mTelemetryInternal->setListener(mMockCarDataListener);
+
+ mTelemetryInternal->clearListener();
+
+ auto status = mTelemetryInternal->setListener(mMockCarDataListener);
+ EXPECT_TRUE(status.isOk()) << status.getMessage();
+}
+
+TEST_F(TelemetryServerTest, ClearListenerRemovesPushMessagesFromLooper) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetry->write(dataList);
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ EXPECT_NE(mFakeLooper.getNextMessageUptime(), FakeLooperWrapper::kNoScheduledMessage);
+
+ mTelemetryInternal->clearListener();
+
+ EXPECT_EQ(mFakeLooper.getNextMessageUptime(), FakeLooperWrapper::kNoScheduledMessage);
+}
+
+TEST_F(TelemetryServerTest, WriteSchedulesNextMessageAfterRightDelay) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetryInternal->setListener(mMockCarDataListener);
+
+ mTelemetry->write(dataList);
+
+ EXPECT_NEAR(mFakeLooper.getNextMessageUptime(), ::systemTime() + kPushCarDataDelayNs.count(),
+ kAllowedErrorNs.count());
+}
+
+TEST_F(TelemetryServerTest, SetListenerSchedulesNextMessageAfterRightDelay) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetry->write(dataList);
+
+ mTelemetryInternal->setListener(mMockCarDataListener);
+
+ EXPECT_NEAR(mFakeLooper.getNextMessageUptime(), ::systemTime() + kPushCarDataDelayNs.count(),
+ kAllowedErrorNs.count());
+}
+
+TEST_F(TelemetryServerTest, BuffersOnlyLimitedData) {
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ std::vector<CarData> dataList1 = {buildCarData(10, {1, 2}), buildCarData(11, {2, 3})};
+ std::vector<CarData> dataList2 = {buildCarData(101, {1, 2}), buildCarData(102, {2, 3}),
+ buildCarData(103, {3, 4}), buildCarData(104, {4, 5})};
+
+ mTelemetry->write(dataList1);
+ mTelemetry->write(dataList2);
+
+ // Only the last 3 CarData should be received, because kMaxBufferSize = 3.
+ expectMockListenerToReceive({buildCarDataInternal(102, {2, 3})}).WillOnce(ReturnOk());
+ expectMockListenerToReceive({buildCarDataInternal(103, {3, 4})}).WillOnce(ReturnOk());
+ expectMockListenerToReceive({buildCarDataInternal(104, {4, 5})}).WillOnce(ReturnOk());
+
+ mFakeLooper.poll();
+ mFakeLooper.poll();
+ mFakeLooper.poll();
+ mFakeLooper.poll();
+}
+
+// First sets the listener, then writes CarData.
+TEST_F(TelemetryServerTest, WhenListenerIsAlreadyItPushesData) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ mTelemetry->write(dataList);
+
+ expectMockListenerToReceive({buildCarDataInternal(101, {1})}).Times(1).WillOnce(ReturnOk());
+
+ mFakeLooper.poll();
+}
+
+// First writes CarData, only then sets the listener.
+TEST_F(TelemetryServerTest, WhenListenerIsSetLaterItPushesData) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+
+ mTelemetry->write(dataList);
+ mTelemetryInternal->setListener(mMockCarDataListener);
+
+ expectMockListenerToReceive({buildCarDataInternal(101, {1})}).Times(1).WillOnce(ReturnOk());
+
+ mFakeLooper.poll();
+}
+
+TEST_F(TelemetryServerTest, WriteDuringPushingDataToListener) {
+ std::vector<CarData> dataList = {buildCarData(101, {1}), buildCarData(102, {1})};
+ std::vector<CarData> dataList2 = {buildCarData(103, {1})};
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ mTelemetry->write(dataList);
+
+ expectMockListenerToReceive({buildCarDataInternal(101, {1})}).WillOnce(ReturnOk());
+ expectMockListenerToReceive({buildCarDataInternal(102, {1})}).WillOnce(ReturnOk());
+ expectMockListenerToReceive({buildCarDataInternal(103, {1})}).WillOnce(ReturnOk());
+
+ mFakeLooper.poll(); // sends only 1 CarData (or possibly 2 depenending on impl)
+ mTelemetry->write(dataList2);
+ mFakeLooper.poll(); // all the polls below send the rest of the CarData
+ mFakeLooper.poll();
+ mFakeLooper.poll(); // extra poll to verify there was not excess push calls
+}
+
+TEST_F(TelemetryServerTest, ClearListenerDuringPushingDataToListener) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ mTelemetry->write(dataList);
+
+ expectMockListenerToReceive({buildCarDataInternal(101, {1})}).Times(1).WillOnce(ReturnOk());
+
+ mFakeLooper.poll();
+ mTelemetry->write(dataList);
+ mTelemetryInternal->clearListener();
+ mFakeLooper.poll();
+}
+
+TEST_F(TelemetryServerTest, RetriesPushAgainIfListenerFails) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ mTelemetry->write(dataList);
+
+ expectMockListenerToReceive({buildCarDataInternal(101, {1})})
+ .WillOnce(Return(ByMove(ScopedAStatus::fromExceptionCode(::EX_TRANSACTION_FAILED))))
+ .WillOnce(ReturnOk());
+
+ mFakeLooper.poll(); // listener returns ::EX_TRANSACTION_FAILED
+ mFakeLooper.poll();
+}
+
+// Tests a corner case to make sure `TelemetryServer::mPendingCarDataInternals` variable
+// is handled properly when transaction fails and clearListener() is called.
+TEST_F(TelemetryServerTest, ClearListenerDuringPushingDataAndSetListenerAgain) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ mTelemetry->write(dataList);
+
+ expectMockListenerToReceive({buildCarDataInternal(101, {1})})
+ .WillOnce(Return(ByMove(ScopedAStatus::fromExceptionCode(::EX_TRANSACTION_FAILED))))
+ .WillOnce(ReturnOk());
+
+ mFakeLooper.poll(); // listener returns ::EX_TRANSACTION_FAILED
+ mTelemetryInternal->clearListener();
+ mFakeLooper.poll(); // nothing happens
+ mTelemetryInternal->setListener(mMockCarDataListener);
+ mFakeLooper.poll(); // should work
+}
+
+// Directly calls pushCarDataToListeners() to make sure it can handle edge-cases.
+TEST_F(TelemetryServerTest, NoListenerButMultiplePushes) {
+ std::vector<CarData> dataList = {buildCarData(101, {1})};
+ mTelemetry->write(dataList);
+
+ mTelemetryServer.pushCarDataToListeners();
+ mTelemetryServer.pushCarDataToListeners();
+ mTelemetryServer.pushCarDataToListeners();
+
+ EXPECT_CALL(*mMockCarDataListener, onCarDataReceived(_)).Times(0);
+}
+
+// Directly calls pushCarDataToListeners() to make sure it can handle edge-cases.
+TEST_F(TelemetryServerTest, NoDataButMultiplePushes) {
+ mTelemetryInternal->setListener(mMockCarDataListener);
+
+ mTelemetryServer.pushCarDataToListeners();
+ mTelemetryServer.pushCarDataToListeners();
+ mTelemetryServer.pushCarDataToListeners();
+
+ EXPECT_CALL(*mMockCarDataListener, onCarDataReceived(_)).Times(0);
+}
+
+} // namespace telemetry
+} // namespace automotive
+} // namespace android