diff options
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/AggregateFuture.java')
-rw-r--r-- | guava/src/com/google/common/util/concurrent/AggregateFuture.java | 439 |
1 files changed, 240 insertions, 199 deletions
diff --git a/guava/src/com/google/common/util/concurrent/AggregateFuture.java b/guava/src/com/google/common/util/concurrent/AggregateFuture.java index 432684578..596a5ae35 100644 --- a/guava/src/com/google/common/util/concurrent/AggregateFuture.java +++ b/guava/src/com/google/common/util/concurrent/AggregateFuture.java @@ -16,8 +16,11 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED; +import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE; import static com.google.common.util.concurrent.Futures.getDone; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.logging.Level.SEVERE; import com.google.common.annotations.GwtCompatible; import com.google.common.collect.ImmutableCollection; @@ -26,265 +29,303 @@ import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.qual.Nullable; /** - * A future made up of a collection of sub-futures. + * A future whose value is derived from a collection of input futures. * * @param <InputT> the type of the individual inputs * @param <OutputT> the type of the output (i.e. this) future */ @GwtCompatible -abstract class AggregateFuture<InputT, OutputT> extends AbstractFuture.TrustedFuture<OutputT> { +abstract class AggregateFuture<InputT, OutputT> extends AggregateFutureState<OutputT> { private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); + /** + * The input futures. After {@link #init}, this field is read only by {@link #afterDone()} (to + * propagate cancellation) and {@link #toString()}. To access the futures' <i>values</i>, {@code + * AggregateFuture} attaches listeners that hold references to one or more inputs. And in the case + * of {@link CombinedFuture}, the user-supplied callback usually has its own references to inputs. + */ /* * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - private @Nullable RunningState runningState; + private @Nullable ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures; + + private final boolean allMustSucceed; + private final boolean collectsValues; + + AggregateFuture( + ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures, + boolean allMustSucceed, + boolean collectsValues) { + super(futures.size()); + this.futures = checkNotNull(futures); + this.allMustSucceed = allMustSucceed; + this.collectsValues = collectsValues; + } @Override protected final void afterDone() { super.afterDone(); - RunningState localRunningState = runningState; - if (localRunningState != null) { - // Let go of the memory held by the running state - this.runningState = null; - ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures = - localRunningState.futures; - boolean wasInterrupted = wasInterrupted(); - if (wasInterrupted) { - localRunningState.interruptTask(); - } + ImmutableCollection<? extends Future<?>> localFutures = futures; + releaseResources(OUTPUT_FUTURE_DONE); // nulls out `futures` - if (isCancelled() & futures != null) { - for (ListenableFuture<?> future : futures) { - future.cancel(wasInterrupted); - } + if (isCancelled() & localFutures != null) { + boolean wasInterrupted = wasInterrupted(); + for (Future<?> future : localFutures) { + future.cancel(wasInterrupted); } } + /* + * We don't call clearSeenExceptions() until processCompleted(). Prior to that, it may be needed + * again if some outstanding input fails. + */ } @Override - protected String pendingToString() { - RunningState localRunningState = runningState; - if (localRunningState == null) { - return null; - } - ImmutableCollection<? extends ListenableFuture<? extends InputT>> localFutures = - localRunningState.futures; + protected final String pendingToString() { + ImmutableCollection<? extends Future<?>> localFutures = futures; if (localFutures != null) { - return "futures=[" + localFutures + "]"; + return "futures=" + localFutures; } - return null; + return super.pendingToString(); } - /** Must be called at the end of each sub-class's constructor. */ - final void init(RunningState runningState) { - this.runningState = runningState; - runningState.init(); - } - - abstract class RunningState extends AggregateFutureState implements Runnable { - private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures; - private final boolean allMustSucceed; - private final boolean collectsValues; - - RunningState( - ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures, - boolean allMustSucceed, - boolean collectsValues) { - super(futures.size()); - this.futures = checkNotNull(futures); - this.allMustSucceed = allMustSucceed; - this.collectsValues = collectsValues; - } - - /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */ - @Override - public final void run() { - decrementCountAndMaybeComplete(); + /** + * Must be called at the end of each subclass's constructor. This method performs the "real" + * initialization; we can't put this in the constructor because, in the case where futures are + * already complete, we would not initialize the subclass before calling {@link + * #collectValueFromNonCancelledFuture}. As this is called after the subclass is constructed, + * we're guaranteed to have properly initialized the subclass. + */ + final void init() { + // Corner case: List is empty. + if (futures.isEmpty()) { + handleAllCompleted(); + return; } - /** - * The "real" initialization; we can't put this in the constructor because, in the case where - * futures are already complete, we would not initialize the subclass before calling {@link - * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed - * to have properly initialized the subclass. - */ - private void init() { - // Corner case: List is empty. - if (futures.isEmpty()) { - handleAllCompleted(); - return; - } - - // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll - // need to handle RejectedExecutionException + // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll + // need to handle RejectedExecutionException - if (allMustSucceed) { - // We need fail fast, so we have to keep track of which future failed so we can propagate - // the exception immediately + if (allMustSucceed) { + // We need fail fast, so we have to keep track of which future failed so we can propagate + // the exception immediately - // Register a listener on each Future in the list to update the state of this future. - // Note that if all the futures on the list are done prior to completing this loop, the last - // call to addListener() will callback to setOneValue(), transitively call our cleanup - // listener, and set this.futures to null. - // This is not actually a problem, since the foreach only needs this.futures to be non-null - // at the beginning of the loop. - int i = 0; - for (final ListenableFuture<? extends InputT> listenable : futures) { - final int index = i++; - listenable.addListener( - new Runnable() { - @Override - public void run() { - try { - handleOneInputDone(index, listenable); - } finally { - decrementCountAndMaybeComplete(); + // Register a listener on each Future in the list to update the state of this future. + // Note that if all the futures on the list are done prior to completing this loop, the last + // call to addListener() will callback to setOneValue(), transitively call our cleanup + // listener, and set this.futures to null. + // This is not actually a problem, since the foreach only needs this.futures to be non-null + // at the beginning of the loop. + int i = 0; + for (final ListenableFuture<? extends InputT> future : futures) { + final int index = i++; + future.addListener( + new Runnable() { + @Override + public void run() { + try { + if (future.isCancelled()) { + // Clear futures prior to cancelling children. This sets our own state but lets + // the input futures keep running, as some of them may be used elsewhere. + futures = null; + cancel(false); + } else { + collectValueFromNonCancelledFuture(index, future); } + } finally { + /* + * "null" means: There is no need to access `futures` again during + * `processCompleted` because we're reading each value during a call to + * handleOneInputDone. + */ + decrementCountAndMaybeComplete(null); } - }, - directExecutor()); - } - } else { - // We'll only call the callback when all futures complete, regardless of whether some failed - // Hold off on calling setOneValue until all complete, so we can share the same listener - for (ListenableFuture<? extends InputT> listenable : futures) { - listenable.addListener(this, directExecutor()); - } + } + }, + directExecutor()); + } + } else { + /* + * We'll call the user callback or collect the values only when all inputs complete, + * regardless of whether some failed. This lets us avoid calling expensive methods like + * Future.get() when we don't need to (specifically, for whenAllComplete().call*()), and it + * lets all futures share the same listener. + * + * We store `localFutures` inside the listener because `this.futures` might be nulled out by + * the time the listener runs for the final future -- at which point we need to check all + * inputs for exceptions *if* we're collecting values. If we're not, then the listener doesn't + * need access to the futures again, so we can just pass `null`. + * + * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an optimization. + * If we make some other optimizations, this one will no longer be necessary. The optimization + * could actually hurt in some cases, as it forces us to keep all inputs in memory until the + * final input completes. + */ + final ImmutableCollection<? extends Future<? extends InputT>> localFutures = + collectsValues ? futures : null; + Runnable listener = + new Runnable() { + @Override + public void run() { + decrementCountAndMaybeComplete(localFutures); + } + }; + for (ListenableFuture<? extends InputT> future : futures) { + future.addListener(listener, directExecutor()); } } + } - /** - * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the - * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the - * throwable did not cause this future to fail, and it is the first time we've seen that - * particular Throwable. - */ - private void handleException(Throwable throwable) { - checkNotNull(throwable); + /** + * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the + * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the + * throwable did not cause this future to fail, and it is the first time we've seen that + * particular Throwable. + */ + private void handleException(Throwable throwable) { + checkNotNull(throwable); - boolean completedWithFailure = false; - boolean firstTimeSeeingThisException = true; - if (allMustSucceed) { - // As soon as the first one fails, throw the exception up. - // The result of all other inputs is then ignored. - completedWithFailure = setException(throwable); - if (completedWithFailure) { - releaseResourcesAfterFailure(); - } else { - // Go up the causal chain to see if we've already seen this cause; if we have, even if - // it's wrapped by a different exception, don't log it. - firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (allMustSucceed) { + // As soon as the first one fails, make that failure the result of the output future. + // The results of all other inputs are then ignored (except for logging any failures). + boolean completedWithFailure = setException(throwable); + if (!completedWithFailure) { + // Go up the causal chain to see if we've already seen this cause; if we have, even if + // it's wrapped by a different exception, don't log it. + boolean firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); + if (firstTimeSeeingThisException) { + log(throwable); + return; } } - - // | and & used because it's faster than the branch required for || and && - if (throwable instanceof Error - | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) { - String message = - (throwable instanceof Error) - ? "Input Future failed with Error" - : "Got more than one input Future failure. Logging failures after the first"; - logger.log(Level.SEVERE, message, throwable); - } } - @Override - final void addInitialException(Set<Throwable> seen) { - if (!isCancelled()) { - // TODO(cpovirk): Think about whether we could/should use Verify to check this. - boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); - } + /* + * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call + * handleException() at all. + */ + if (throwable instanceof Error) { + /* + * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it + * returned true? This was intentional (CL 46470009), but it seems odd compared to how we + * normally handle Error. + * + * Similarly, do we really want to log the same Error more than once? + */ + log(throwable); } + } - /** Handles the input at the given index completing. */ - private void handleOneInputDone(int index, Future<? extends InputT> future) { - // The only cases in which this Future should already be done are (a) if it was cancelled or - // (b) if an input failed and we propagated that immediately because of allMustSucceed. - checkState( - allMustSucceed || !isDone() || isCancelled(), - "Future was done before all dependencies completed"); + private static void log(Throwable throwable) { + String message = + (throwable instanceof Error) + ? "Input Future failed with Error" + : "Got more than one input Future failure. Logging failures after the first"; + logger.log(SEVERE, message, throwable); + } - try { - checkState(future.isDone(), "Tried to set value from future which is not done"); - if (allMustSucceed) { - if (future.isCancelled()) { - // clear running state prior to cancelling children, this sets our own state but lets - // the input futures keep running as some of them may be used elsewhere. - runningState = null; - cancel(false); - } else { - // We always get the result so that we can have fail-fast, even if we don't collect - InputT result = getDone(future); - if (collectsValues) { - collectOneValue(allMustSucceed, index, result); - } - } - } else if (collectsValues && !future.isCancelled()) { - collectOneValue(allMustSucceed, index, getDone(future)); - } - } catch (ExecutionException e) { - handleException(e.getCause()); - } catch (Throwable t) { - handleException(t); - } + @Override + final void addInitialException(Set<Throwable> seen) { + checkNotNull(seen); + if (!isCancelled()) { + // TODO(cpovirk): Think about whether we could/should use Verify to check this. + boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure()); } + } - private void decrementCountAndMaybeComplete() { - int newRemaining = decrementRemainingAndGet(); - checkState(newRemaining >= 0, "Less than 0 remaining futures"); - if (newRemaining == 0) { - processCompleted(); - } + /** + * Collects the result (success or failure) of one input future. The input must not have been + * cancelled. For details on when this is called, see {@link #collectOneValue}. + */ + private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) { + try { + // We get the result, even if collectOneValue is a no-op, so that we can fail fast. + collectOneValue(index, getDone(future)); + } catch (ExecutionException e) { + handleException(e.getCause()); + } catch (Throwable t) { + handleException(t); } + } - private void processCompleted() { - // Collect the values if (a) our output requires collecting them and (b) we haven't been - // collecting them as we go. (We've collected them as we go only if we needed to fail fast) - if (collectsValues & !allMustSucceed) { - int i = 0; - for (ListenableFuture<? extends InputT> listenable : futures) { - handleOneInputDone(i++, listenable); + private void decrementCountAndMaybeComplete( + @Nullable + ImmutableCollection<? extends Future<? extends InputT>> + futuresIfNeedToCollectAtCompletion) { + int newRemaining = decrementRemainingAndGet(); + checkState(newRemaining >= 0, "Less than 0 remaining futures"); + if (newRemaining == 0) { + processCompleted(futuresIfNeedToCollectAtCompletion); + } + } + + private void processCompleted( + @Nullable + ImmutableCollection<? extends Future<? extends InputT>> + futuresIfNeedToCollectAtCompletion) { + if (futuresIfNeedToCollectAtCompletion != null) { + int i = 0; + for (Future<? extends InputT> future : futuresIfNeedToCollectAtCompletion) { + if (!future.isCancelled()) { + collectValueFromNonCancelledFuture(i, future); } + i++; } - handleAllCompleted(); } - - /** - * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we - * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we - * immediately release resources we no longer need); additionally, the future will release its - * reference to {@link RunningState}, which should free all associated memory when all the - * futures complete and the listeners are released. - * - * <p>TODO(user): Write tests for memory retention + clearSeenExceptions(); + handleAllCompleted(); + /* + * Null out fields, including some used in handleAllCompleted() above (like + * `CollectionFuture.values`). This might be a no-op: If this future completed during + * handleAllCompleted(), they will already have been nulled out. But in the case of + * whenAll*().call*(), this future may be pending until the callback runs -- or even longer in + * the case of callAsync(), which waits for the callback's returned future to complete. */ - @ForOverride - @OverridingMethodsMustInvokeSuper - void releaseResourcesAfterFailure() { - this.futures = null; - } + releaseResources(ALL_INPUT_FUTURES_PROCESSED); + } - /** - * Called only if {@code collectsValues} is true. - * - * <p>If {@code allMustSucceed} is true, called as each future completes; otherwise, called for - * each future when all futures complete. + /** + * Clears fields that are no longer needed after this future has completed -- or at least all its + * inputs have completed (more precisely, after {@link #handleAllCompleted()} has been called). + * Often called multiple times (that is, both when the inputs complete and when the output + * completes). + * + * <p>This is similar to our proposed {@code afterCommit} method but not quite the same. See the + * description of CL 265462958. + */ + // TODO(user): Write more tests for memory retention. + @ForOverride + @OverridingMethodsMustInvokeSuper + void releaseResources(ReleaseResourcesReason reason) { + checkNotNull(reason); + /* + * All elements of `futures` are completed, or this future has already completed and read + * `futures` into a local variable (in preparation for propagating cancellation to them). In + * either case, no one needs to read `futures` for cancellation purposes later. (And + * cancellation purposes are the main reason to access `futures`, as discussed in its docs.) */ - abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue); - - abstract void handleAllCompleted(); + this.futures = null; + } - void interruptTask() {} + enum ReleaseResourcesReason { + OUTPUT_FUTURE_DONE, + ALL_INPUT_FUTURES_PROCESSED, } + /** + * If {@code allMustSucceed} is true, called as each future completes; otherwise, if {@code + * collectsValues} is true, called for each future when all futures complete. + */ + abstract void collectOneValue(int index, @Nullable InputT returnValue); + + abstract void handleAllCompleted(); + /** Adds the chain to the seen set, and returns whether all the chain was new to us. */ private static boolean addCausalChain(Set<Throwable> seen, Throwable t) { for (; t != null; t = t.getCause()) { |