diff options
author | Alan Viverette <alanv@google.com> | 2019-06-13 10:44:42 -0400 |
---|---|---|
committer | android-build-prod (mdb) <android-build-team-robot@google.com> | 2019-06-14 17:03:07 +0000 |
commit | 583d4a73e224b965b882cb71003e9561f2aa074d (patch) | |
tree | adba274dd22ecbb35de051303bcdb99ce7862a94 | |
parent | 98698bb066c4cc4162cad2b337e3b58227c9399b (diff) | |
download | support-583d4a73e224b965b882cb71003e9561f2aa074d.tar.gz |
Inline concurrent-futures classes into media2
Temporary to avoid blocking media2 release on API Council feedback for
callback library. Reverts futures library version to beta02. Replaces
unused media2-widget futures dependency with direct dependency on Guava
ListenableFuture.
Bug: 135057831
Test: ./gradlew buildOnServer
Change-Id: I2d7a49cd98cd9f100213b34ce7d57b5d9013805e
(cherry picked from commit 386a2ce143d59278750ca34b6f5fbf5a79178498)
23 files changed, 3958 insertions, 16 deletions
diff --git a/buildSrc/src/main/kotlin/androidx/build/LibraryVersions.kt b/buildSrc/src/main/kotlin/androidx/build/LibraryVersions.kt index 7cc500f09f1..ac3eb56a1ed 100644 --- a/buildSrc/src/main/kotlin/androidx/build/LibraryVersions.kt +++ b/buildSrc/src/main/kotlin/androidx/build/LibraryVersions.kt @@ -55,7 +55,7 @@ object LibraryVersions { val ENTERPRISE = Version("1.0.0-alpha03") val EXIFINTERFACE = Version("1.1.0-alpha02") val FRAGMENT = Version("1.1.0-beta02") - val FUTURES = Version("1.0.0-rc01") + val FUTURES = Version("1.0.0-beta02") val GRIDLAYOUT = Version("1.1.0-alpha01") val HEIFWRITER = Version("1.1.0-alpha01") val INTERPOLATOR = Version("1.1.0-alpha01") diff --git a/media2/common/build.gradle b/media2/common/build.gradle index 3244e6925e3..2c99d5a1174 100644 --- a/media2/common/build.gradle +++ b/media2/common/build.gradle @@ -11,7 +11,8 @@ plugins { dependencies { api(project(":media")) api(GUAVA_LISTENABLE_FUTURE) - implementation(project(":concurrent:concurrent-futures")) + // Temporarily removed to unblock stable release. + //implementation(project(":concurrent:concurrent-futures")) implementation("androidx.collection:collection:1.0.0") compileOnly(CHECKER_FRAMEWORK) diff --git a/media2/common/src/main/java/androidx/media2/common/SessionPlayer.java b/media2/common/src/main/java/androidx/media2/common/SessionPlayer.java index 94eff886f87..486e4f2656f 100644 --- a/media2/common/src/main/java/androidx/media2/common/SessionPlayer.java +++ b/media2/common/src/main/java/androidx/media2/common/SessionPlayer.java @@ -33,9 +33,9 @@ import androidx.annotation.IntRange; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.RestrictTo; -import androidx.concurrent.futures.ResolvableFuture; import androidx.core.util.Pair; import androidx.media.AudioAttributesCompat; +import androidx.media2.common.futures.ResolvableFuture; import androidx.versionedparcelable.CustomVersionedParcelable; import androidx.versionedparcelable.NonParcelField; import androidx.versionedparcelable.ParcelField; diff --git a/media2/common/src/main/java/androidx/media2/common/futures/AbstractResolvableFuture.java b/media2/common/src/main/java/androidx/media2/common/futures/AbstractResolvableFuture.java new file mode 100644 index 00000000000..f5a23430929 --- /dev/null +++ b/media2/common/src/main/java/androidx/media2/common/futures/AbstractResolvableFuture.java @@ -0,0 +1,1206 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.common.futures; + +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; + +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; +import androidx.annotation.RestrictTo; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Locale; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An AndroidX version of Guava's {@code AbstractFuture}. + * <p> + * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. A more + * common ways to create a {@code ListenableFuture} is to instantiate {@link ResolvableFuture}. + * + * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way + * to set the result of the computation through the protected methods {@link #set(Object)}, {@link + * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override + * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses + * should rarely override other methods. + * + * @author Sven Mawson + * @author Luke Sandberg + * @hide + */ +// TODO(b/119308748): Implement InternalFutureFailureAccess +@SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +public abstract class AbstractResolvableFuture<V> implements ListenableFuture<V> { + + // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static final boolean GENERATE_CANCELLATION_CAUSES = + Boolean.parseBoolean( + System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); + + // Logger to log exceptions caught when running listeners. + private static final Logger log = Logger.getLogger( + AbstractResolvableFuture.class.getName()); + + // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of + // blocking. This value is what AbstractQueuedSynchronizer uses. + private static final long SPIN_THRESHOLD_NANOS = 1000L; + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static final AtomicHelper ATOMIC_HELPER; + + static { + AtomicHelper helper; + Throwable thrownAtomicReferenceFieldUpdaterFailure = null; + + // The access control checks that ARFU does means the caller class has to be + // AbstractFuture instead of SafeAtomicHelper, so we annoyingly define these here + try { + helper = + new SafeAtomicHelper( + newUpdater(Waiter.class, Thread.class, "thread"), + newUpdater(Waiter.class, Waiter.class, "next"), + newUpdater(AbstractResolvableFuture.class, Waiter.class, "waiters"), + newUpdater( + AbstractResolvableFuture.class, + Listener.class, + "listeners"), + newUpdater(AbstractResolvableFuture.class, Object.class, "value")); + } catch (Throwable atomicReferenceFieldUpdaterFailure) { + // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause + // getDeclaredField to throw a NoSuchFieldException when the field is definitely + // there. For these users fallback to a suboptimal implementation, + // based on synchronized. This will be a definite performance hit to those users. + thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; + helper = new SynchronizedHelper(); + } + + ATOMIC_HELPER = helper; + + // Prevent rare disastrous classloading in first call to LockSupport.park. + // See: https://bugs.openjdk.java.net/browse/JDK-8074773 + @SuppressWarnings("unused") + Class<?> ensureLoaded = LockSupport.class; + + // Log after all static init is finished; if an installed logger uses any Futures + // methods, it shouldn't break in cases where reflection is missing/broken. + if (thrownAtomicReferenceFieldUpdaterFailure != null) { + log.log(Level.SEVERE, "SafeAtomicHelper is broken!", + thrownAtomicReferenceFieldUpdaterFailure); + } + } + + /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ + private static final class Waiter { + static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); + + @Nullable + volatile Thread thread; + @Nullable + volatile Waiter next; + + /** + * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded + * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. + */ + Waiter(boolean unused) { + } + + Waiter() { + // avoid volatile write, write is made visible by subsequent CAS on waiters field + ATOMIC_HELPER.putThread(this, Thread.currentThread()); + } + + // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters + // field. + void setNext(Waiter next) { + ATOMIC_HELPER.putNext(this, next); + } + + void unpark() { + // This is racy with removeWaiter. The consequence of the race is that we may + // spuriously call unpark even though the thread has already removed itself + // from the list. But even if we did use a CAS, that race would still exist + // (it would just be ever so slightly smaller). + Thread w = thread; + if (w != null) { + thread = null; + LockSupport.unpark(w); + } + } + } + + /** + * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted + * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are + * saved by two things. + * + * <ul> + * <li>This is only called when a waiting thread times out or is interrupted. Both of which + * should be rare. + * <li>The waiters list should be very short. + * </ul> + */ + private void removeWaiter(Waiter node) { + node.thread = null; // mark as 'deleted' + restart: + while (true) { + Waiter pred = null; + Waiter curr = waiters; + if (curr == Waiter.TOMBSTONE) { + return; // give up if someone is calling complete + } + Waiter succ; + while (curr != null) { + succ = curr.next; + if (curr.thread != null) { // we aren't unlinking this node, update pred. + pred = curr; + } else if (pred != null) { // We are unlinking this node and it has a predecessor. + pred.next = succ; + if (pred.thread == null) { + // We raced with another node that unlinked pred. Restart. + continue restart; + } + } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { + // We are unlinking head + continue restart; // We raced with an add or complete + } + curr = succ; + } + break; + } + } + + /** Listeners also form a stack through the {@link #listeners} field. */ + private static final class Listener { + static final Listener TOMBSTONE = new Listener(null, null); + final Runnable task; + final Executor executor; + + // writes to next are made visible by subsequent CAS's on the listeners field + @Nullable + Listener next; + + Listener(Runnable task, Executor executor) { + this.task = task; + this.executor = executor; + } + } + + /** A special value to represent {@code null}. */ + private static final Object NULL = new Object(); + + /** A special value to represent failure, when {@link #setException} is called successfully. */ + private static final class Failure { + static final Failure FALLBACK_INSTANCE = + new Failure( + new Throwable("Failure occurred while trying to finish a future.") { + @Override + public synchronized Throwable fillInStackTrace() { + return this; // no stack trace + } + }); + final Throwable exception; + + Failure(Throwable exception) { + this.exception = checkNotNull(exception); + } + } + + /** A special value to represent cancellation and the 'wasInterrupted' bit. */ + private static final class Cancellation { + // constants to use when GENERATE_CANCELLATION_CAUSES = false + static final Cancellation CAUSELESS_INTERRUPTED; + static final Cancellation CAUSELESS_CANCELLED; + + static { + if (GENERATE_CANCELLATION_CAUSES) { + CAUSELESS_CANCELLED = null; + CAUSELESS_INTERRUPTED = null; + } else { + CAUSELESS_CANCELLED = new Cancellation(false, null); + CAUSELESS_INTERRUPTED = new Cancellation(true, null); + } + } + + final boolean wasInterrupted; + @Nullable + final Throwable cause; + + Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { + this.wasInterrupted = wasInterrupted; + this.cause = cause; + } + } + + /** A special value that encodes the 'setFuture' state. */ + private static final class SetFuture<V> implements Runnable { + final AbstractResolvableFuture<V> owner; + final ListenableFuture<? extends V> future; + + SetFuture(AbstractResolvableFuture<V> owner, ListenableFuture<? extends V> future) { + this.owner = owner; + this.future = future; + } + + @Override + public void run() { + if (owner.value != this) { + // nothing to do, we must have been cancelled, don't bother inspecting the future. + return; + } + Object valueToSet = getFutureValue(future); + if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { + complete(owner); + } + } + } + + // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is + // available. + /** + * This field encodes the current state of the future. + * + * <p>The valid values are: + * + * <ul> + * <li>{@code null} initial state, nothing has happened. + * <li>{@link Cancellation} terminal state, {@code cancel} was called. + * <li>{@link Failure} terminal state, {@code setException} was called. + * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. + * <li>{@link #NULL} terminal state, {@code set(null)} was called. + * <li>Any other non-null value, terminal state, {@code set} was called with a non-null + * argument. + * </ul> + */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Object value; + + /** All listeners. */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Listener listeners; + + /** All waiting threads. */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Waiter waiters; + + /** Constructor for use by subclasses. */ + protected AbstractResolvableFuture() { + } + + // Gets and Timed Gets + // + // * Be responsive to interruption + // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the + // waiters field. + // * Future completion is defined by when #value becomes non-null/non SetFuture + // * Future completion can be observed if the waiters field contains a TOMBSTONE + + // Timed Get + // There are a few design constraints to consider + // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I + // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the + // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of + // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for + // similar purposes. + // * We want to behave reasonably for timeouts of 0 + // * We are more responsive to completion than timeouts. This is because parkNanos depends on + // system scheduling and as such we could either miss our deadline, or unpark() could be + // delayed so that it looks like we timed out even though we didn't. For comparison FutureTask + // respects completion preferably and AQS is non-deterministic (depends on where in the queue + // the waiter is). If we wanted to be strict about it, we could store the unpark() time in + // the Waiter node and we could use that to make a decision about whether or not we timed out + // prior to being unparked. + + /** + * {@inheritDoc} + * + * <p>The default {@link AbstractResolvableFuture} implementation throws + * {@code InterruptedException} if the current thread is interrupted during the call, even if + * the value is already available. + * + * @throws CancellationException {@inheritDoc} + */ + @Override + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException { + // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) + // loop at the bottom and throw a timeoutexception. + // we rely on the implicit null check on unit. + final long timeoutNanos = unit.toNanos(timeout); + long remainingNanos = timeoutNanos; + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + // we delay calling nanoTime until we know we will need to either park or spin + final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; + long_wait_loop: + if (remainingNanos >= SPIN_THRESHOLD_NANOS) { + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + while (true) { + LockSupport.parkNanos(this, remainingNanos); + // Check interruption first, if we woke up due to interruption we + // need to honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + + // Otherwise re-read and check doneness. If we loop then it must have + // been a spurious wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + + // timed out? + remainingNanos = endNanos - System.nanoTime(); + if (remainingNanos < SPIN_THRESHOLD_NANOS) { + // Remove the waiter, one way or another we are done parking this + // thread. + removeWaiter(node); + break long_wait_loop; // jump down to the busy wait loop + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE while trying + // to add a waiter. + return getDoneValue(value); + } + // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node + // on the waiters list + while (remainingNanos > 0) { + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + if (Thread.interrupted()) { + throw new InterruptedException(); + } + remainingNanos = endNanos - System.nanoTime(); + } + + String futureToString = toString(); + final String unitString = unit.toString().toLowerCase(Locale.ROOT); + String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); + // Only report scheduling delay if larger than our spin threshold - otherwise it's just + // noise + if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { + // We over-waited for our timeout. + message += " (plus "; + long overWaitNanos = -remainingNanos; + long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); + long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); + boolean shouldShowExtraNanos = + overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; + if (overWaitUnits > 0) { + message += overWaitUnits + " " + unitString; + if (shouldShowExtraNanos) { + message += ","; + } + message += " "; + } + if (shouldShowExtraNanos) { + message += overWaitLeftoverNanos + " nanoseconds "; + } + + message += "delay)"; + } + // It's confusing to see a completed future in a timeout message; if isDone() returns false, + // then we know it must have given a pending toString value earlier. If not, then the future + // completed after the timeout expired, and the message might be success. + if (isDone()) { + throw new TimeoutException(message + " but future completed as timeout expired"); + } + throw new TimeoutException(message + " for " + futureToString); + } + + /** + * {@inheritDoc} + * + * <p>The default {@link AbstractResolvableFuture} implementation throws + * {@code InterruptedException} if the current thread is interrupted during the call, even if + * the value is already available. + * + * @throws CancellationException {@inheritDoc} + */ + @Override + public final V get() throws InterruptedException, ExecutionException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + // we are on the stack, now wait for completion. + while (true) { + LockSupport.park(this); + // Check interruption first, if we woke up due to interruption we need to + // honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + // Otherwise re-read and check doneness. If we loop then it must have + // been a spurious + // wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE while trying to + // add a waiter. + return getDoneValue(value); + } + + /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ + private V getDoneValue(Object obj) throws ExecutionException { + // While this seems like it might be too branch-y, simple benchmarking proves it to be + // unmeasurable (comparing done AbstractFutures with immediateFuture) + if (obj instanceof Cancellation) { + throw cancellationExceptionWithCause( + "Task was cancelled.", + ((Cancellation) obj).cause); + } else if (obj instanceof Failure) { + throw new ExecutionException(((Failure) obj).exception); + } else if (obj == NULL) { + return null; + } else { + @SuppressWarnings("unchecked") // this is the only other option + V asV = (V) obj; + return asV; + } + } + + @Override + public final boolean isDone() { + final Object localValue = value; + return localValue != null & !(localValue instanceof SetFuture); + } + + @Override + public final boolean isCancelled() { + final Object localValue = value; + return localValue instanceof Cancellation; + } + + /** + * {@inheritDoc} + * + * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been + * {@linkplain #setFuture set asynchronously}, then the cancellation will also be propagated + * to the delegate {@code Future} that was supplied in the {@code setFuture} call. + * + * <p>Rather than override this method to perform additional cancellation work or cleanup, + * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link + * #wasInterrupted} as necessary. This ensures that the work is done even if the future is + * cancelled without a call to {@code cancel}, such as by calling {@code + * setFuture(cancelledFuture)}. + */ + @Override + public final boolean cancel(boolean mayInterruptIfRunning) { + Object localValue = value; + boolean rValue = false; + if (localValue == null | localValue instanceof SetFuture) { + // Try to delay allocating the exception. At this point we may still lose the CAS, + // but it is certainly less likely. + Object valueToSet = + GENERATE_CANCELLATION_CAUSES + ? new Cancellation( + mayInterruptIfRunning, + new CancellationException("Future.cancel() was called.")) + : (mayInterruptIfRunning + ? Cancellation.CAUSELESS_INTERRUPTED + : Cancellation.CAUSELESS_CANCELLED); + AbstractResolvableFuture<?> abstractFuture = this; + while (true) { + if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { + rValue = true; + // We call interuptTask before calling complete(), which is consistent with + // FutureTask + if (mayInterruptIfRunning) { + abstractFuture.interruptTask(); + } + complete(abstractFuture); + if (localValue instanceof SetFuture) { + // propagate cancellation to the future set in setfuture, this is racy, + // and we don't + // care if we are successful or not. + ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; + if (futureToPropagateTo instanceof AbstractResolvableFuture) { + // If the future is a trusted then we specifically avoid + // calling cancel() this has 2 benefits + // 1. for long chains of futures strung together with setFuture we + // consume less stack + // 2. we avoid allocating Cancellation objects at every level of the + // cancellation chain + // We can only do this for TrustedFuture, because TrustedFuture + // .cancel is final and does nothing but delegate to this method. + AbstractResolvableFuture<?> trusted = + (AbstractResolvableFuture<?>) futureToPropagateTo; + localValue = trusted.value; + if (localValue == null | localValue instanceof SetFuture) { + abstractFuture = trusted; + continue; // loop back up and try to complete the new future + } + } else { + // not a TrustedFuture, call cancel directly. + futureToPropagateTo.cancel(mayInterruptIfRunning); + } + } + break; + } + // obj changed, reread + localValue = abstractFuture.value; + if (!(localValue instanceof SetFuture)) { + // obj cannot be null at this point, because value can only change from null + // to non-null. So if value changed (and it did since we lost the CAS), + // then it cannot be null and since it isn't a SetFuture, then the future must + // be done and we should exit the loop + break; + } + } + } + return rValue; + } + + /** + * Subclasses can override this method to implement interruption of the future's computation. + * The method is invoked automatically by a successful call to + * {@link #cancel(boolean) cancel(true)}. + * + * <p>The default implementation does nothing. + * + * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking + * {@link #wasInterrupted} to decide whether to interrupt your task. + * + * @since 10.0 + */ + protected void interruptTask() { + } + + /** + * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code + * true}. + * + * @since 14.0 + */ + protected final boolean wasInterrupted() { + final Object localValue = value; + return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; + } + + /** + * {@inheritDoc} + * + * @since 10.0 + */ + @Override + public final void addListener(Runnable listener, Executor executor) { + checkNotNull(listener); + checkNotNull(executor); + Listener oldHead = listeners; + if (oldHead != Listener.TOMBSTONE) { + Listener newNode = new Listener(listener, executor); + do { + newNode.next = oldHead; + if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { + return; + } + oldHead = listeners; // re-read + } while (oldHead != Listener.TOMBSTONE); + } + // If we get here then the Listener TOMBSTONE was set, which means the future is done, call + // the listener. + executeListener(listener, executor); + } + + /** + * Sets the result of this {@code Future} unless this {@code Future} has already been + * cancelled or set (including {@linkplain #setFuture set asynchronously}). + * When a call to this method returns, the {@code Future} is guaranteed to be + * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which case it returns + * {@code true}). If it returns {@code false}, the {@code Future} may have previously been set + * asynchronously, in which case its result may not be known yet. That result, + * though not yet known, cannot be overridden by a call to a {@code set*} method, + * only by a call to {@link #cancel}. + * + * @param value the value to be used as the result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean set(@Nullable V value) { + Object valueToSet = value == null ? NULL : value; + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the failed result of this {@code Future} unless this {@code Future} has already been + * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this + * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only + * if</b> + * the call was accepted (in which case it returns {@code true}). If it returns {@code + * false}, the + * {@code Future} may have previously been set asynchronously, in which case its result may + * not be + * known yet. That result, though not yet known, cannot be overridden by a call to a {@code + * set*} + * method, only by a call to {@link #cancel}. + * + * @param throwable the exception to be used as the failed result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean setException(Throwable throwable) { + Object valueToSet = new Failure(checkNotNull(throwable)); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the result of this {@code Future} to match the supplied input {@code Future} once the + * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set + * (including "set asynchronously," defined below). + * + * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the + * call is accepted, then this future is guaranteed to have been completed with the supplied + * future by the time this method returns. If the supplied future is not done and the call + * is accepted, then the future will be <i>set asynchronously</i>. Note that such a result, + * though not yet known, cannot be overridden by a call to a {@code set*} method, + * only by a call to {@link #cancel}. + * + * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later + * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to + * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code + * Future}. + * + * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, + * it will never trigger interruption behavior. In particular, it will not cause this future to + * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not + * return {@code true}. + * + * @param future the future to delegate to + * @return true if the attempt was accepted, indicating that the {@code Future} was not + * previously cancelled or set. + * @since 19.0 + */ + protected boolean setFuture(ListenableFuture<? extends V> future) { + checkNotNull(future); + Object localValue = value; + if (localValue == null) { + if (future.isDone()) { + Object value = getFutureValue(future); + if (ATOMIC_HELPER.casValue(this, null, value)) { + complete(this); + return true; + } + return false; + } + SetFuture valueToSet = new SetFuture<V>(this, future); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + // the listener is responsible for calling completeWithFuture, directExecutor is + // appropriate since all we are doing is unpacking a completed future + // which should be fast. + try { + future.addListener(valueToSet, DirectExecutor.INSTANCE); + } catch (Throwable t) { + // addListener has thrown an exception! SetFuture.run can't throw any + // exceptions so this must have been caused by addListener itself. + // The most likely explanation is a misconfigured mock. + // Try to switch to Failure. + Failure failure; + try { + failure = new Failure(t); + } catch (Throwable oomMostLikely) { + failure = Failure.FALLBACK_INSTANCE; + } + // Note: The only way this CAS could fail is if cancel() has raced with us. + // That is ok. + boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); + } + return true; + } + localValue = value; // we lost the cas, fall through and maybe cancel + } + // The future has already been set to something. If it is cancellation we should cancel the + // incoming future. + if (localValue instanceof Cancellation) { + // we don't care if it fails, this is best-effort. + future.cancel(((Cancellation) localValue).wasInterrupted); + } + return false; + } + + /** + * Returns a value that satisfies the contract of the {@link #value} field based on the state of + * given future. + * + * <p>This is approximately the inverse of {@link #getDoneValue(Object)} + */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static Object getFutureValue(ListenableFuture<?> future) { + if (future instanceof AbstractResolvableFuture) { + // Break encapsulation for TrustedFuture instances since we know that subclasses cannot + // override .get() (since it is final) and therefore this is equivalent to calling + // .get() and unpacking the exceptions like we do below (just much faster because it is + // a single field read instead of a read, several branches and possibly + // creating exceptions). + Object v = ((AbstractResolvableFuture<?>) future).value; + if (v instanceof Cancellation) { + // If the other future was interrupted, clear the interrupted bit while + // preserving the cause this will make it consistent with how non-trustedfutures + // work which cannot propagate the wasInterrupted bit + Cancellation c = (Cancellation) v; + if (c.wasInterrupted) { + v = c.cause != null ? new Cancellation(/* wasInterrupted= */ false, c.cause) + : Cancellation.CAUSELESS_CANCELLED; + } + } + return v; + } + boolean wasCancelled = future.isCancelled(); + // Don't allocate a CancellationException if it's not necessary + if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { + return Cancellation.CAUSELESS_CANCELLED; + } + // Otherwise calculate the value by calling .get() + try { + Object v = getUninterruptibly(future); + return v == null ? NULL : v; + } catch (ExecutionException exception) { + return new Failure(exception.getCause()); + } catch (CancellationException cancellation) { + if (!wasCancelled) { + return new Failure( + new IllegalArgumentException( + "get() threw CancellationException, despite reporting isCancelled" + + "() == false: " + + future, + cancellation)); + } + return new Cancellation(false, cancellation); + } catch (Throwable t) { + return new Failure(t); + } + } + + /** + * internal dependency on other /util/concurrent classes. + */ + private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { + boolean interrupted = false; + try { + while (true) { + try { + return future.get(); + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** Unblocks all threads and runs all listeners. */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static void complete(AbstractResolvableFuture<?> future) { + Listener next = null; + outer: + while (true) { + future.releaseWaiters(); + // We call this before the listeners in order to avoid needing to manage a separate + // stack data structure for them. Also, some implementations rely on this running + // prior to listeners so that the cleanup work is visible to listeners. + // afterDone() should be generally fast and only used for cleanup work... but in + // theory can also be recursive and create StackOverflowErrors + future.afterDone(); + // push the current set of listeners onto next + next = future.clearListeners(next); + future = null; + while (next != null) { + Listener curr = next; + next = next.next; + Runnable task = curr.task; + if (task instanceof SetFuture) { + SetFuture<?> setFuture = (SetFuture<?>) task; + // We unwind setFuture specifically to avoid StackOverflowErrors in the case + // of long chains of SetFutures + // Handling this special case is important because there is no way to pass an + // executor to setFuture, so a user couldn't break the chain by doing this + // themselves. It is also potentially common if someone writes a recursive + // Futures.transformAsync transformer. + future = setFuture.owner; + if (future.value == setFuture) { + Object valueToSet = getFutureValue(setFuture.future); + if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { + continue outer; + } + } + // other wise the future we were trying to set is already done. + } else { + executeListener(task, curr.executor); + } + } + break; + } + } + + /** + * Callback method that is called exactly once after the future is completed. + * + * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. + * + * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is + * intended for very lightweight cleanup work, for example, timing statistics or clearing + * fields. + * If your task does anything heavier consider, just using a listener with an executor. + * + * @since 20.0 + */ + protected void afterDone() { + } + + /** + * If this future has been cancelled (and possibly interrupted), cancels (and possibly + * interrupts) the given future (if available). + */ + @SuppressWarnings("ParameterNotNullable") + final void maybePropagateCancellationTo(@Nullable Future<?> related) { + if (related != null & isCancelled()) { + related.cancel(wasInterrupted()); + } + } + + /** Releases all threads in the {@link #waiters} list, and clears the list. */ + private void releaseWaiters() { + Waiter head; + do { + head = waiters; + } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); + for (Waiter currentWaiter = head; currentWaiter != null; + currentWaiter = currentWaiter.next) { + currentWaiter.unpark(); + } + } + + /** + * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently + * added first. + */ + private Listener clearListeners(Listener onto) { + // We need to + // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to + // to synchronize with us + // 2. reverse the linked list, because despite our rather clear contract, people depend + // on us executing listeners in the order they were added + // 3. push all the items onto 'onto' and return the new head of the stack + Listener head; + do { + head = listeners; + } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); + Listener reversedList = onto; + while (head != null) { + Listener tmp = head; + head = head.next; + tmp.next = reversedList; + reversedList = tmp; + } + return reversedList; + } + + // TODO(clm): move parts into a default method on ListenableFuture? + @Override + public String toString() { + StringBuilder builder = new StringBuilder().append(super.toString()).append("[status="); + if (isCancelled()) { + builder.append("CANCELLED"); + } else if (isDone()) { + addDoneString(builder); + } else { + String pendingDescription; + try { + pendingDescription = pendingToString(); + } catch (RuntimeException e) { + // Don't call getMessage or toString() on the exception, in case the exception + // thrown by the subclass is implemented with bugs similar to the subclass. + pendingDescription = "Exception thrown from implementation: " + e.getClass(); + } + // The future may complete during or before the call to getPendingToString, so we use + // null as a signal that we should try checking if the future is done again. + if (pendingDescription != null && !pendingDescription.isEmpty()) { + builder.append("PENDING, info=[").append(pendingDescription).append("]"); + } else if (isDone()) { + addDoneString(builder); + } else { + builder.append("PENDING"); + } + } + return builder.append("]").toString(); + } + + /** + * Provide a human-readable explanation of why this future has not yet completed. + * + * @return null if an explanation cannot be provided because the future is done. + * @since 23.0 + */ + @Nullable + protected String pendingToString() { + Object localValue = value; + if (localValue instanceof SetFuture) { + return "setFuture=[" + userObjectToString(((SetFuture) localValue).future) + "]"; + } else if (this instanceof ScheduledFuture) { + return "remaining delay=[" + + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) + + " ms]"; + } + return null; + } + + private void addDoneString(StringBuilder builder) { + try { + V value = getUninterruptibly(this); + builder.append("SUCCESS, result=[").append(userObjectToString(value)).append("]"); + } catch (ExecutionException e) { + builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); + } catch (CancellationException e) { + builder.append("CANCELLED"); // shouldn't be reachable + } catch (RuntimeException e) { + builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); + } + } + + /** Helper for printing user supplied objects into our toString method. */ + private String userObjectToString(Object o) { + // This is some basic recursion detection for when people create cycles via set/setFuture + // This is however only partial protection though since it only detects self loops. We + // could detect arbitrary cycles using a thread local or possibly by catching + // StackOverflowExceptions but this should be a good enough solution + // (it is also what jdk collections do in these cases) + if (o == this) { + return "this future"; + } + return String.valueOf(o); + } + + /** + * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain + * RuntimeException runtime exceptions} thrown by the executor. + */ + private static void executeListener(Runnable runnable, Executor executor) { + try { + executor.execute(runnable); + } catch (RuntimeException e) { + // Log it and keep going -- bad runnable and/or executor. Don't punish the other + // runnables if we're given a bad one. We only catch RuntimeException + // because we want Errors to propagate up. + log.log( + Level.SEVERE, + "RuntimeException while executing runnable " + runnable + " with executor " + + executor, + e); + } + } + + private abstract static class AtomicHelper { + /** Non volatile write of the thread to the {@link Waiter#thread} field. */ + abstract void putThread(Waiter waiter, Thread newValue); + + /** Non volatile write of the waiter to the {@link Waiter#next} field. */ + abstract void putNext(Waiter waiter, Waiter newValue); + + /** Performs a CAS operation on the {@link #waiters} field. */ + abstract boolean casWaiters( + AbstractResolvableFuture<?> future, + Waiter expect, + Waiter update); + + /** Performs a CAS operation on the {@link #listeners} field. */ + abstract boolean casListeners( + AbstractResolvableFuture<?> future, + Listener expect, + Listener update); + + /** Performs a CAS operation on the {@link #value} field. */ + abstract boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update); + } + + /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ + private static final class SafeAtomicHelper extends AtomicHelper { + final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; + final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Waiter> waitersUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Listener> listenersUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Object> valueUpdater; + + SafeAtomicHelper( + AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, + AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Waiter> waitersUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Listener> listenersUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Object> valueUpdater) { + this.waiterThreadUpdater = waiterThreadUpdater; + this.waiterNextUpdater = waiterNextUpdater; + this.waitersUpdater = waitersUpdater; + this.listenersUpdater = listenersUpdater; + this.valueUpdater = valueUpdater; + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiterThreadUpdater.lazySet(waiter, newValue); + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiterNextUpdater.lazySet(waiter, newValue); + } + + @Override + boolean casWaiters(AbstractResolvableFuture<?> future, Waiter expect, Waiter update) { + return waitersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casListeners(AbstractResolvableFuture<?> future, Listener expect, Listener update) { + return listenersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update) { + return valueUpdater.compareAndSet(future, expect, update); + } + } + + /** + * {@link AtomicHelper} based on {@code synchronized} and volatile writes. + * + * <p>This is an implementation of last resort for when certain basic VM features are broken + * (like AtomicReferenceFieldUpdater). + */ + private static final class SynchronizedHelper extends AtomicHelper { + SynchronizedHelper() { + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiter.thread = newValue; + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiter.next = newValue; + } + + @Override + boolean casWaiters(AbstractResolvableFuture<?> future, Waiter expect, Waiter update) { + synchronized (future) { + if (future.waiters == expect) { + future.waiters = update; + return true; + } + return false; + } + } + + @Override + boolean casListeners(AbstractResolvableFuture<?> future, Listener expect, Listener update) { + synchronized (future) { + if (future.listeners == expect) { + future.listeners = update; + return true; + } + return false; + } + } + + @Override + boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update) { + synchronized (future) { + if (future.value == expect) { + future.value = update; + return true; + } + return false; + } + } + } + + private static CancellationException cancellationExceptionWithCause( + @Nullable String message, @Nullable Throwable cause) { + CancellationException exception = new CancellationException(message); + exception.initCause(cause); + return exception; + } + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + @NonNull + static <T> T checkNotNull(@Nullable T reference) { + if (reference == null) { + throw new NullPointerException(); + } + return reference; + } +} diff --git a/media2/common/src/main/java/androidx/media2/common/futures/DirectExecutor.java b/media2/common/src/main/java/androidx/media2/common/futures/DirectExecutor.java new file mode 100644 index 00000000000..8552fed5758 --- /dev/null +++ b/media2/common/src/main/java/androidx/media2/common/futures/DirectExecutor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.common.futures; + +import androidx.annotation.RestrictTo; + +import java.util.concurrent.Executor; + +/** + * An {@link Executor} that runs each task in the thread that invokes {@link Executor#execute + * execute}. + * @hide + */ +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "DirectExecutor"; + } +}
\ No newline at end of file diff --git a/media2/common/src/main/java/androidx/media2/common/futures/ResolvableFuture.java b/media2/common/src/main/java/androidx/media2/common/futures/ResolvableFuture.java new file mode 100644 index 00000000000..fcd374aaf51 --- /dev/null +++ b/media2/common/src/main/java/androidx/media2/common/futures/ResolvableFuture.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.common.futures; + + +import androidx.annotation.Nullable; +import androidx.annotation.RestrictTo; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * An AndroidX version of Guava's {@code SettableFuture}. + * <p> + * A {@link ListenableFuture} whose result can be set by a {@link #set(Object)}, {@link + * #setException(Throwable)} or {@link #setFuture(ListenableFuture)} call. It can also, like any + * other {@code Future}, be {@linkplain #cancel cancelled}. + * <p> + * If your needs are more complex than {@code ResolvableFuture} supports, use {@link + * AbstractResolvableFuture}, which offers an extensible version of the API. + * + * @hide + * @author Sven Mawson + */ +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +public final class ResolvableFuture<V> extends AbstractResolvableFuture<V> { + /** + * Creates a new {@code ResolvableFuture} that can be completed or cancelled by a later method + * call. + */ + public static <V> ResolvableFuture<V> create() { + return new ResolvableFuture<>(); + } + + @Override + public boolean set(@Nullable V value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + public boolean setFuture(ListenableFuture<? extends V> future) { + return super.setFuture(future); + } + + private ResolvableFuture() { + } +} + diff --git a/media2/player/build.gradle b/media2/player/build.gradle index e831eff4104..b8a30257ad0 100644 --- a/media2/player/build.gradle +++ b/media2/player/build.gradle @@ -13,7 +13,8 @@ dependencies { api(project(":media")) api(project(":media2:media2-common")) api(GUAVA_LISTENABLE_FUTURE) - implementation(project(":concurrent:concurrent-futures")) + // Temporarily removed to unblock stable release. + //implementation(project(":concurrent:concurrent-futures")) implementation("androidx.collection:collection:1.0.0") compileOnly(CHECKER_FRAMEWORK) // Depend on media2-exoplayer so that the library groupId is set to match media2. diff --git a/media2/player/src/main/java/androidx/media2/player/MediaPlayer.java b/media2/player/src/main/java/androidx/media2/player/MediaPlayer.java index fba2555621c..eaef992a98d 100644 --- a/media2/player/src/main/java/androidx/media2/player/MediaPlayer.java +++ b/media2/player/src/main/java/androidx/media2/player/MediaPlayer.java @@ -47,8 +47,6 @@ import androidx.annotation.RequiresApi; import androidx.annotation.RestrictTo; import androidx.annotation.VisibleForTesting; import androidx.collection.ArrayMap; -import androidx.concurrent.futures.AbstractResolvableFuture; -import androidx.concurrent.futures.ResolvableFuture; import androidx.core.util.Pair; import androidx.media.AudioAttributesCompat; import androidx.media2.common.FileMediaItem; @@ -57,6 +55,8 @@ import androidx.media2.common.MediaMetadata; import androidx.media2.common.SessionPlayer; import androidx.media2.common.SubtitleData; import androidx.media2.common.UriMediaItem; +import androidx.media2.player.futures.AbstractResolvableFuture; +import androidx.media2.player.futures.ResolvableFuture; import com.google.common.util.concurrent.ListenableFuture; diff --git a/media2/player/src/main/java/androidx/media2/player/exoplayer/ExoPlayerMediaPlayer2Impl.java b/media2/player/src/main/java/androidx/media2/player/exoplayer/ExoPlayerMediaPlayer2Impl.java index ee4b1676b03..119c8904c73 100644 --- a/media2/player/src/main/java/androidx/media2/player/exoplayer/ExoPlayerMediaPlayer2Impl.java +++ b/media2/player/src/main/java/androidx/media2/player/exoplayer/ExoPlayerMediaPlayer2Impl.java @@ -33,7 +33,6 @@ import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.annotation.RequiresApi; import androidx.annotation.RestrictTo; -import androidx.concurrent.futures.ResolvableFuture; import androidx.core.util.ObjectsCompat; import androidx.core.util.Preconditions; import androidx.media.AudioAttributesCompat; @@ -44,6 +43,7 @@ import androidx.media2.player.MediaPlayer2; import androidx.media2.player.MediaTimestamp; import androidx.media2.player.PlaybackParams; import androidx.media2.player.TimedMetaData; +import androidx.media2.player.futures.ResolvableFuture; import java.io.IOException; import java.util.ArrayDeque; diff --git a/media2/player/src/main/java/androidx/media2/player/futures/AbstractResolvableFuture.java b/media2/player/src/main/java/androidx/media2/player/futures/AbstractResolvableFuture.java new file mode 100644 index 00000000000..9711dc3979b --- /dev/null +++ b/media2/player/src/main/java/androidx/media2/player/futures/AbstractResolvableFuture.java @@ -0,0 +1,1206 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.player.futures; + +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; + +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; +import androidx.annotation.RestrictTo; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Locale; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An AndroidX version of Guava's {@code AbstractFuture}. + * <p> + * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. A more + * common ways to create a {@code ListenableFuture} is to instantiate {@link ResolvableFuture}. + * + * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way + * to set the result of the computation through the protected methods {@link #set(Object)}, {@link + * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override + * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses + * should rarely override other methods. + * + * @author Sven Mawson + * @author Luke Sandberg + * @hide + */ +// TODO(b/119308748): Implement InternalFutureFailureAccess +@SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +public abstract class AbstractResolvableFuture<V> implements ListenableFuture<V> { + + // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static final boolean GENERATE_CANCELLATION_CAUSES = + Boolean.parseBoolean( + System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); + + // Logger to log exceptions caught when running listeners. + private static final Logger log = Logger.getLogger( + AbstractResolvableFuture.class.getName()); + + // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of + // blocking. This value is what AbstractQueuedSynchronizer uses. + private static final long SPIN_THRESHOLD_NANOS = 1000L; + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static final AtomicHelper ATOMIC_HELPER; + + static { + AtomicHelper helper; + Throwable thrownAtomicReferenceFieldUpdaterFailure = null; + + // The access control checks that ARFU does means the caller class has to be + // AbstractFuture instead of SafeAtomicHelper, so we annoyingly define these here + try { + helper = + new SafeAtomicHelper( + newUpdater(Waiter.class, Thread.class, "thread"), + newUpdater(Waiter.class, Waiter.class, "next"), + newUpdater(AbstractResolvableFuture.class, Waiter.class, "waiters"), + newUpdater( + AbstractResolvableFuture.class, + Listener.class, + "listeners"), + newUpdater(AbstractResolvableFuture.class, Object.class, "value")); + } catch (Throwable atomicReferenceFieldUpdaterFailure) { + // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause + // getDeclaredField to throw a NoSuchFieldException when the field is definitely + // there. For these users fallback to a suboptimal implementation, + // based on synchronized. This will be a definite performance hit to those users. + thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; + helper = new SynchronizedHelper(); + } + + ATOMIC_HELPER = helper; + + // Prevent rare disastrous classloading in first call to LockSupport.park. + // See: https://bugs.openjdk.java.net/browse/JDK-8074773 + @SuppressWarnings("unused") + Class<?> ensureLoaded = LockSupport.class; + + // Log after all static init is finished; if an installed logger uses any Futures + // methods, it shouldn't break in cases where reflection is missing/broken. + if (thrownAtomicReferenceFieldUpdaterFailure != null) { + log.log(Level.SEVERE, "SafeAtomicHelper is broken!", + thrownAtomicReferenceFieldUpdaterFailure); + } + } + + /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ + private static final class Waiter { + static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); + + @Nullable + volatile Thread thread; + @Nullable + volatile Waiter next; + + /** + * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded + * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. + */ + Waiter(boolean unused) { + } + + Waiter() { + // avoid volatile write, write is made visible by subsequent CAS on waiters field + ATOMIC_HELPER.putThread(this, Thread.currentThread()); + } + + // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters + // field. + void setNext(Waiter next) { + ATOMIC_HELPER.putNext(this, next); + } + + void unpark() { + // This is racy with removeWaiter. The consequence of the race is that we may + // spuriously call unpark even though the thread has already removed itself + // from the list. But even if we did use a CAS, that race would still exist + // (it would just be ever so slightly smaller). + Thread w = thread; + if (w != null) { + thread = null; + LockSupport.unpark(w); + } + } + } + + /** + * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted + * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are + * saved by two things. + * + * <ul> + * <li>This is only called when a waiting thread times out or is interrupted. Both of which + * should be rare. + * <li>The waiters list should be very short. + * </ul> + */ + private void removeWaiter(Waiter node) { + node.thread = null; // mark as 'deleted' + restart: + while (true) { + Waiter pred = null; + Waiter curr = waiters; + if (curr == Waiter.TOMBSTONE) { + return; // give up if someone is calling complete + } + Waiter succ; + while (curr != null) { + succ = curr.next; + if (curr.thread != null) { // we aren't unlinking this node, update pred. + pred = curr; + } else if (pred != null) { // We are unlinking this node and it has a predecessor. + pred.next = succ; + if (pred.thread == null) { + // We raced with another node that unlinked pred. Restart. + continue restart; + } + } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { + // We are unlinking head + continue restart; // We raced with an add or complete + } + curr = succ; + } + break; + } + } + + /** Listeners also form a stack through the {@link #listeners} field. */ + private static final class Listener { + static final Listener TOMBSTONE = new Listener(null, null); + final Runnable task; + final Executor executor; + + // writes to next are made visible by subsequent CAS's on the listeners field + @Nullable + Listener next; + + Listener(Runnable task, Executor executor) { + this.task = task; + this.executor = executor; + } + } + + /** A special value to represent {@code null}. */ + private static final Object NULL = new Object(); + + /** A special value to represent failure, when {@link #setException} is called successfully. */ + private static final class Failure { + static final Failure FALLBACK_INSTANCE = + new Failure( + new Throwable("Failure occurred while trying to finish a future.") { + @Override + public synchronized Throwable fillInStackTrace() { + return this; // no stack trace + } + }); + final Throwable exception; + + Failure(Throwable exception) { + this.exception = checkNotNull(exception); + } + } + + /** A special value to represent cancellation and the 'wasInterrupted' bit. */ + private static final class Cancellation { + // constants to use when GENERATE_CANCELLATION_CAUSES = false + static final Cancellation CAUSELESS_INTERRUPTED; + static final Cancellation CAUSELESS_CANCELLED; + + static { + if (GENERATE_CANCELLATION_CAUSES) { + CAUSELESS_CANCELLED = null; + CAUSELESS_INTERRUPTED = null; + } else { + CAUSELESS_CANCELLED = new Cancellation(false, null); + CAUSELESS_INTERRUPTED = new Cancellation(true, null); + } + } + + final boolean wasInterrupted; + @Nullable + final Throwable cause; + + Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { + this.wasInterrupted = wasInterrupted; + this.cause = cause; + } + } + + /** A special value that encodes the 'setFuture' state. */ + private static final class SetFuture<V> implements Runnable { + final AbstractResolvableFuture<V> owner; + final ListenableFuture<? extends V> future; + + SetFuture(AbstractResolvableFuture<V> owner, ListenableFuture<? extends V> future) { + this.owner = owner; + this.future = future; + } + + @Override + public void run() { + if (owner.value != this) { + // nothing to do, we must have been cancelled, don't bother inspecting the future. + return; + } + Object valueToSet = getFutureValue(future); + if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { + complete(owner); + } + } + } + + // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is + // available. + /** + * This field encodes the current state of the future. + * + * <p>The valid values are: + * + * <ul> + * <li>{@code null} initial state, nothing has happened. + * <li>{@link Cancellation} terminal state, {@code cancel} was called. + * <li>{@link Failure} terminal state, {@code setException} was called. + * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. + * <li>{@link #NULL} terminal state, {@code set(null)} was called. + * <li>Any other non-null value, terminal state, {@code set} was called with a non-null + * argument. + * </ul> + */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Object value; + + /** All listeners. */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Listener listeners; + + /** All waiting threads. */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Waiter waiters; + + /** Constructor for use by subclasses. */ + protected AbstractResolvableFuture() { + } + + // Gets and Timed Gets + // + // * Be responsive to interruption + // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the + // waiters field. + // * Future completion is defined by when #value becomes non-null/non SetFuture + // * Future completion can be observed if the waiters field contains a TOMBSTONE + + // Timed Get + // There are a few design constraints to consider + // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I + // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the + // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of + // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for + // similar purposes. + // * We want to behave reasonably for timeouts of 0 + // * We are more responsive to completion than timeouts. This is because parkNanos depends on + // system scheduling and as such we could either miss our deadline, or unpark() could be + // delayed so that it looks like we timed out even though we didn't. For comparison FutureTask + // respects completion preferably and AQS is non-deterministic (depends on where in the queue + // the waiter is). If we wanted to be strict about it, we could store the unpark() time in + // the Waiter node and we could use that to make a decision about whether or not we timed out + // prior to being unparked. + + /** + * {@inheritDoc} + * + * <p>The default {@link AbstractResolvableFuture} implementation throws + * {@code InterruptedException} if the current thread is interrupted during the call, even if + * the value is already available. + * + * @throws CancellationException {@inheritDoc} + */ + @Override + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException { + // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) + // loop at the bottom and throw a timeoutexception. + // we rely on the implicit null check on unit. + final long timeoutNanos = unit.toNanos(timeout); + long remainingNanos = timeoutNanos; + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + // we delay calling nanoTime until we know we will need to either park or spin + final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; + long_wait_loop: + if (remainingNanos >= SPIN_THRESHOLD_NANOS) { + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + while (true) { + LockSupport.parkNanos(this, remainingNanos); + // Check interruption first, if we woke up due to interruption we + // need to honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + + // Otherwise re-read and check doneness. If we loop then it must have + // been a spurious wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + + // timed out? + remainingNanos = endNanos - System.nanoTime(); + if (remainingNanos < SPIN_THRESHOLD_NANOS) { + // Remove the waiter, one way or another we are done parking this + // thread. + removeWaiter(node); + break long_wait_loop; // jump down to the busy wait loop + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE while trying + // to add a waiter. + return getDoneValue(value); + } + // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node + // on the waiters list + while (remainingNanos > 0) { + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + if (Thread.interrupted()) { + throw new InterruptedException(); + } + remainingNanos = endNanos - System.nanoTime(); + } + + String futureToString = toString(); + final String unitString = unit.toString().toLowerCase(Locale.ROOT); + String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); + // Only report scheduling delay if larger than our spin threshold - otherwise it's just + // noise + if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { + // We over-waited for our timeout. + message += " (plus "; + long overWaitNanos = -remainingNanos; + long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); + long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); + boolean shouldShowExtraNanos = + overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; + if (overWaitUnits > 0) { + message += overWaitUnits + " " + unitString; + if (shouldShowExtraNanos) { + message += ","; + } + message += " "; + } + if (shouldShowExtraNanos) { + message += overWaitLeftoverNanos + " nanoseconds "; + } + + message += "delay)"; + } + // It's confusing to see a completed future in a timeout message; if isDone() returns false, + // then we know it must have given a pending toString value earlier. If not, then the future + // completed after the timeout expired, and the message might be success. + if (isDone()) { + throw new TimeoutException(message + " but future completed as timeout expired"); + } + throw new TimeoutException(message + " for " + futureToString); + } + + /** + * {@inheritDoc} + * + * <p>The default {@link AbstractResolvableFuture} implementation throws + * {@code InterruptedException} if the current thread is interrupted during the call, even if + * the value is already available. + * + * @throws CancellationException {@inheritDoc} + */ + @Override + public final V get() throws InterruptedException, ExecutionException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + // we are on the stack, now wait for completion. + while (true) { + LockSupport.park(this); + // Check interruption first, if we woke up due to interruption we need to + // honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + // Otherwise re-read and check doneness. If we loop then it must have + // been a spurious + // wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE while trying to + // add a waiter. + return getDoneValue(value); + } + + /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ + private V getDoneValue(Object obj) throws ExecutionException { + // While this seems like it might be too branch-y, simple benchmarking proves it to be + // unmeasurable (comparing done AbstractFutures with immediateFuture) + if (obj instanceof Cancellation) { + throw cancellationExceptionWithCause( + "Task was cancelled.", + ((Cancellation) obj).cause); + } else if (obj instanceof Failure) { + throw new ExecutionException(((Failure) obj).exception); + } else if (obj == NULL) { + return null; + } else { + @SuppressWarnings("unchecked") // this is the only other option + V asV = (V) obj; + return asV; + } + } + + @Override + public final boolean isDone() { + final Object localValue = value; + return localValue != null & !(localValue instanceof SetFuture); + } + + @Override + public final boolean isCancelled() { + final Object localValue = value; + return localValue instanceof Cancellation; + } + + /** + * {@inheritDoc} + * + * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been + * {@linkplain #setFuture set asynchronously}, then the cancellation will also be propagated + * to the delegate {@code Future} that was supplied in the {@code setFuture} call. + * + * <p>Rather than override this method to perform additional cancellation work or cleanup, + * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link + * #wasInterrupted} as necessary. This ensures that the work is done even if the future is + * cancelled without a call to {@code cancel}, such as by calling {@code + * setFuture(cancelledFuture)}. + */ + @Override + public final boolean cancel(boolean mayInterruptIfRunning) { + Object localValue = value; + boolean rValue = false; + if (localValue == null | localValue instanceof SetFuture) { + // Try to delay allocating the exception. At this point we may still lose the CAS, + // but it is certainly less likely. + Object valueToSet = + GENERATE_CANCELLATION_CAUSES + ? new Cancellation( + mayInterruptIfRunning, + new CancellationException("Future.cancel() was called.")) + : (mayInterruptIfRunning + ? Cancellation.CAUSELESS_INTERRUPTED + : Cancellation.CAUSELESS_CANCELLED); + AbstractResolvableFuture<?> abstractFuture = this; + while (true) { + if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { + rValue = true; + // We call interuptTask before calling complete(), which is consistent with + // FutureTask + if (mayInterruptIfRunning) { + abstractFuture.interruptTask(); + } + complete(abstractFuture); + if (localValue instanceof SetFuture) { + // propagate cancellation to the future set in setfuture, this is racy, + // and we don't + // care if we are successful or not. + ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; + if (futureToPropagateTo instanceof AbstractResolvableFuture) { + // If the future is a trusted then we specifically avoid + // calling cancel() this has 2 benefits + // 1. for long chains of futures strung together with setFuture we + // consume less stack + // 2. we avoid allocating Cancellation objects at every level of the + // cancellation chain + // We can only do this for TrustedFuture, because TrustedFuture + // .cancel is final and does nothing but delegate to this method. + AbstractResolvableFuture<?> trusted = + (AbstractResolvableFuture<?>) futureToPropagateTo; + localValue = trusted.value; + if (localValue == null | localValue instanceof SetFuture) { + abstractFuture = trusted; + continue; // loop back up and try to complete the new future + } + } else { + // not a TrustedFuture, call cancel directly. + futureToPropagateTo.cancel(mayInterruptIfRunning); + } + } + break; + } + // obj changed, reread + localValue = abstractFuture.value; + if (!(localValue instanceof SetFuture)) { + // obj cannot be null at this point, because value can only change from null + // to non-null. So if value changed (and it did since we lost the CAS), + // then it cannot be null and since it isn't a SetFuture, then the future must + // be done and we should exit the loop + break; + } + } + } + return rValue; + } + + /** + * Subclasses can override this method to implement interruption of the future's computation. + * The method is invoked automatically by a successful call to + * {@link #cancel(boolean) cancel(true)}. + * + * <p>The default implementation does nothing. + * + * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking + * {@link #wasInterrupted} to decide whether to interrupt your task. + * + * @since 10.0 + */ + protected void interruptTask() { + } + + /** + * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code + * true}. + * + * @since 14.0 + */ + protected final boolean wasInterrupted() { + final Object localValue = value; + return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; + } + + /** + * {@inheritDoc} + * + * @since 10.0 + */ + @Override + public final void addListener(Runnable listener, Executor executor) { + checkNotNull(listener); + checkNotNull(executor); + Listener oldHead = listeners; + if (oldHead != Listener.TOMBSTONE) { + Listener newNode = new Listener(listener, executor); + do { + newNode.next = oldHead; + if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { + return; + } + oldHead = listeners; // re-read + } while (oldHead != Listener.TOMBSTONE); + } + // If we get here then the Listener TOMBSTONE was set, which means the future is done, call + // the listener. + executeListener(listener, executor); + } + + /** + * Sets the result of this {@code Future} unless this {@code Future} has already been + * cancelled or set (including {@linkplain #setFuture set asynchronously}). + * When a call to this method returns, the {@code Future} is guaranteed to be + * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which case it returns + * {@code true}). If it returns {@code false}, the {@code Future} may have previously been set + * asynchronously, in which case its result may not be known yet. That result, + * though not yet known, cannot be overridden by a call to a {@code set*} method, + * only by a call to {@link #cancel}. + * + * @param value the value to be used as the result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean set(@Nullable V value) { + Object valueToSet = value == null ? NULL : value; + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the failed result of this {@code Future} unless this {@code Future} has already been + * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this + * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only + * if</b> + * the call was accepted (in which case it returns {@code true}). If it returns {@code + * false}, the + * {@code Future} may have previously been set asynchronously, in which case its result may + * not be + * known yet. That result, though not yet known, cannot be overridden by a call to a {@code + * set*} + * method, only by a call to {@link #cancel}. + * + * @param throwable the exception to be used as the failed result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean setException(Throwable throwable) { + Object valueToSet = new Failure(checkNotNull(throwable)); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the result of this {@code Future} to match the supplied input {@code Future} once the + * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set + * (including "set asynchronously," defined below). + * + * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the + * call is accepted, then this future is guaranteed to have been completed with the supplied + * future by the time this method returns. If the supplied future is not done and the call + * is accepted, then the future will be <i>set asynchronously</i>. Note that such a result, + * though not yet known, cannot be overridden by a call to a {@code set*} method, + * only by a call to {@link #cancel}. + * + * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later + * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to + * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code + * Future}. + * + * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, + * it will never trigger interruption behavior. In particular, it will not cause this future to + * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not + * return {@code true}. + * + * @param future the future to delegate to + * @return true if the attempt was accepted, indicating that the {@code Future} was not + * previously cancelled or set. + * @since 19.0 + */ + protected boolean setFuture(ListenableFuture<? extends V> future) { + checkNotNull(future); + Object localValue = value; + if (localValue == null) { + if (future.isDone()) { + Object value = getFutureValue(future); + if (ATOMIC_HELPER.casValue(this, null, value)) { + complete(this); + return true; + } + return false; + } + SetFuture valueToSet = new SetFuture<V>(this, future); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + // the listener is responsible for calling completeWithFuture, directExecutor is + // appropriate since all we are doing is unpacking a completed future + // which should be fast. + try { + future.addListener(valueToSet, DirectExecutor.INSTANCE); + } catch (Throwable t) { + // addListener has thrown an exception! SetFuture.run can't throw any + // exceptions so this must have been caused by addListener itself. + // The most likely explanation is a misconfigured mock. + // Try to switch to Failure. + Failure failure; + try { + failure = new Failure(t); + } catch (Throwable oomMostLikely) { + failure = Failure.FALLBACK_INSTANCE; + } + // Note: The only way this CAS could fail is if cancel() has raced with us. + // That is ok. + boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); + } + return true; + } + localValue = value; // we lost the cas, fall through and maybe cancel + } + // The future has already been set to something. If it is cancellation we should cancel the + // incoming future. + if (localValue instanceof Cancellation) { + // we don't care if it fails, this is best-effort. + future.cancel(((Cancellation) localValue).wasInterrupted); + } + return false; + } + + /** + * Returns a value that satisfies the contract of the {@link #value} field based on the state of + * given future. + * + * <p>This is approximately the inverse of {@link #getDoneValue(Object)} + */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static Object getFutureValue(ListenableFuture<?> future) { + if (future instanceof AbstractResolvableFuture) { + // Break encapsulation for TrustedFuture instances since we know that subclasses cannot + // override .get() (since it is final) and therefore this is equivalent to calling + // .get() and unpacking the exceptions like we do below (just much faster because it is + // a single field read instead of a read, several branches and possibly + // creating exceptions). + Object v = ((AbstractResolvableFuture<?>) future).value; + if (v instanceof Cancellation) { + // If the other future was interrupted, clear the interrupted bit while + // preserving the cause this will make it consistent with how non-trustedfutures + // work which cannot propagate the wasInterrupted bit + Cancellation c = (Cancellation) v; + if (c.wasInterrupted) { + v = c.cause != null ? new Cancellation(/* wasInterrupted= */ false, c.cause) + : Cancellation.CAUSELESS_CANCELLED; + } + } + return v; + } + boolean wasCancelled = future.isCancelled(); + // Don't allocate a CancellationException if it's not necessary + if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { + return Cancellation.CAUSELESS_CANCELLED; + } + // Otherwise calculate the value by calling .get() + try { + Object v = getUninterruptibly(future); + return v == null ? NULL : v; + } catch (ExecutionException exception) { + return new Failure(exception.getCause()); + } catch (CancellationException cancellation) { + if (!wasCancelled) { + return new Failure( + new IllegalArgumentException( + "get() threw CancellationException, despite reporting isCancelled" + + "() == false: " + + future, + cancellation)); + } + return new Cancellation(false, cancellation); + } catch (Throwable t) { + return new Failure(t); + } + } + + /** + * internal dependency on other /util/concurrent classes. + */ + private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { + boolean interrupted = false; + try { + while (true) { + try { + return future.get(); + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** Unblocks all threads and runs all listeners. */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static void complete(AbstractResolvableFuture<?> future) { + Listener next = null; + outer: + while (true) { + future.releaseWaiters(); + // We call this before the listeners in order to avoid needing to manage a separate + // stack data structure for them. Also, some implementations rely on this running + // prior to listeners so that the cleanup work is visible to listeners. + // afterDone() should be generally fast and only used for cleanup work... but in + // theory can also be recursive and create StackOverflowErrors + future.afterDone(); + // push the current set of listeners onto next + next = future.clearListeners(next); + future = null; + while (next != null) { + Listener curr = next; + next = next.next; + Runnable task = curr.task; + if (task instanceof SetFuture) { + SetFuture<?> setFuture = (SetFuture<?>) task; + // We unwind setFuture specifically to avoid StackOverflowErrors in the case + // of long chains of SetFutures + // Handling this special case is important because there is no way to pass an + // executor to setFuture, so a user couldn't break the chain by doing this + // themselves. It is also potentially common if someone writes a recursive + // Futures.transformAsync transformer. + future = setFuture.owner; + if (future.value == setFuture) { + Object valueToSet = getFutureValue(setFuture.future); + if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { + continue outer; + } + } + // other wise the future we were trying to set is already done. + } else { + executeListener(task, curr.executor); + } + } + break; + } + } + + /** + * Callback method that is called exactly once after the future is completed. + * + * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. + * + * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is + * intended for very lightweight cleanup work, for example, timing statistics or clearing + * fields. + * If your task does anything heavier consider, just using a listener with an executor. + * + * @since 20.0 + */ + protected void afterDone() { + } + + /** + * If this future has been cancelled (and possibly interrupted), cancels (and possibly + * interrupts) the given future (if available). + */ + @SuppressWarnings("ParameterNotNullable") + final void maybePropagateCancellationTo(@Nullable Future<?> related) { + if (related != null & isCancelled()) { + related.cancel(wasInterrupted()); + } + } + + /** Releases all threads in the {@link #waiters} list, and clears the list. */ + private void releaseWaiters() { + Waiter head; + do { + head = waiters; + } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); + for (Waiter currentWaiter = head; currentWaiter != null; + currentWaiter = currentWaiter.next) { + currentWaiter.unpark(); + } + } + + /** + * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently + * added first. + */ + private Listener clearListeners(Listener onto) { + // We need to + // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to + // to synchronize with us + // 2. reverse the linked list, because despite our rather clear contract, people depend + // on us executing listeners in the order they were added + // 3. push all the items onto 'onto' and return the new head of the stack + Listener head; + do { + head = listeners; + } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); + Listener reversedList = onto; + while (head != null) { + Listener tmp = head; + head = head.next; + tmp.next = reversedList; + reversedList = tmp; + } + return reversedList; + } + + // TODO(clm): move parts into a default method on ListenableFuture? + @Override + public String toString() { + StringBuilder builder = new StringBuilder().append(super.toString()).append("[status="); + if (isCancelled()) { + builder.append("CANCELLED"); + } else if (isDone()) { + addDoneString(builder); + } else { + String pendingDescription; + try { + pendingDescription = pendingToString(); + } catch (RuntimeException e) { + // Don't call getMessage or toString() on the exception, in case the exception + // thrown by the subclass is implemented with bugs similar to the subclass. + pendingDescription = "Exception thrown from implementation: " + e.getClass(); + } + // The future may complete during or before the call to getPendingToString, so we use + // null as a signal that we should try checking if the future is done again. + if (pendingDescription != null && !pendingDescription.isEmpty()) { + builder.append("PENDING, info=[").append(pendingDescription).append("]"); + } else if (isDone()) { + addDoneString(builder); + } else { + builder.append("PENDING"); + } + } + return builder.append("]").toString(); + } + + /** + * Provide a human-readable explanation of why this future has not yet completed. + * + * @return null if an explanation cannot be provided because the future is done. + * @since 23.0 + */ + @Nullable + protected String pendingToString() { + Object localValue = value; + if (localValue instanceof SetFuture) { + return "setFuture=[" + userObjectToString(((SetFuture) localValue).future) + "]"; + } else if (this instanceof ScheduledFuture) { + return "remaining delay=[" + + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) + + " ms]"; + } + return null; + } + + private void addDoneString(StringBuilder builder) { + try { + V value = getUninterruptibly(this); + builder.append("SUCCESS, result=[").append(userObjectToString(value)).append("]"); + } catch (ExecutionException e) { + builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); + } catch (CancellationException e) { + builder.append("CANCELLED"); // shouldn't be reachable + } catch (RuntimeException e) { + builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); + } + } + + /** Helper for printing user supplied objects into our toString method. */ + private String userObjectToString(Object o) { + // This is some basic recursion detection for when people create cycles via set/setFuture + // This is however only partial protection though since it only detects self loops. We + // could detect arbitrary cycles using a thread local or possibly by catching + // StackOverflowExceptions but this should be a good enough solution + // (it is also what jdk collections do in these cases) + if (o == this) { + return "this future"; + } + return String.valueOf(o); + } + + /** + * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain + * RuntimeException runtime exceptions} thrown by the executor. + */ + private static void executeListener(Runnable runnable, Executor executor) { + try { + executor.execute(runnable); + } catch (RuntimeException e) { + // Log it and keep going -- bad runnable and/or executor. Don't punish the other + // runnables if we're given a bad one. We only catch RuntimeException + // because we want Errors to propagate up. + log.log( + Level.SEVERE, + "RuntimeException while executing runnable " + runnable + " with executor " + + executor, + e); + } + } + + private abstract static class AtomicHelper { + /** Non volatile write of the thread to the {@link Waiter#thread} field. */ + abstract void putThread(Waiter waiter, Thread newValue); + + /** Non volatile write of the waiter to the {@link Waiter#next} field. */ + abstract void putNext(Waiter waiter, Waiter newValue); + + /** Performs a CAS operation on the {@link #waiters} field. */ + abstract boolean casWaiters( + AbstractResolvableFuture<?> future, + Waiter expect, + Waiter update); + + /** Performs a CAS operation on the {@link #listeners} field. */ + abstract boolean casListeners( + AbstractResolvableFuture<?> future, + Listener expect, + Listener update); + + /** Performs a CAS operation on the {@link #value} field. */ + abstract boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update); + } + + /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ + private static final class SafeAtomicHelper extends AtomicHelper { + final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; + final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Waiter> waitersUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Listener> listenersUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Object> valueUpdater; + + SafeAtomicHelper( + AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, + AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Waiter> waitersUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Listener> listenersUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Object> valueUpdater) { + this.waiterThreadUpdater = waiterThreadUpdater; + this.waiterNextUpdater = waiterNextUpdater; + this.waitersUpdater = waitersUpdater; + this.listenersUpdater = listenersUpdater; + this.valueUpdater = valueUpdater; + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiterThreadUpdater.lazySet(waiter, newValue); + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiterNextUpdater.lazySet(waiter, newValue); + } + + @Override + boolean casWaiters(AbstractResolvableFuture<?> future, Waiter expect, Waiter update) { + return waitersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casListeners(AbstractResolvableFuture<?> future, Listener expect, Listener update) { + return listenersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update) { + return valueUpdater.compareAndSet(future, expect, update); + } + } + + /** + * {@link AtomicHelper} based on {@code synchronized} and volatile writes. + * + * <p>This is an implementation of last resort for when certain basic VM features are broken + * (like AtomicReferenceFieldUpdater). + */ + private static final class SynchronizedHelper extends AtomicHelper { + SynchronizedHelper() { + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiter.thread = newValue; + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiter.next = newValue; + } + + @Override + boolean casWaiters(AbstractResolvableFuture<?> future, Waiter expect, Waiter update) { + synchronized (future) { + if (future.waiters == expect) { + future.waiters = update; + return true; + } + return false; + } + } + + @Override + boolean casListeners(AbstractResolvableFuture<?> future, Listener expect, Listener update) { + synchronized (future) { + if (future.listeners == expect) { + future.listeners = update; + return true; + } + return false; + } + } + + @Override + boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update) { + synchronized (future) { + if (future.value == expect) { + future.value = update; + return true; + } + return false; + } + } + } + + private static CancellationException cancellationExceptionWithCause( + @Nullable String message, @Nullable Throwable cause) { + CancellationException exception = new CancellationException(message); + exception.initCause(cause); + return exception; + } + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + @NonNull + static <T> T checkNotNull(@Nullable T reference) { + if (reference == null) { + throw new NullPointerException(); + } + return reference; + } +} diff --git a/media2/player/src/main/java/androidx/media2/player/futures/DirectExecutor.java b/media2/player/src/main/java/androidx/media2/player/futures/DirectExecutor.java new file mode 100644 index 00000000000..23380bbfd52 --- /dev/null +++ b/media2/player/src/main/java/androidx/media2/player/futures/DirectExecutor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.player.futures; + +import androidx.annotation.RestrictTo; + +import java.util.concurrent.Executor; + +/** + * An {@link Executor} that runs each task in the thread that invokes {@link Executor#execute + * execute}. + * @hide + */ +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "DirectExecutor"; + } +}
\ No newline at end of file diff --git a/media2/player/src/main/java/androidx/media2/player/futures/ResolvableFuture.java b/media2/player/src/main/java/androidx/media2/player/futures/ResolvableFuture.java new file mode 100644 index 00000000000..6d3cf0aebbd --- /dev/null +++ b/media2/player/src/main/java/androidx/media2/player/futures/ResolvableFuture.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.player.futures; + + +import androidx.annotation.Nullable; +import androidx.annotation.RestrictTo; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * An AndroidX version of Guava's {@code SettableFuture}. + * <p> + * A {@link ListenableFuture} whose result can be set by a {@link #set(Object)}, {@link + * #setException(Throwable)} or {@link #setFuture(ListenableFuture)} call. It can also, like any + * other {@code Future}, be {@linkplain #cancel cancelled}. + * <p> + * If your needs are more complex than {@code ResolvableFuture} supports, use {@link + * AbstractResolvableFuture}, which offers an extensible version of the API. + * + * @hide + * @author Sven Mawson + */ +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +public final class ResolvableFuture<V> extends AbstractResolvableFuture<V> { + /** + * Creates a new {@code ResolvableFuture} that can be completed or cancelled by a later method + * call. + */ + public static <V> ResolvableFuture<V> create() { + return new ResolvableFuture<>(); + } + + @Override + public boolean set(@Nullable V value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + public boolean setFuture(ListenableFuture<? extends V> future) { + return super.setFuture(future); + } + + private ResolvableFuture() { + } +} + diff --git a/media2/session/build.gradle b/media2/session/build.gradle index c9b77b7b050..8ee4f94c7ee 100644 --- a/media2/session/build.gradle +++ b/media2/session/build.gradle @@ -13,7 +13,8 @@ dependencies { api(project(":media")) api(project(":media2:media2-common")) api(GUAVA_LISTENABLE_FUTURE) - implementation(project(":concurrent:concurrent-futures")) + // Temporarily removed to unblock stable release. + //implementation(project(":concurrent:concurrent-futures")) implementation("androidx.collection:collection:1.0.0") compileOnly(CHECKER_FRAMEWORK) diff --git a/media2/session/src/main/java/androidx/media2/session/LibraryResult.java b/media2/session/src/main/java/androidx/media2/session/LibraryResult.java index c1dd2d4f6e9..12823162f1e 100644 --- a/media2/session/src/main/java/androidx/media2/session/LibraryResult.java +++ b/media2/session/src/main/java/androidx/media2/session/LibraryResult.java @@ -24,9 +24,9 @@ import android.os.SystemClock; import androidx.annotation.IntDef; import androidx.annotation.Nullable; import androidx.annotation.RestrictTo; -import androidx.concurrent.futures.ResolvableFuture; import androidx.media2.common.MediaItem; import androidx.media2.common.ParcelImplListSlice; +import androidx.media2.session.futures.ResolvableFuture; import androidx.versionedparcelable.CustomVersionedParcelable; import androidx.versionedparcelable.NonParcelField; import androidx.versionedparcelable.ParcelField; diff --git a/media2/session/src/main/java/androidx/media2/session/MediaBrowserImplLegacy.java b/media2/session/src/main/java/androidx/media2/session/MediaBrowserImplLegacy.java index 0b0360449b4..0e5f877905a 100644 --- a/media2/session/src/main/java/androidx/media2/session/MediaBrowserImplLegacy.java +++ b/media2/session/src/main/java/androidx/media2/session/MediaBrowserImplLegacy.java @@ -36,12 +36,12 @@ import android.util.Log; import androidx.annotation.GuardedBy; import androidx.annotation.NonNull; import androidx.annotation.Nullable; -import androidx.concurrent.futures.ResolvableFuture; import androidx.media2.common.MediaItem; import androidx.media2.common.MediaMetadata; import androidx.media2.session.MediaBrowser.BrowserCallback; import androidx.media2.session.MediaBrowser.BrowserCallbackRunnable; import androidx.media2.session.MediaLibraryService.LibraryParams; +import androidx.media2.session.futures.ResolvableFuture; import com.google.common.util.concurrent.ListenableFuture; diff --git a/media2/session/src/main/java/androidx/media2/session/MediaControllerImplLegacy.java b/media2/session/src/main/java/androidx/media2/session/MediaControllerImplLegacy.java index 8fe0dd052c7..8f0aeb4a3bb 100644 --- a/media2/session/src/main/java/androidx/media2/session/MediaControllerImplLegacy.java +++ b/media2/session/src/main/java/androidx/media2/session/MediaControllerImplLegacy.java @@ -57,7 +57,6 @@ import android.view.Surface; import androidx.annotation.GuardedBy; import androidx.annotation.NonNull; import androidx.annotation.Nullable; -import androidx.concurrent.futures.ResolvableFuture; import androidx.core.app.BundleCompat; import androidx.core.util.ObjectsCompat; import androidx.media2.common.MediaItem; @@ -75,6 +74,7 @@ import androidx.media2.session.MediaController.PlaybackInfo; import androidx.media2.session.MediaController.VolumeDirection; import androidx.media2.session.MediaController.VolumeFlags; import androidx.media2.session.MediaSession.CommandButton; +import androidx.media2.session.futures.ResolvableFuture; import com.google.common.util.concurrent.ListenableFuture; diff --git a/media2/session/src/main/java/androidx/media2/session/MediaSessionImplBase.java b/media2/session/src/main/java/androidx/media2/session/MediaSessionImplBase.java index b35e4631b8c..9a471ef7504 100644 --- a/media2/session/src/main/java/androidx/media2/session/MediaSessionImplBase.java +++ b/media2/session/src/main/java/androidx/media2/session/MediaSessionImplBase.java @@ -59,8 +59,6 @@ import android.view.Surface; import androidx.annotation.GuardedBy; import androidx.annotation.NonNull; import androidx.annotation.Nullable; -import androidx.concurrent.futures.AbstractResolvableFuture; -import androidx.concurrent.futures.ResolvableFuture; import androidx.core.util.ObjectsCompat; import androidx.media.AudioAttributesCompat; import androidx.media.MediaBrowserServiceCompat; @@ -77,6 +75,8 @@ import androidx.media2.session.MediaSession.ControllerCb; import androidx.media2.session.MediaSession.ControllerInfo; import androidx.media2.session.MediaSession.SessionCallback; import androidx.media2.session.SequencedFutureManager.SequencedFuture; +import androidx.media2.session.futures.ResolvableFuture; +import androidx.media2.session.futures.AbstractResolvableFuture; import com.google.common.util.concurrent.ListenableFuture; diff --git a/media2/session/src/main/java/androidx/media2/session/SequencedFutureManager.java b/media2/session/src/main/java/androidx/media2/session/SequencedFutureManager.java index 464153cd3cf..12316cac880 100644 --- a/media2/session/src/main/java/androidx/media2/session/SequencedFutureManager.java +++ b/media2/session/src/main/java/androidx/media2/session/SequencedFutureManager.java @@ -22,7 +22,7 @@ import androidx.annotation.GuardedBy; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.collection.ArrayMap; -import androidx.concurrent.futures.AbstractResolvableFuture; +import androidx.media2.session.futures.AbstractResolvableFuture; import com.google.common.util.concurrent.ListenableFuture; diff --git a/media2/session/src/main/java/androidx/media2/session/SessionResult.java b/media2/session/src/main/java/androidx/media2/session/SessionResult.java index e9a8301d963..a327d7e35a8 100644 --- a/media2/session/src/main/java/androidx/media2/session/SessionResult.java +++ b/media2/session/src/main/java/androidx/media2/session/SessionResult.java @@ -26,9 +26,9 @@ import android.support.v4.media.session.MediaSessionCompat; import androidx.annotation.IntDef; import androidx.annotation.Nullable; import androidx.annotation.RestrictTo; -import androidx.concurrent.futures.ResolvableFuture; import androidx.media2.common.MediaItem; import androidx.media2.common.SessionPlayer; +import androidx.media2.session.futures.ResolvableFuture; import androidx.versionedparcelable.ParcelField; import androidx.versionedparcelable.VersionedParcelable; import androidx.versionedparcelable.VersionedParcelize; diff --git a/media2/session/src/main/java/androidx/media2/session/futures/AbstractResolvableFuture.java b/media2/session/src/main/java/androidx/media2/session/futures/AbstractResolvableFuture.java new file mode 100644 index 00000000000..217c54c25ef --- /dev/null +++ b/media2/session/src/main/java/androidx/media2/session/futures/AbstractResolvableFuture.java @@ -0,0 +1,1206 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.session.futures; + +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; + +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; +import androidx.annotation.RestrictTo; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Locale; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.LockSupport; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An AndroidX version of Guava's {@code AbstractFuture}. + * <p> + * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. A more + * common ways to create a {@code ListenableFuture} is to instantiate {@link ResolvableFuture}. + * + * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way + * to set the result of the computation through the protected methods {@link #set(Object)}, {@link + * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override + * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses + * should rarely override other methods. + * + * @author Sven Mawson + * @author Luke Sandberg + * @hide + */ +// TODO(b/119308748): Implement InternalFutureFailureAccess +@SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +public abstract class AbstractResolvableFuture<V> implements ListenableFuture<V> { + + // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static final boolean GENERATE_CANCELLATION_CAUSES = + Boolean.parseBoolean( + System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); + + // Logger to log exceptions caught when running listeners. + private static final Logger log = Logger.getLogger( + AbstractResolvableFuture.class.getName()); + + // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of + // blocking. This value is what AbstractQueuedSynchronizer uses. + private static final long SPIN_THRESHOLD_NANOS = 1000L; + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static final AtomicHelper ATOMIC_HELPER; + + static { + AtomicHelper helper; + Throwable thrownAtomicReferenceFieldUpdaterFailure = null; + + // The access control checks that ARFU does means the caller class has to be + // AbstractFuture instead of SafeAtomicHelper, so we annoyingly define these here + try { + helper = + new SafeAtomicHelper( + newUpdater(Waiter.class, Thread.class, "thread"), + newUpdater(Waiter.class, Waiter.class, "next"), + newUpdater(AbstractResolvableFuture.class, Waiter.class, "waiters"), + newUpdater( + AbstractResolvableFuture.class, + Listener.class, + "listeners"), + newUpdater(AbstractResolvableFuture.class, Object.class, "value")); + } catch (Throwable atomicReferenceFieldUpdaterFailure) { + // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause + // getDeclaredField to throw a NoSuchFieldException when the field is definitely + // there. For these users fallback to a suboptimal implementation, + // based on synchronized. This will be a definite performance hit to those users. + thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; + helper = new SynchronizedHelper(); + } + + ATOMIC_HELPER = helper; + + // Prevent rare disastrous classloading in first call to LockSupport.park. + // See: https://bugs.openjdk.java.net/browse/JDK-8074773 + @SuppressWarnings("unused") + Class<?> ensureLoaded = LockSupport.class; + + // Log after all static init is finished; if an installed logger uses any Futures + // methods, it shouldn't break in cases where reflection is missing/broken. + if (thrownAtomicReferenceFieldUpdaterFailure != null) { + log.log(Level.SEVERE, "SafeAtomicHelper is broken!", + thrownAtomicReferenceFieldUpdaterFailure); + } + } + + /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ + private static final class Waiter { + static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); + + @Nullable + volatile Thread thread; + @Nullable + volatile Waiter next; + + /** + * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded + * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. + */ + Waiter(boolean unused) { + } + + Waiter() { + // avoid volatile write, write is made visible by subsequent CAS on waiters field + ATOMIC_HELPER.putThread(this, Thread.currentThread()); + } + + // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters + // field. + void setNext(Waiter next) { + ATOMIC_HELPER.putNext(this, next); + } + + void unpark() { + // This is racy with removeWaiter. The consequence of the race is that we may + // spuriously call unpark even though the thread has already removed itself + // from the list. But even if we did use a CAS, that race would still exist + // (it would just be ever so slightly smaller). + Thread w = thread; + if (w != null) { + thread = null; + LockSupport.unpark(w); + } + } + } + + /** + * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted + * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are + * saved by two things. + * + * <ul> + * <li>This is only called when a waiting thread times out or is interrupted. Both of which + * should be rare. + * <li>The waiters list should be very short. + * </ul> + */ + private void removeWaiter(Waiter node) { + node.thread = null; // mark as 'deleted' + restart: + while (true) { + Waiter pred = null; + Waiter curr = waiters; + if (curr == Waiter.TOMBSTONE) { + return; // give up if someone is calling complete + } + Waiter succ; + while (curr != null) { + succ = curr.next; + if (curr.thread != null) { // we aren't unlinking this node, update pred. + pred = curr; + } else if (pred != null) { // We are unlinking this node and it has a predecessor. + pred.next = succ; + if (pred.thread == null) { + // We raced with another node that unlinked pred. Restart. + continue restart; + } + } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { + // We are unlinking head + continue restart; // We raced with an add or complete + } + curr = succ; + } + break; + } + } + + /** Listeners also form a stack through the {@link #listeners} field. */ + private static final class Listener { + static final Listener TOMBSTONE = new Listener(null, null); + final Runnable task; + final Executor executor; + + // writes to next are made visible by subsequent CAS's on the listeners field + @Nullable + Listener next; + + Listener(Runnable task, Executor executor) { + this.task = task; + this.executor = executor; + } + } + + /** A special value to represent {@code null}. */ + private static final Object NULL = new Object(); + + /** A special value to represent failure, when {@link #setException} is called successfully. */ + private static final class Failure { + static final Failure FALLBACK_INSTANCE = + new Failure( + new Throwable("Failure occurred while trying to finish a future.") { + @Override + public synchronized Throwable fillInStackTrace() { + return this; // no stack trace + } + }); + final Throwable exception; + + Failure(Throwable exception) { + this.exception = checkNotNull(exception); + } + } + + /** A special value to represent cancellation and the 'wasInterrupted' bit. */ + private static final class Cancellation { + // constants to use when GENERATE_CANCELLATION_CAUSES = false + static final Cancellation CAUSELESS_INTERRUPTED; + static final Cancellation CAUSELESS_CANCELLED; + + static { + if (GENERATE_CANCELLATION_CAUSES) { + CAUSELESS_CANCELLED = null; + CAUSELESS_INTERRUPTED = null; + } else { + CAUSELESS_CANCELLED = new Cancellation(false, null); + CAUSELESS_INTERRUPTED = new Cancellation(true, null); + } + } + + final boolean wasInterrupted; + @Nullable + final Throwable cause; + + Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { + this.wasInterrupted = wasInterrupted; + this.cause = cause; + } + } + + /** A special value that encodes the 'setFuture' state. */ + private static final class SetFuture<V> implements Runnable { + final AbstractResolvableFuture<V> owner; + final ListenableFuture<? extends V> future; + + SetFuture(AbstractResolvableFuture<V> owner, ListenableFuture<? extends V> future) { + this.owner = owner; + this.future = future; + } + + @Override + public void run() { + if (owner.value != this) { + // nothing to do, we must have been cancelled, don't bother inspecting the future. + return; + } + Object valueToSet = getFutureValue(future); + if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { + complete(owner); + } + } + } + + // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is + // available. + /** + * This field encodes the current state of the future. + * + * <p>The valid values are: + * + * <ul> + * <li>{@code null} initial state, nothing has happened. + * <li>{@link Cancellation} terminal state, {@code cancel} was called. + * <li>{@link Failure} terminal state, {@code setException} was called. + * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. + * <li>{@link #NULL} terminal state, {@code set(null)} was called. + * <li>Any other non-null value, terminal state, {@code set} was called with a non-null + * argument. + * </ul> + */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Object value; + + /** All listeners. */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Listener listeners; + + /** All waiting threads. */ + @Nullable + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + volatile Waiter waiters; + + /** Constructor for use by subclasses. */ + protected AbstractResolvableFuture() { + } + + // Gets and Timed Gets + // + // * Be responsive to interruption + // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the + // waiters field. + // * Future completion is defined by when #value becomes non-null/non SetFuture + // * Future completion can be observed if the waiters field contains a TOMBSTONE + + // Timed Get + // There are a few design constraints to consider + // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I + // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the + // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of + // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for + // similar purposes. + // * We want to behave reasonably for timeouts of 0 + // * We are more responsive to completion than timeouts. This is because parkNanos depends on + // system scheduling and as such we could either miss our deadline, or unpark() could be + // delayed so that it looks like we timed out even though we didn't. For comparison FutureTask + // respects completion preferably and AQS is non-deterministic (depends on where in the queue + // the waiter is). If we wanted to be strict about it, we could store the unpark() time in + // the Waiter node and we could use that to make a decision about whether or not we timed out + // prior to being unparked. + + /** + * {@inheritDoc} + * + * <p>The default {@link AbstractResolvableFuture} implementation throws + * {@code InterruptedException} if the current thread is interrupted during the call, even if + * the value is already available. + * + * @throws CancellationException {@inheritDoc} + */ + @Override + public final V get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException, ExecutionException { + // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) + // loop at the bottom and throw a timeoutexception. + // we rely on the implicit null check on unit. + final long timeoutNanos = unit.toNanos(timeout); + long remainingNanos = timeoutNanos; + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + // we delay calling nanoTime until we know we will need to either park or spin + final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; + long_wait_loop: + if (remainingNanos >= SPIN_THRESHOLD_NANOS) { + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + while (true) { + LockSupport.parkNanos(this, remainingNanos); + // Check interruption first, if we woke up due to interruption we + // need to honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + + // Otherwise re-read and check doneness. If we loop then it must have + // been a spurious wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + + // timed out? + remainingNanos = endNanos - System.nanoTime(); + if (remainingNanos < SPIN_THRESHOLD_NANOS) { + // Remove the waiter, one way or another we are done parking this + // thread. + removeWaiter(node); + break long_wait_loop; // jump down to the busy wait loop + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE while trying + // to add a waiter. + return getDoneValue(value); + } + // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node + // on the waiters list + while (remainingNanos > 0) { + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + if (Thread.interrupted()) { + throw new InterruptedException(); + } + remainingNanos = endNanos - System.nanoTime(); + } + + String futureToString = toString(); + final String unitString = unit.toString().toLowerCase(Locale.ROOT); + String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); + // Only report scheduling delay if larger than our spin threshold - otherwise it's just + // noise + if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { + // We over-waited for our timeout. + message += " (plus "; + long overWaitNanos = -remainingNanos; + long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); + long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); + boolean shouldShowExtraNanos = + overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; + if (overWaitUnits > 0) { + message += overWaitUnits + " " + unitString; + if (shouldShowExtraNanos) { + message += ","; + } + message += " "; + } + if (shouldShowExtraNanos) { + message += overWaitLeftoverNanos + " nanoseconds "; + } + + message += "delay)"; + } + // It's confusing to see a completed future in a timeout message; if isDone() returns false, + // then we know it must have given a pending toString value earlier. If not, then the future + // completed after the timeout expired, and the message might be success. + if (isDone()) { + throw new TimeoutException(message + " but future completed as timeout expired"); + } + throw new TimeoutException(message + " for " + futureToString); + } + + /** + * {@inheritDoc} + * + * <p>The default {@link AbstractResolvableFuture} implementation throws + * {@code InterruptedException} if the current thread is interrupted during the call, even if + * the value is already available. + * + * @throws CancellationException {@inheritDoc} + */ + @Override + public final V get() throws InterruptedException, ExecutionException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + Object localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + Waiter oldHead = waiters; + if (oldHead != Waiter.TOMBSTONE) { + Waiter node = new Waiter(); + do { + node.setNext(oldHead); + if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { + // we are on the stack, now wait for completion. + while (true) { + LockSupport.park(this); + // Check interruption first, if we woke up due to interruption we need to + // honor that. + if (Thread.interrupted()) { + removeWaiter(node); + throw new InterruptedException(); + } + // Otherwise re-read and check doneness. If we loop then it must have + // been a spurious + // wakeup + localValue = value; + if (localValue != null & !(localValue instanceof SetFuture)) { + return getDoneValue(localValue); + } + } + } + oldHead = waiters; // re-read and loop. + } while (oldHead != Waiter.TOMBSTONE); + } + // re-read value, if we get here then we must have observed a TOMBSTONE while trying to + // add a waiter. + return getDoneValue(value); + } + + /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ + private V getDoneValue(Object obj) throws ExecutionException { + // While this seems like it might be too branch-y, simple benchmarking proves it to be + // unmeasurable (comparing done AbstractFutures with immediateFuture) + if (obj instanceof Cancellation) { + throw cancellationExceptionWithCause( + "Task was cancelled.", + ((Cancellation) obj).cause); + } else if (obj instanceof Failure) { + throw new ExecutionException(((Failure) obj).exception); + } else if (obj == NULL) { + return null; + } else { + @SuppressWarnings("unchecked") // this is the only other option + V asV = (V) obj; + return asV; + } + } + + @Override + public final boolean isDone() { + final Object localValue = value; + return localValue != null & !(localValue instanceof SetFuture); + } + + @Override + public final boolean isCancelled() { + final Object localValue = value; + return localValue instanceof Cancellation; + } + + /** + * {@inheritDoc} + * + * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been + * {@linkplain #setFuture set asynchronously}, then the cancellation will also be propagated + * to the delegate {@code Future} that was supplied in the {@code setFuture} call. + * + * <p>Rather than override this method to perform additional cancellation work or cleanup, + * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link + * #wasInterrupted} as necessary. This ensures that the work is done even if the future is + * cancelled without a call to {@code cancel}, such as by calling {@code + * setFuture(cancelledFuture)}. + */ + @Override + public final boolean cancel(boolean mayInterruptIfRunning) { + Object localValue = value; + boolean rValue = false; + if (localValue == null | localValue instanceof SetFuture) { + // Try to delay allocating the exception. At this point we may still lose the CAS, + // but it is certainly less likely. + Object valueToSet = + GENERATE_CANCELLATION_CAUSES + ? new Cancellation( + mayInterruptIfRunning, + new CancellationException("Future.cancel() was called.")) + : (mayInterruptIfRunning + ? Cancellation.CAUSELESS_INTERRUPTED + : Cancellation.CAUSELESS_CANCELLED); + AbstractResolvableFuture<?> abstractFuture = this; + while (true) { + if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { + rValue = true; + // We call interuptTask before calling complete(), which is consistent with + // FutureTask + if (mayInterruptIfRunning) { + abstractFuture.interruptTask(); + } + complete(abstractFuture); + if (localValue instanceof SetFuture) { + // propagate cancellation to the future set in setfuture, this is racy, + // and we don't + // care if we are successful or not. + ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; + if (futureToPropagateTo instanceof AbstractResolvableFuture) { + // If the future is a trusted then we specifically avoid + // calling cancel() this has 2 benefits + // 1. for long chains of futures strung together with setFuture we + // consume less stack + // 2. we avoid allocating Cancellation objects at every level of the + // cancellation chain + // We can only do this for TrustedFuture, because TrustedFuture + // .cancel is final and does nothing but delegate to this method. + AbstractResolvableFuture<?> trusted = + (AbstractResolvableFuture<?>) futureToPropagateTo; + localValue = trusted.value; + if (localValue == null | localValue instanceof SetFuture) { + abstractFuture = trusted; + continue; // loop back up and try to complete the new future + } + } else { + // not a TrustedFuture, call cancel directly. + futureToPropagateTo.cancel(mayInterruptIfRunning); + } + } + break; + } + // obj changed, reread + localValue = abstractFuture.value; + if (!(localValue instanceof SetFuture)) { + // obj cannot be null at this point, because value can only change from null + // to non-null. So if value changed (and it did since we lost the CAS), + // then it cannot be null and since it isn't a SetFuture, then the future must + // be done and we should exit the loop + break; + } + } + } + return rValue; + } + + /** + * Subclasses can override this method to implement interruption of the future's computation. + * The method is invoked automatically by a successful call to + * {@link #cancel(boolean) cancel(true)}. + * + * <p>The default implementation does nothing. + * + * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking + * {@link #wasInterrupted} to decide whether to interrupt your task. + * + * @since 10.0 + */ + protected void interruptTask() { + } + + /** + * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code + * true}. + * + * @since 14.0 + */ + protected final boolean wasInterrupted() { + final Object localValue = value; + return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; + } + + /** + * {@inheritDoc} + * + * @since 10.0 + */ + @Override + public final void addListener(Runnable listener, Executor executor) { + checkNotNull(listener); + checkNotNull(executor); + Listener oldHead = listeners; + if (oldHead != Listener.TOMBSTONE) { + Listener newNode = new Listener(listener, executor); + do { + newNode.next = oldHead; + if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { + return; + } + oldHead = listeners; // re-read + } while (oldHead != Listener.TOMBSTONE); + } + // If we get here then the Listener TOMBSTONE was set, which means the future is done, call + // the listener. + executeListener(listener, executor); + } + + /** + * Sets the result of this {@code Future} unless this {@code Future} has already been + * cancelled or set (including {@linkplain #setFuture set asynchronously}). + * When a call to this method returns, the {@code Future} is guaranteed to be + * {@linkplain #isDone done} <b>only if</b> the call was accepted (in which case it returns + * {@code true}). If it returns {@code false}, the {@code Future} may have previously been set + * asynchronously, in which case its result may not be known yet. That result, + * though not yet known, cannot be overridden by a call to a {@code set*} method, + * only by a call to {@link #cancel}. + * + * @param value the value to be used as the result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean set(@Nullable V value) { + Object valueToSet = value == null ? NULL : value; + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the failed result of this {@code Future} unless this {@code Future} has already been + * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this + * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only + * if</b> + * the call was accepted (in which case it returns {@code true}). If it returns {@code + * false}, the + * {@code Future} may have previously been set asynchronously, in which case its result may + * not be + * known yet. That result, though not yet known, cannot be overridden by a call to a {@code + * set*} + * method, only by a call to {@link #cancel}. + * + * @param throwable the exception to be used as the failed result + * @return true if the attempt was accepted, completing the {@code Future} + */ + protected boolean setException(Throwable throwable) { + Object valueToSet = new Failure(checkNotNull(throwable)); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + complete(this); + return true; + } + return false; + } + + /** + * Sets the result of this {@code Future} to match the supplied input {@code Future} once the + * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set + * (including "set asynchronously," defined below). + * + * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the + * call is accepted, then this future is guaranteed to have been completed with the supplied + * future by the time this method returns. If the supplied future is not done and the call + * is accepted, then the future will be <i>set asynchronously</i>. Note that such a result, + * though not yet known, cannot be overridden by a call to a {@code set*} method, + * only by a call to {@link #cancel}. + * + * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later + * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to + * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code + * Future}. + * + * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, + * it will never trigger interruption behavior. In particular, it will not cause this future to + * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not + * return {@code true}. + * + * @param future the future to delegate to + * @return true if the attempt was accepted, indicating that the {@code Future} was not + * previously cancelled or set. + * @since 19.0 + */ + protected boolean setFuture(ListenableFuture<? extends V> future) { + checkNotNull(future); + Object localValue = value; + if (localValue == null) { + if (future.isDone()) { + Object value = getFutureValue(future); + if (ATOMIC_HELPER.casValue(this, null, value)) { + complete(this); + return true; + } + return false; + } + SetFuture valueToSet = new SetFuture<V>(this, future); + if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { + // the listener is responsible for calling completeWithFuture, directExecutor is + // appropriate since all we are doing is unpacking a completed future + // which should be fast. + try { + future.addListener(valueToSet, DirectExecutor.INSTANCE); + } catch (Throwable t) { + // addListener has thrown an exception! SetFuture.run can't throw any + // exceptions so this must have been caused by addListener itself. + // The most likely explanation is a misconfigured mock. + // Try to switch to Failure. + Failure failure; + try { + failure = new Failure(t); + } catch (Throwable oomMostLikely) { + failure = Failure.FALLBACK_INSTANCE; + } + // Note: The only way this CAS could fail is if cancel() has raced with us. + // That is ok. + boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); + } + return true; + } + localValue = value; // we lost the cas, fall through and maybe cancel + } + // The future has already been set to something. If it is cancellation we should cancel the + // incoming future. + if (localValue instanceof Cancellation) { + // we don't care if it fails, this is best-effort. + future.cancel(((Cancellation) localValue).wasInterrupted); + } + return false; + } + + /** + * Returns a value that satisfies the contract of the {@link #value} field based on the state of + * given future. + * + * <p>This is approximately the inverse of {@link #getDoneValue(Object)} + */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static Object getFutureValue(ListenableFuture<?> future) { + if (future instanceof AbstractResolvableFuture) { + // Break encapsulation for TrustedFuture instances since we know that subclasses cannot + // override .get() (since it is final) and therefore this is equivalent to calling + // .get() and unpacking the exceptions like we do below (just much faster because it is + // a single field read instead of a read, several branches and possibly + // creating exceptions). + Object v = ((AbstractResolvableFuture<?>) future).value; + if (v instanceof Cancellation) { + // If the other future was interrupted, clear the interrupted bit while + // preserving the cause this will make it consistent with how non-trustedfutures + // work which cannot propagate the wasInterrupted bit + Cancellation c = (Cancellation) v; + if (c.wasInterrupted) { + v = c.cause != null ? new Cancellation(/* wasInterrupted= */ false, c.cause) + : Cancellation.CAUSELESS_CANCELLED; + } + } + return v; + } + boolean wasCancelled = future.isCancelled(); + // Don't allocate a CancellationException if it's not necessary + if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { + return Cancellation.CAUSELESS_CANCELLED; + } + // Otherwise calculate the value by calling .get() + try { + Object v = getUninterruptibly(future); + return v == null ? NULL : v; + } catch (ExecutionException exception) { + return new Failure(exception.getCause()); + } catch (CancellationException cancellation) { + if (!wasCancelled) { + return new Failure( + new IllegalArgumentException( + "get() threw CancellationException, despite reporting isCancelled" + + "() == false: " + + future, + cancellation)); + } + return new Cancellation(false, cancellation); + } catch (Throwable t) { + return new Failure(t); + } + } + + /** + * internal dependency on other /util/concurrent classes. + */ + private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { + boolean interrupted = false; + try { + while (true) { + try { + return future.get(); + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** Unblocks all threads and runs all listeners. */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + static void complete(AbstractResolvableFuture<?> future) { + Listener next = null; + outer: + while (true) { + future.releaseWaiters(); + // We call this before the listeners in order to avoid needing to manage a separate + // stack data structure for them. Also, some implementations rely on this running + // prior to listeners so that the cleanup work is visible to listeners. + // afterDone() should be generally fast and only used for cleanup work... but in + // theory can also be recursive and create StackOverflowErrors + future.afterDone(); + // push the current set of listeners onto next + next = future.clearListeners(next); + future = null; + while (next != null) { + Listener curr = next; + next = next.next; + Runnable task = curr.task; + if (task instanceof SetFuture) { + SetFuture<?> setFuture = (SetFuture<?>) task; + // We unwind setFuture specifically to avoid StackOverflowErrors in the case + // of long chains of SetFutures + // Handling this special case is important because there is no way to pass an + // executor to setFuture, so a user couldn't break the chain by doing this + // themselves. It is also potentially common if someone writes a recursive + // Futures.transformAsync transformer. + future = setFuture.owner; + if (future.value == setFuture) { + Object valueToSet = getFutureValue(setFuture.future); + if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { + continue outer; + } + } + // other wise the future we were trying to set is already done. + } else { + executeListener(task, curr.executor); + } + } + break; + } + } + + /** + * Callback method that is called exactly once after the future is completed. + * + * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. + * + * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is + * intended for very lightweight cleanup work, for example, timing statistics or clearing + * fields. + * If your task does anything heavier consider, just using a listener with an executor. + * + * @since 20.0 + */ + protected void afterDone() { + } + + /** + * If this future has been cancelled (and possibly interrupted), cancels (and possibly + * interrupts) the given future (if available). + */ + @SuppressWarnings("ParameterNotNullable") + final void maybePropagateCancellationTo(@Nullable Future<?> related) { + if (related != null & isCancelled()) { + related.cancel(wasInterrupted()); + } + } + + /** Releases all threads in the {@link #waiters} list, and clears the list. */ + private void releaseWaiters() { + Waiter head; + do { + head = waiters; + } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); + for (Waiter currentWaiter = head; currentWaiter != null; + currentWaiter = currentWaiter.next) { + currentWaiter.unpark(); + } + } + + /** + * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently + * added first. + */ + private Listener clearListeners(Listener onto) { + // We need to + // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to + // to synchronize with us + // 2. reverse the linked list, because despite our rather clear contract, people depend + // on us executing listeners in the order they were added + // 3. push all the items onto 'onto' and return the new head of the stack + Listener head; + do { + head = listeners; + } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); + Listener reversedList = onto; + while (head != null) { + Listener tmp = head; + head = head.next; + tmp.next = reversedList; + reversedList = tmp; + } + return reversedList; + } + + // TODO(clm): move parts into a default method on ListenableFuture? + @Override + public String toString() { + StringBuilder builder = new StringBuilder().append(super.toString()).append("[status="); + if (isCancelled()) { + builder.append("CANCELLED"); + } else if (isDone()) { + addDoneString(builder); + } else { + String pendingDescription; + try { + pendingDescription = pendingToString(); + } catch (RuntimeException e) { + // Don't call getMessage or toString() on the exception, in case the exception + // thrown by the subclass is implemented with bugs similar to the subclass. + pendingDescription = "Exception thrown from implementation: " + e.getClass(); + } + // The future may complete during or before the call to getPendingToString, so we use + // null as a signal that we should try checking if the future is done again. + if (pendingDescription != null && !pendingDescription.isEmpty()) { + builder.append("PENDING, info=[").append(pendingDescription).append("]"); + } else if (isDone()) { + addDoneString(builder); + } else { + builder.append("PENDING"); + } + } + return builder.append("]").toString(); + } + + /** + * Provide a human-readable explanation of why this future has not yet completed. + * + * @return null if an explanation cannot be provided because the future is done. + * @since 23.0 + */ + @Nullable + protected String pendingToString() { + Object localValue = value; + if (localValue instanceof SetFuture) { + return "setFuture=[" + userObjectToString(((SetFuture) localValue).future) + "]"; + } else if (this instanceof ScheduledFuture) { + return "remaining delay=[" + + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) + + " ms]"; + } + return null; + } + + private void addDoneString(StringBuilder builder) { + try { + V value = getUninterruptibly(this); + builder.append("SUCCESS, result=[").append(userObjectToString(value)).append("]"); + } catch (ExecutionException e) { + builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); + } catch (CancellationException e) { + builder.append("CANCELLED"); // shouldn't be reachable + } catch (RuntimeException e) { + builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); + } + } + + /** Helper for printing user supplied objects into our toString method. */ + private String userObjectToString(Object o) { + // This is some basic recursion detection for when people create cycles via set/setFuture + // This is however only partial protection though since it only detects self loops. We + // could detect arbitrary cycles using a thread local or possibly by catching + // StackOverflowExceptions but this should be a good enough solution + // (it is also what jdk collections do in these cases) + if (o == this) { + return "this future"; + } + return String.valueOf(o); + } + + /** + * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain + * RuntimeException runtime exceptions} thrown by the executor. + */ + private static void executeListener(Runnable runnable, Executor executor) { + try { + executor.execute(runnable); + } catch (RuntimeException e) { + // Log it and keep going -- bad runnable and/or executor. Don't punish the other + // runnables if we're given a bad one. We only catch RuntimeException + // because we want Errors to propagate up. + log.log( + Level.SEVERE, + "RuntimeException while executing runnable " + runnable + " with executor " + + executor, + e); + } + } + + private abstract static class AtomicHelper { + /** Non volatile write of the thread to the {@link Waiter#thread} field. */ + abstract void putThread(Waiter waiter, Thread newValue); + + /** Non volatile write of the waiter to the {@link Waiter#next} field. */ + abstract void putNext(Waiter waiter, Waiter newValue); + + /** Performs a CAS operation on the {@link #waiters} field. */ + abstract boolean casWaiters( + AbstractResolvableFuture<?> future, + Waiter expect, + Waiter update); + + /** Performs a CAS operation on the {@link #listeners} field. */ + abstract boolean casListeners( + AbstractResolvableFuture<?> future, + Listener expect, + Listener update); + + /** Performs a CAS operation on the {@link #value} field. */ + abstract boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update); + } + + /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ + private static final class SafeAtomicHelper extends AtomicHelper { + final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; + final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Waiter> waitersUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Listener> listenersUpdater; + final AtomicReferenceFieldUpdater<AbstractResolvableFuture, Object> valueUpdater; + + SafeAtomicHelper( + AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, + AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Waiter> waitersUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Listener> listenersUpdater, + AtomicReferenceFieldUpdater<AbstractResolvableFuture, Object> valueUpdater) { + this.waiterThreadUpdater = waiterThreadUpdater; + this.waiterNextUpdater = waiterNextUpdater; + this.waitersUpdater = waitersUpdater; + this.listenersUpdater = listenersUpdater; + this.valueUpdater = valueUpdater; + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiterThreadUpdater.lazySet(waiter, newValue); + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiterNextUpdater.lazySet(waiter, newValue); + } + + @Override + boolean casWaiters(AbstractResolvableFuture<?> future, Waiter expect, Waiter update) { + return waitersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casListeners(AbstractResolvableFuture<?> future, Listener expect, Listener update) { + return listenersUpdater.compareAndSet(future, expect, update); + } + + @Override + boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update) { + return valueUpdater.compareAndSet(future, expect, update); + } + } + + /** + * {@link AtomicHelper} based on {@code synchronized} and volatile writes. + * + * <p>This is an implementation of last resort for when certain basic VM features are broken + * (like AtomicReferenceFieldUpdater). + */ + private static final class SynchronizedHelper extends AtomicHelper { + SynchronizedHelper() { + } + + @Override + void putThread(Waiter waiter, Thread newValue) { + waiter.thread = newValue; + } + + @Override + void putNext(Waiter waiter, Waiter newValue) { + waiter.next = newValue; + } + + @Override + boolean casWaiters(AbstractResolvableFuture<?> future, Waiter expect, Waiter update) { + synchronized (future) { + if (future.waiters == expect) { + future.waiters = update; + return true; + } + return false; + } + } + + @Override + boolean casListeners(AbstractResolvableFuture<?> future, Listener expect, Listener update) { + synchronized (future) { + if (future.listeners == expect) { + future.listeners = update; + return true; + } + return false; + } + } + + @Override + boolean casValue(AbstractResolvableFuture<?> future, Object expect, Object update) { + synchronized (future) { + if (future.value == expect) { + future.value = update; + return true; + } + return false; + } + } + } + + private static CancellationException cancellationExceptionWithCause( + @Nullable String message, @Nullable Throwable cause) { + CancellationException exception = new CancellationException(message); + exception.initCause(cause); + return exception; + } + + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessor. + @NonNull + static <T> T checkNotNull(@Nullable T reference) { + if (reference == null) { + throw new NullPointerException(); + } + return reference; + } +} diff --git a/media2/session/src/main/java/androidx/media2/session/futures/DirectExecutor.java b/media2/session/src/main/java/androidx/media2/session/futures/DirectExecutor.java new file mode 100644 index 00000000000..9ca99b5e8c2 --- /dev/null +++ b/media2/session/src/main/java/androidx/media2/session/futures/DirectExecutor.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.session.futures; + +import androidx.annotation.RestrictTo; + +import java.util.concurrent.Executor; + +/** + * An {@link Executor} that runs each task in the thread that invokes {@link Executor#execute + * execute}. + * @hide + */ +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "DirectExecutor"; + } +}
\ No newline at end of file diff --git a/media2/session/src/main/java/androidx/media2/session/futures/ResolvableFuture.java b/media2/session/src/main/java/androidx/media2/session/futures/ResolvableFuture.java new file mode 100644 index 00000000000..4a3fcb36092 --- /dev/null +++ b/media2/session/src/main/java/androidx/media2/session/futures/ResolvableFuture.java @@ -0,0 +1,66 @@ +/* + * Copyright 2019 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package androidx.media2.session.futures; + + +import androidx.annotation.Nullable; +import androidx.annotation.RestrictTo; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * An AndroidX version of Guava's {@code SettableFuture}. + * <p> + * A {@link ListenableFuture} whose result can be set by a {@link #set(Object)}, {@link + * #setException(Throwable)} or {@link #setFuture(ListenableFuture)} call. It can also, like any + * other {@code Future}, be {@linkplain #cancel cancelled}. + * <p> + * If your needs are more complex than {@code ResolvableFuture} supports, use {@link + * AbstractResolvableFuture}, which offers an extensible version of the API. + * + * @hide + * @author Sven Mawson + */ +@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) +public final class ResolvableFuture<V> extends AbstractResolvableFuture<V> { + /** + * Creates a new {@code ResolvableFuture} that can be completed or cancelled by a later method + * call. + */ + public static <V> ResolvableFuture<V> create() { + return new ResolvableFuture<>(); + } + + @Override + public boolean set(@Nullable V value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + public boolean setFuture(ListenableFuture<? extends V> future) { + return super.setFuture(future); + } + + private ResolvableFuture() { + } +} + diff --git a/media2/widget/build.gradle b/media2/widget/build.gradle index 2049fa6df07..0e717bb0b39 100644 --- a/media2/widget/build.gradle +++ b/media2/widget/build.gradle @@ -28,9 +28,9 @@ dependencies { api(project(":media2:media2-common")) api(project(":media2:media2-player")) api(project(":media2:media2-session")) + api(GUAVA_LISTENABLE_FUTURE) implementation("androidx.appcompat:appcompat:1.0.2") implementation("androidx.palette:palette:1.0.0") - implementation(project(":concurrent:concurrent-futures")) androidTestImplementation(ANDROIDX_TEST_EXT_JUNIT) androidTestImplementation(ANDROIDX_TEST_CORE) |