aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-06-30 02:26:57 +0000
committerAndroid Build Coastguard Worker <android-build-coastguard-worker@google.com>2022-06-30 02:26:57 +0000
commite48442b57202efba93d134ee45a41a855a6fdb66 (patch)
treec5f2a2dcaf67f2585397b094ff8660b4db0a99e3
parented336c3814c34eb097b243ef57e7f474820d53ca (diff)
parentcfc3dc6bb0d629357214cdcbc8ef7290fbe0c480 (diff)
downloadmodules-utils-android-13.0.0_r2.tar.gz
Change-Id: I51c768e6a810381c84d2046a37ea3d7ff9bf936c
-rw-r--r--java/com/android/modules/utils/SynchronousResultReceiver.java84
-rw-r--r--javatests/com/android/modules/utils/SynchronousResultReceiverTest.java12
2 files changed, 77 insertions, 19 deletions
diff --git a/java/com/android/modules/utils/SynchronousResultReceiver.java b/java/com/android/modules/utils/SynchronousResultReceiver.java
index ad270d2..c12d739 100644
--- a/java/com/android/modules/utils/SynchronousResultReceiver.java
+++ b/java/com/android/modules/utils/SynchronousResultReceiver.java
@@ -25,11 +25,14 @@ import android.os.RemoteException;
import android.os.SystemClock;
import android.util.Log;
+import com.android.internal.annotations.GuardedBy;
+
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,19 +42,59 @@ import java.util.concurrent.TimeoutException;
* Allow the server end to synchronously wait on the response from the client.
* This enables an RPC like system but with the ability to timeout and discard late results.
*
- * <p>NOTE: Can only be used for one response.
- * Subsequent responses on the same instance will throw a {@link IllegalStateException}
+ * <p>NOTE: Use the static {@link #get} method to retrieve an available instance of this class.
+ * If no instances are available, a new one is created.
*/
public final class SynchronousResultReceiver<T> implements Parcelable {
private static final String TAG = "SynchronousResultReceiver";
private final boolean mLocal;
private boolean mIsCompleted;
+ private final static Object sLock = new Object();
+ private final static int QUEUE_THRESHOLD = 4;
+
+ @GuardedBy("sLock")
+ private CompletableFuture<Result<T>> mFuture = new CompletableFuture<>();
+
+ @GuardedBy("sLock")
+ private static final ConcurrentLinkedQueue<SynchronousResultReceiver> sAvailableReceivers
+ = new ConcurrentLinkedQueue<>();
- public SynchronousResultReceiver() {
+ public static <T> SynchronousResultReceiver<T> get() {
+ synchronized(sLock) {
+ if (sAvailableReceivers.isEmpty()) {
+ return new SynchronousResultReceiver();
+ }
+ SynchronousResultReceiver receiver = sAvailableReceivers.poll();
+ receiver.resetLocked();
+ return receiver;
+ }
+ }
+
+ private SynchronousResultReceiver() {
mLocal = true;
mIsCompleted = false;
}
+ @GuardedBy("sLock")
+ private void releaseLocked() {
+ mFuture = null;
+ if (sAvailableReceivers.size() < QUEUE_THRESHOLD) {
+ sAvailableReceivers.add(this);
+ }
+ }
+
+ @GuardedBy("sLock")
+ private void resetLocked() {
+ mFuture = new CompletableFuture<>();
+ mIsCompleted = false;
+ }
+
+ private CompletableFuture<Result<T>> getFuture() {
+ synchronized (sLock) {
+ return mFuture;
+ }
+ }
+
public static class Result<T> implements Parcelable {
private final @Nullable T mObject;
private final RuntimeException mException;
@@ -105,20 +148,24 @@ public final class SynchronousResultReceiver<T> implements Parcelable {
};
}
- private final CompletableFuture<Result<T>> mFuture = new CompletableFuture<>();
-
private void complete(Result<T> result) {
if (mIsCompleted) {
throw new IllegalStateException("Receiver has already been completed");
}
mIsCompleted = true;
if (mLocal) {
- mFuture.complete(result);
- } else if (mReceiver != null) {
- try {
- mReceiver.send(result);
- } catch (RemoteException e) {
- Log.w(TAG, "Failed to complete future");
+ getFuture().complete(result);
+ } else {
+ final ISynchronousResultReceiver rr;
+ synchronized (this) {
+ rr = mReceiver;
+ }
+ if (rr != null) {
+ try {
+ rr.send(result);
+ } catch (RemoteException e) {
+ Log.w(TAG, "Failed to complete future");
+ }
}
}
}
@@ -160,7 +207,11 @@ public final class SynchronousResultReceiver<T> implements Parcelable {
Duration remainingTime = timeout;
while (!remainingTime.isNegative()) {
try {
- return mFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
+ Result<T> result = getFuture().get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
+ synchronized (sLock) {
+ releaseLocked();
+ return result;
+ }
} catch (ExecutionException e) {
// This will NEVER happen.
throw new AssertionError("Error receiving response", e);
@@ -171,6 +222,9 @@ public final class SynchronousResultReceiver<T> implements Parcelable {
Duration.ofNanos(SystemClock.elapsedRealtimeNanos() - startWaitNanoTime));
}
}
+ synchronized (sLock) {
+ releaseLocked();
+ }
throw new TimeoutException();
}
@@ -179,7 +233,11 @@ public final class SynchronousResultReceiver<T> implements Parcelable {
private final class MyResultReceiver extends ISynchronousResultReceiver.Stub {
public void send(@SuppressWarnings("rawtypes") @NonNull Result result) {
@SuppressWarnings("unchecked") Result<T> res = (Result<T>) result;
- mFuture.complete(res);
+ CompletableFuture<Result<T>> future;
+ future = getFuture();
+ if (future != null) {
+ future.complete(res);
+ }
}
}
diff --git a/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java b/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java
index 43345a9..82aa97d 100644
--- a/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java
+++ b/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java
@@ -35,7 +35,7 @@ public class SynchronousResultReceiverTest extends TestCase {
@Test
public void testSimpleData() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(true);
final boolean result = recv.awaitResultNoInterrupt(OK_TIME).getValue(false);
assertTrue(result);
@@ -43,7 +43,7 @@ public class SynchronousResultReceiverTest extends TestCase {
@Test
public void testDoubleComplete() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(true);
Assert.assertThrows(IllegalStateException.class,
() -> recv.send(true));
@@ -51,14 +51,14 @@ public class SynchronousResultReceiverTest extends TestCase {
@Test
public void testDefaultValue() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(null);
assertTrue(recv.awaitResultNoInterrupt(OK_TIME).getValue(true));
}
@Test
public void testPropagateException() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.propagateException(new RuntimeException("Placeholder exception"));
Assert.assertThrows(RuntimeException.class,
() -> recv.awaitResultNoInterrupt(OK_TIME).getValue(false));
@@ -66,14 +66,14 @@ public class SynchronousResultReceiverTest extends TestCase {
@Test
public void testTimeout() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
Assert.assertThrows(TimeoutException.class,
() -> recv.awaitResultNoInterrupt(OK_TIME));
}
@Test
public void testNegativeTime() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(false);
Assert.assertThrows(TimeoutException.class,
() -> recv.awaitResultNoInterrupt(NEG_TIME));