aboutsummaryrefslogtreecommitdiff
path: root/android/guava/src/com/google/common/util/concurrent/AggregateFuture.java
blob: ec96cbb89e4120a9ce262c2dbb76535996d9c263 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.SEVERE;

import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Base class for futures whose result is computed from a collection of input futures.
 *
 * <p>Subclasses provide {@link #collectOneValue} and {@link #handleAllCompleted}, and must invoke
 * {@link #init} as the final step of their constructors.
 *
 * @param <InputT> the type of the individual inputs
 * @param <OutputT> the type of the output (i.e. this) future
 */
@GwtCompatible
@ElementTypesAreNonnullByDefault
abstract class AggregateFuture<InputT extends @Nullable Object, OutputT extends @Nullable Object>
    extends AggregateFutureState<OutputT> {
  private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName());

  /**
   * The inputs being aggregated. Once {@link #init} has run, the only readers of this field are
   * {@link #afterDone()} (which uses it to propagate cancellation) and {@link #toString()}. The
   * inputs' <i>values</i> are reached through listeners that {@code AggregateFuture} attaches and
   * that hold their own references to one or more inputs. For {@link CombinedFuture}, the
   * user-supplied callback usually keeps references to the inputs as well.
   */
  /*
   * Under certain circumstances a write to this field might, in theory, not be visible to an
   * afterDone() call that was triggered by cancel(). The comments on the fields of TimeoutFuture
   * discuss the details.
   */
  @CheckForNull private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;

  private final boolean allMustSucceed;
  private final boolean collectsValues;

  /**
   * Records the inputs and the aggregation mode. No listeners are attached here; that is done by
   * {@link #init}.
   *
   * @param futures the input futures
   * @param allMustSucceed whether the first input failure should immediately fail this future
   * @param collectsValues whether input values are gathered once every input has completed (only
   *     consulted when {@code allMustSucceed} is false)
   */
  AggregateFuture(
      ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
      boolean allMustSucceed,
      boolean collectsValues) {
    super(futures.size());
    this.futures = checkNotNull(futures);
    this.allMustSucceed = allMustSucceed;
    this.collectsValues = collectsValues;
  }

  /**
   * Releases resources once this future is done and, if it was cancelled, cancels every input
   * that was still referenced at that moment.
   */
  @Override
  protected final void afterDone() {
    super.afterDone();

    // Take a snapshot before releasing: releaseResources() nulls out the `futures` field.
    ImmutableCollection<? extends Future<?>> inputs = futures;
    releaseResources(OUTPUT_FUTURE_DONE);

    /*
     * clearSeenExceptions() is deliberately not called here; it waits until processCompleted().
     * Until then the set may still be needed if an outstanding input fails.
     */
    if (!isCancelled() || inputs == null) {
      return;
    }
    boolean mayInterrupt = wasInterrupted();
    for (Future<?> input : inputs) {
      input.cancel(mayInterrupt);
    }
  }

  /**
   * Describes the inputs while they are still referenced; otherwise defers to the superclass.
   */
  @Override
  @CheckForNull
  protected final String pendingToString() {
    ImmutableCollection<? extends Future<?>> inputs = futures;
    if (inputs == null) {
      return super.pendingToString();
    }
    return "futures=" + inputs;
  }

  /**
   * Performs the "real" initialization and must be the last call in every subclass constructor.
   * It cannot live in this class's constructor: if the inputs were already complete, {@link
   * #collectValueFromNonCancelledFuture} would run before the subclass had been initialized.
   * Because this runs after the subclass is constructed, the subclass is guaranteed to be ready.
   */
  final void init() {
    /*
     * requireNonNull is safe: we are still inside the constructor, `futures` has been assigned,
     * and nothing can have called releaseResources yet (no listener has been registered and this
     * Future has not been handed to anyone who could cancel() it).
     */
    ImmutableCollection<? extends ListenableFuture<? extends InputT>> inputs =
        requireNonNull(futures);

    // Corner case: there is nothing to wait for.
    if (inputs.isEmpty()) {
      handleAllCompleted();
      return;
    }

    // NOTE: If a custom executor is ever used here, look at CombinedFuture, since
    // RejectedExecutionException would then need handling.

    if (!allMustSucceed) {
      /*
       * The user callback runs (or the values are collected) only after every input completes,
       * whether or not some failed. That avoids expensive calls such as Future.get() when they
       * are unnecessary (specifically for whenAllComplete().call*()) and lets all inputs share a
       * single listener.
       *
       * The collection is captured by the listener because `this.futures` may already be null
       * when the listener fires for the final input -- and at that point all inputs must be
       * examined for exceptions *if* values are being collected. When they are not, the listener
       * never needs the inputs again, so it captures `null` instead.
       *
       * TODO(b/112550045): Allocating a single, cheaper listener is (I think) only an
       * optimization. If we make some other optimizations, this one will no longer be necessary.
       * The optimization could actually hurt in some cases, as it forces us to keep all inputs in
       * memory until the final input completes.
       */
      ImmutableCollection<? extends Future<? extends InputT>> inputsForCompletion =
          collectsValues ? inputs : null;
      Runnable sharedListener = () -> decrementCountAndMaybeComplete(inputsForCompletion);
      for (ListenableFuture<? extends InputT> input : inputs) {
        input.addListener(sharedListener, directExecutor());
      }
      return;
    }

    // Fail-fast mode: each input gets its own listener so that a failure can be propagated the
    // moment it happens.
    //
    // If every input is already done, the final addListener() call below runs its listener
    // inline, which transitively triggers cleanup and sets this.futures to null. That is
    // harmless: the loop only needed the collection when iteration began.
    int nextIndex = 0;
    for (ListenableFuture<? extends InputT> input : inputs) {
      int index = nextIndex;
      nextIndex++;
      input.addListener(
          () -> {
            try {
              if (!input.isCancelled()) {
                collectValueFromNonCancelledFuture(index, input);
              } else {
                // Drop the inputs before cancelling ourselves. Our own state becomes cancelled,
                // but the inputs keep running, as some of them may be used elsewhere.
                futures = null;
                cancel(false);
              }
            } finally {
              /*
               * Passing null: `processCompleted` has no reason to visit the inputs again, since
               * each value is read as its input completes.
               */
              decrementCountAndMaybeComplete(null);
            }
          },
          directExecutor());
    }
  }

  /**
   * If {@link #allMustSucceed} is true, attempts to fail this future with {@code throwable}. The
   * throwable is additionally logged when it is an {@link Error}, or when {@link #allMustSucceed}
   * is {@code true}, it was not the throwable that failed this future, and this is the first
   * time that particular throwable has been seen.
   */
  private void handleException(Throwable throwable) {
    checkNotNull(throwable);

    /*
     * In fail-fast mode the first failure becomes the result of the output future; later inputs
     * are ignored apart from logging. When setException loses the race, walk the causal chain:
     * anything already seen -- even wrapped in a different exception -- is not logged again.
     */
    if (allMustSucceed
        && !setException(throwable)
        && addCausalChain(getOrInitSeenExceptions(), throwable)) {
      log(throwable);
      return;
    }

    /*
     * TODO(cpovirk): Should whenAllComplete().call*() log errors, too? Currently, it doesn't call
     * handleException() at all.
     *
     * TODO(cpovirk): Do we really want to log this if we called setException(throwable) and it
     * returned true? This was intentional (CL 46470009), but it seems odd compared to how we
     * normally handle Error.
     *
     * Similarly, do we really want to log the same Error more than once?
     */
    if (throwable instanceof Error) {
      log(throwable);
    }
  }

  /** Logs an input failure at SEVERE, with a message that depends on whether it is an Error. */
  private static void log(Throwable throwable) {
    String message;
    if (throwable instanceof Error) {
      message = "Input Future failed with Error";
    } else {
      message = "Got more than one input Future failure. Logging failures after the first";
    }
    logger.log(SEVERE, message, throwable);
  }

  /**
   * Seeds {@code seen} with the causal chain of this future's own failure, unless this future
   * was cancelled.
   */
  @Override
  final void addInitialException(Set<Throwable> seen) {
    checkNotNull(seen);
    if (isCancelled()) {
      return;
    }
    /*
     * requireNonNull is safe because:
     *
     * - This is a TrustedFuture, so tryInternalFastPathGetFailure does return the failure cause
     *   whenever this Future has failed.
     *
     * - This future *has* failed. The only caller is handleException (via
     *   getOrInitSeenExceptions), and it gets here only after its setException call was
     *   rejected. So this Future was either cancelled (excluded by the check above) or had
     *   already failed. It cannot have completed *successfully*, nor had setFuture called on it:
     *   neither can happen before all completed inputs are processed, and at least one -- the
     *   one that triggered handleException -- is still being processed.
     *
     * TODO(cpovirk): Think about whether we could/should use Verify to check the return value of
     * addCausalChain.
     */
    Throwable ownFailure = requireNonNull(tryInternalFastPathGetFailure());
    boolean unused = addCausalChain(seen, ownFailure);
  }

  /**
   * Reads the outcome (value or failure) of a single input and routes it to {@link
   * #collectOneValue} or {@link #handleException}. The input must not have been cancelled. See
   * {@link #collectOneValue} for when this is invoked.
   */
  private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) {
    try {
      // Fetch the result even when collectOneValue is a no-op, so that failures surface fast.
      InputT value = getDone(future);
      collectOneValue(index, value);
    } catch (ExecutionException e) {
      handleException(e.getCause());
    } catch (RuntimeException | Error t) {
      handleException(t);
    }
  }

  /**
   * Counts one input as finished and, when it was the last one, runs {@link #processCompleted}.
   *
   * @param futuresIfNeedToCollectAtCompletion the inputs to read values from at completion, or
   *     {@code null} if no such pass is needed
   */
  private void decrementCountAndMaybeComplete(
      @CheckForNull
          ImmutableCollection<? extends Future<? extends InputT>>
              futuresIfNeedToCollectAtCompletion) {
    int stillPending = decrementRemainingAndGet();
    checkState(stillPending >= 0, "Less than 0 remaining futures");
    if (stillPending != 0) {
      return;
    }
    processCompleted(futuresIfNeedToCollectAtCompletion);
  }

  /**
   * Runs after the last input finishes: optionally collects every non-cancelled input's result,
   * then clears the seen-exceptions set, notifies the subclass, and releases resources.
   */
  private void processCompleted(
      @CheckForNull
          ImmutableCollection<? extends Future<? extends InputT>>
              futuresIfNeedToCollectAtCompletion) {
    if (futuresIfNeedToCollectAtCompletion != null) {
      int index = 0;
      for (Future<? extends InputT> input : futuresIfNeedToCollectAtCompletion) {
        boolean cancelled = input.isCancelled();
        if (!cancelled) {
          collectValueFromNonCancelledFuture(index, input);
        }
        index++;
      }
    }
    clearSeenExceptions();
    handleAllCompleted();
    /*
     * Null out fields, some of which handleAllCompleted() above may have used (for example
     * `CollectionFuture.values`). This can be a no-op: if this future completed during
     * handleAllCompleted(), the fields are already cleared. With whenAll*().call*(), however,
     * this future may stay pending until the callback runs -- or longer still for callAsync(),
     * which waits for the future returned by the callback.
     */
    releaseResources(ALL_INPUT_FUTURES_PROCESSED);
  }

  /**
   * Drops references that are unnecessary once this future has completed -- or at least once all
   * of its inputs have (more precisely, once {@link #handleAllCompleted()} has been called).
   * Commonly invoked more than once: when the inputs complete and again when the output does.
   *
   * <p>This resembles our proposed {@code afterCommit} method without being quite the same. See
   * the description of CL 265462958.
   */
  // TODO(user): Write more tests for memory retention.
  @ForOverride
  @OverridingMethodsMustInvokeSuper
  void releaseResources(ReleaseResourcesReason reason) {
    checkNotNull(reason);
    /*
     * Either every element of `futures` is complete, or this future is done and has already
     * copied `futures` into a local (to propagate cancellation). Both ways, nobody will need
     * `futures` for cancellation later -- and cancellation is the main reason to read it, as its
     * documentation explains.
     */
    futures = null;
  }

  /** Identifies which event caused {@link #releaseResources} to be invoked. */
  enum ReleaseResourcesReason {
    OUTPUT_FUTURE_DONE,
    ALL_INPUT_FUTURES_PROCESSED,
  }

  /**
   * When {@code allMustSucceed} is true, invoked as each input completes; otherwise, when {@code
   * collectsValues} is true, invoked once per input after all inputs have completed.
   */
  abstract void collectOneValue(int index, @ParametricNullness InputT returnValue);

  /**
   * Invoked by {@link #init} when there are no inputs, and otherwise once the last input has
   * been processed.
   */
  abstract void handleAllCompleted();

  /**
   * Adds {@code param} and its chain of causes to {@code seen}.
   *
   * @return {@code true} if every throwable in the chain was new to the set
   */
  private static boolean addCausalChain(Set<Throwable> seen, Throwable param) {
    // A genuine local (not the parameter) lets the Checker Framework infer nullness.
    Throwable link = param;

    while (link != null) {
      if (!seen.add(link)) {
        /*
         * Already recorded, so its causes were recorded too; there is nothing more to add. (One
         * ignored exception to that: if a throwable is recorded, someone then calls initCause()
         * on it, and it is examined again, we wrongly conclude the whole chain was seen. This
         * should be rare.)
         */
        return false;
      }
      link = link.getCause();
    }
    return true;
  }
}