aboutsummaryrefslogtreecommitdiff
path: root/guava/src/com/google/common/util/concurrent/AggregateFuture.java
diff options
context:
space:
mode:
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/AggregateFuture.java')
-rw-r--r--guava/src/com/google/common/util/concurrent/AggregateFuture.java439
1 files changed, 240 insertions, 199 deletions
diff --git a/guava/src/com/google/common/util/concurrent/AggregateFuture.java b/guava/src/com/google/common/util/concurrent/AggregateFuture.java
index 432684578..596a5ae35 100644
--- a/guava/src/com/google/common/util/concurrent/AggregateFuture.java
+++ b/guava/src/com/google/common/util/concurrent/AggregateFuture.java
@@ -16,8 +16,11 @@ package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED;
+import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static java.util.logging.Level.SEVERE;
import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
@@ -26,265 +29,303 @@ import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
- * A future made up of a collection of sub-futures.
+ * A future whose value is derived from a collection of input futures.
*
* @param <InputT> the type of the individual inputs
* @param <OutputT> the type of the output (i.e. this) future
*/
@GwtCompatible
-abstract class AggregateFuture<InputT, OutputT> extends AbstractFuture.TrustedFuture<OutputT> {
+abstract class AggregateFuture<InputT, OutputT> extends AggregateFutureState<OutputT> {
private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName());
+ /**
+ * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to
+ * propagate cancellation) and {@link #toString()}. To access the futures' <i>values</i>, {@code
+ * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case
+ * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs.
+ */
/*
* In certain circumstances, this field might theoretically not be visible to an afterDone() call
* triggered by cancel(). For details, see the comments on the fields of TimeoutFuture.
*/
- private @Nullable RunningState runningState;
+ private @Nullable ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;
+
+ private final boolean allMustSucceed;
+ private final boolean collectsValues;
+
+ AggregateFuture(
+ ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
+ boolean allMustSucceed,
+ boolean collectsValues) {
+ super(futures.size());
+ this.futures = checkNotNull(futures);
+ this.allMustSucceed = allMustSucceed;
+ this.collectsValues = collectsValues;
+ }
@Override
protected final void afterDone() {
super.afterDone();
- RunningState localRunningState = runningState;
- if (localRunningState != null) {
- // Let go of the memory held by the running state
- this.runningState = null;
- ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures =
- localRunningState.futures;
- boolean wasInterrupted = wasInterrupted();
- if (wasInterrupted) {
- localRunningState.interruptTask();
- }
+ ImmutableCollection<? extends Future<?>> localFutures = futures;
+ releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures`
- if (isCancelled() & futures != null) {
- for (ListenableFuture<?> future : futures) {
- future.cancel(wasInterrupted);
- }
+ if (isCancelled() & localFutures != null) {
+ boolean wasInterrupted = wasInterrupted();
+ for (Future<?> future : localFutures) {
+ future.cancel(wasInterrupted);
}
}
+ /*
+ * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed
+ * again if some outstanding input fails.
+ */
}
@Override
- protected String pendingToString() {
- RunningState localRunningState = runningState;
- if (localRunningState == null) {
- return null;
- }
- ImmutableCollection<? extends ListenableFuture<? extends InputT>> localFutures =
- localRunningState.futures;
+ protected final String pendingToString() {
+ ImmutableCollection<? extends Future<?>> localFutures = futures;
if (localFutures != null) {
- return "futures=[" + localFutures + "]";
+ return "futures=" + localFutures;
}
- return null;
+ return super.pendingToString();
}
- /** Must be called at the end of each sub-class's constructor. */
- final void init(RunningState runningState) {
- this.runningState = runningState;
- runningState.init();
- }
-
- abstract class RunningState extends AggregateFutureState implements Runnable {
- private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;
- private final boolean allMustSucceed;
- private final boolean collectsValues;
-
- RunningState(
- ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
- boolean allMustSucceed,
- boolean collectsValues) {
- super(futures.size());
- this.futures = checkNotNull(futures);
- this.allMustSucceed = allMustSucceed;
- this.collectsValues = collectsValues;
- }
-
- /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */
- @Override
- public final void run() {
- decrementCountAndMaybeComplete();
+ /**
+ * Must be called at the end of each subclass's constructor. This method performs the "real"
+ * initialization; we can't put this in the constructor because, in the case where futures are
+ * already complete, we would not initialize the subclass before calling {@link
+ * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed,
+ * we're guaranteed to have properly initialized the subclass.
+ */
+ final void init() {
+ // Corner case: List is empty.
+ if (futures.isEmpty()) {
+ handleAllCompleted();
+ return;
}
- /**
- * The "real" initialization; we can't put this in the constructor because, in the case where
- * futures are already complete, we would not initialize the subclass before calling {@link
- * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed
- * to have properly initialized the subclass.
- */
- private void init() {
- // Corner case: List is empty.
- if (futures.isEmpty()) {
- handleAllCompleted();
- return;
- }
-
- // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll
- // need to handle RejectedExecutionException
+ // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll
+ // need to handle RejectedExecutionException
- if (allMustSucceed) {
- // We need fail fast, so we have to keep track of which future failed so we can propagate
- // the exception immediately
+ if (allMustSucceed) {
+ // We need fail fast, so we have to keep track of which future failed so we can propagate
+ // the exception immediately
- // Register a listener on each Future in the list to update the state of this future.
- // Note that if all the futures on the list are done prior to completing this loop, the last
- // call to addListener() will callback to setOneValue(), transitively call our cleanup
- // listener, and set this.futures to null.
- // This is not actually a problem, since the foreach only needs this.futures to be non-null
- // at the beginning of the loop.
- int i = 0;
- for (final ListenableFuture<? extends InputT> listenable : futures) {
- final int index = i++;
- listenable.addListener(
- new Runnable() {
- @Override
- public void run() {
- try {
- handleOneInputDone(index, listenable);
- } finally {
- decrementCountAndMaybeComplete();
+ // Register a listener on each Future in the list to update the state of this future.
+ // Note that if all the futures on the list are done prior to completing this loop, the last
+ // call to addListener() will callback to setOneValue(), transitively call our cleanup
+ // listener, and set this.futures to null.
+ // This is not actually a problem, since the foreach only needs this.futures to be non-null
+ // at the beginning of the loop.
+ int i = 0;
+ for (final ListenableFuture<? extends InputT> future : futures) {
+ final int index = i++;
+ future.addListener(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (future.isCancelled()) {
+ // Clear futures prior to cancelling children. This sets our own state but lets
+ // the input futures keep running, as some of them may be used elsewhere.
+ futures = null;
+ cancel(false);
+ } else {
+ collectValueFromNonCancelledFuture(index, future);
}
+ } finally {
+ /*
+ * "null" means: There is no need to access `futures` again during
+ * `processCompleted` because we're reading each value during a call to
+ * handleOneInputDone.
+ */
+ decrementCountAndMaybeComplete(null);
}
- },
- directExecutor());
- }
- } else {
- // We'll only call the callback when all futures complete, regardless of whether some failed
- // Hold off on calling setOneValue until all complete, so we can share the same listener
- for (ListenableFuture<? extends InputT> listenable : futures) {
- listenable.addListener(this, directExecutor());
- }
+ }
+ },
+ directExecutor());
+ }
+ } else {
+ /*
+ * We'll call the user callback or collect the values only when all inputs complete,
+ * regardless of whether some failed. This lets us avoid calling expensive methods like
+ * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it
+ * lets all futures share the same listener.
+ *
+ * We store `localFutures` inside the listener because `this.futures` might be nulled out by
+ * the time the listener runs for the final future -- at which point we need to check all
+ * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't
+ * need access to the futures again, so we can just pass `null`.
+ *
+ * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization.
+ * If we make some other optimizations, this one will no longer be necessary. The optimization
+ * could actually hurt in some cases, as it forces us to keep all inputs in memory until the
+ * final input completes.
+ */
+ final ImmutableCollection<? extends Future<? extends InputT>> localFutures =
+ collectsValues ? futures : null;
+ Runnable listener =
+ new Runnable() {
+ @Override
+ public void run() {
+ decrementCountAndMaybeComplete(localFutures);
+ }
+ };
+ for (ListenableFuture<? extends InputT> future : futures) {
+ future.addListener(listener, directExecutor());
}
}
+ }
- /**
- * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
- * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
- * throwable did not cause this future to fail, and it is the first time we've seen that
- * particular Throwable.
- */
- private void handleException(Throwable throwable) {
- checkNotNull(throwable);
+ /**
+ * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
+ * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
+ * throwable did not cause this future to fail, and it is the first time we've seen that
+ * particular Throwable.
+ */
+ private void handleException(Throwable throwable) {
+ checkNotNull(throwable);
- boolean completedWithFailure = false;
- boolean firstTimeSeeingThisException = true;
- if (allMustSucceed) {
- // As soon as the first one fails, throw the exception up.
- // The result of all other inputs is then ignored.
- completedWithFailure = setException(throwable);
- if (completedWithFailure) {
- releaseResourcesAfterFailure();
- } else {
- // Go up the causal chain to see if we've already seen this cause; if we have, even if
- // it's wrapped by a different exception, don't log it.
- firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable);
+ if (allMustSucceed) {
+ // As soon as the first one fails, make that failure the result of the output future.
+ // The results of all other inputs are then ignored (except for logging any failures).
+ boolean completedWithFailure = setException(throwable);
+ if (!completedWithFailure) {
+ // Go up the causal chain to see if we've already seen this cause; if we have, even if
+ // it's wrapped by a different exception, don't log it.
+ boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable);
+ if (firstTimeSeeingThisException) {
+ log(throwable);
+ return;
}
}
-
- // | and & used because it's faster than the branch required for || and &&
- if (throwable instanceof Error
- | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) {
- String message =
- (throwable instanceof Error)
- ? "Input Future failed with Error"
- : "Got more than one input Future failure. Logging failures after the first";
- logger.log(Level.SEVERE, message, throwable);
- }
}
- @Override
- final void addInitialException(Set<Throwable> seen) {
- if (!isCancelled()) {
- // TODO(cpovirk): Think about whether we could/should use Verify to check this.
- boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure());
- }
+ /*
+ * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call
+ * handleException() at all.
+ */
+ if (throwable instanceof Error) {
+ /*
+ * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it
+ * returned true? This was intentional (CL 46470009), but it seems odd compared to how we
+ * normally handle Error.
+ *
+ * Similarly, do we really want to log the same Error more than once?
+ */
+ log(throwable);
}
+ }
- /** Handles the input at the given index completing. */
- private void handleOneInputDone(int index, Future<? extends InputT> future) {
- // The only cases in which this Future should already be done are (a) if it was cancelled or
- // (b) if an input failed and we propagated that immediately because of allMustSucceed.
- checkState(
- allMustSucceed || !isDone() || isCancelled(),
- "Future was done before all dependencies completed");
+ private static void log(Throwable throwable) {
+ String message =
+ (throwable instanceof Error)
+ ? "Input Future failed with Error"
+ : "Got more than one input Future failure. Logging failures after the first";
+ logger.log(SEVERE, message, throwable);
+ }
- try {
- checkState(future.isDone(), "Tried to set value from future which is not done");
- if (allMustSucceed) {
- if (future.isCancelled()) {
- // clear running state prior to cancelling children, this sets our own state but lets
- // the input futures keep running as some of them may be used elsewhere.
- runningState = null;
- cancel(false);
- } else {
- // We always get the result so that we can have fail-fast, even if we don't collect
- InputT result = getDone(future);
- if (collectsValues) {
- collectOneValue(allMustSucceed, index, result);
- }
- }
- } else if (collectsValues && !future.isCancelled()) {
- collectOneValue(allMustSucceed, index, getDone(future));
- }
- } catch (ExecutionException e) {
- handleException(e.getCause());
- } catch (Throwable t) {
- handleException(t);
- }
+ @Override
+ final void addInitialException(Set<Throwable> seen) {
+ checkNotNull(seen);
+ if (!isCancelled()) {
+ // TODO(cpovirk): Think about whether we could/should use Verify to check this.
+ boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure());
}
+ }
- private void decrementCountAndMaybeComplete() {
- int newRemaining = decrementRemainingAndGet();
- checkState(newRemaining >= 0, "Less than 0 remaining futures");
- if (newRemaining == 0) {
- processCompleted();
- }
+ /**
+ * Collects the result (success or failure) of one input future. The input must not have been
+ * cancelled. For details on when this is called, see {@link #collectOneValue}.
+ */
+ private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) {
+ try {
+ // We get the result, even if collectOneValue is a no-op, so that we can fail fast.
+ collectOneValue(index, getDone(future));
+ } catch (ExecutionException e) {
+ handleException(e.getCause());
+ } catch (Throwable t) {
+ handleException(t);
}
+ }
- private void processCompleted() {
- // Collect the values if (a) our output requires collecting them and (b) we haven't been
- // collecting them as we go. (We've collected them as we go only if we needed to fail fast)
- if (collectsValues & !allMustSucceed) {
- int i = 0;
- for (ListenableFuture<? extends InputT> listenable : futures) {
- handleOneInputDone(i++, listenable);
+ private void decrementCountAndMaybeComplete(
+ @Nullable
+ ImmutableCollection<? extends Future<? extends InputT>>
+ futuresIfNeedToCollectAtCompletion) {
+ int newRemaining = decrementRemainingAndGet();
+ checkState(newRemaining >= 0, "Less than 0 remaining futures");
+ if (newRemaining == 0) {
+ processCompleted(futuresIfNeedToCollectAtCompletion);
+ }
+ }
+
+ private void processCompleted(
+ @Nullable
+ ImmutableCollection<? extends Future<? extends InputT>>
+ futuresIfNeedToCollectAtCompletion) {
+ if (futuresIfNeedToCollectAtCompletion != null) {
+ int i = 0;
+ for (Future<? extends InputT> future : futuresIfNeedToCollectAtCompletion) {
+ if (!future.isCancelled()) {
+ collectValueFromNonCancelledFuture(i, future);
}
+ i++;
}
- handleAllCompleted();
}
-
- /**
- * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we
- * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we
- * immediately release resources we no longer need); additionally, the future will release its
- * reference to {@link RunningState}, which should free all associated memory when all the
- * futures complete and the listeners are released.
- *
- * <p>TODO(user): Write tests for memory retention
+ clearSeenExceptions();
+ handleAllCompleted();
+ /*
+ * Null out fields, including some used in handleAllCompleted() above (like
+ * `CollectionFuture.values`). This might be a no-op: If this future completed during
+ * handleAllCompleted(), they will already have been nulled out. But in the case of
+ * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in
+ * the case of callAsync(), which waits for the callback's returned future to complete.
*/
- @ForOverride
- @OverridingMethodsMustInvokeSuper
- void releaseResourcesAfterFailure() {
- this.futures = null;
- }
+ releaseResources(ALL_INPUT_FUTURES_PROCESSED);
+ }
- /**
- * Called only if {@code collectsValues} is true.
- *
- * <p>If {@code allMustSucceed} is true, called as each future completes; otherwise, called for
- * each future when all futures complete.
+ /**
+ * Clears fields that are no longer needed after this future has completed -- or at least all its
+ * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called).
+ * Often called multiple times (that is, both when the inputs complete and when the output
+ * completes).
+ *
+ * <p>This is similar to our proposed {@code afterCommit} method but not quite the same. See the
+ * description of CL 265462958.
+ */
+ // TODO(user): Write more tests for memory retention.
+ @ForOverride
+ @OverridingMethodsMustInvokeSuper
+ void releaseResources(ReleaseResourcesReason reason) {
+ checkNotNull(reason);
+ /*
+ * All elements of `futures` are completed, or this future has already completed and read
+ * `futures` into a local variable (in preparation for propagating cancellation to them). In
+ * either case, no one needs to read `futures` for cancellation purposes later. (And
+ * cancellation purposes are the main reason to access `futures`, as discussed in its docs.)
*/
- abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue);
-
- abstract void handleAllCompleted();
+ this.futures = null;
+ }
- void interruptTask() {}
+ enum ReleaseResourcesReason {
+ OUTPUT_FUTURE_DONE,
+ ALL_INPUT_FUTURES_PROCESSED,
}
+ /**
+ * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code
+ * collectsValues} is true, called for each future when all futures complete.
+ */
+ abstract void collectOneValue(int index, @Nullable InputT returnValue);
+
+ abstract void handleAllCompleted();
+
/** Adds the chain to the seen set, and returns whether all the chain was new to us. */
private static boolean addCausalChain(Set<Throwable> seen, Throwable t) {
for (; t != null; t = t.getCause()) {