diff options
author | chegar <none@none> | 2013-08-02 14:29:31 +0100 |
---|---|---|
committer | chegar <none@none> | 2013-08-02 14:29:31 +0100 |
commit | c5941a9d1ca32a65bab71d5acd7f5d1c85c63643 (patch) | |
tree | 623064e306b166f91fcc51008b5eb66f68715c4d /src/share/classes/java/util/concurrent | |
parent | 85db95678f7e0ae4bffcb65b17e8c24a5d1c3642 (diff) | |
download | jdk8u_jdk-c5941a9d1ca32a65bab71d5acd7f5d1c85c63643.tar.gz |
8020291: j.u.c.CompletionStage
8020435: CompletableFuture/Basic.java fails on single core machine
Reviewed-by: chegar, psandoz
Contributed-by: Doug Lea <dl@cs.oswego.edu>
Diffstat (limited to 'src/share/classes/java/util/concurrent')
-rw-r--r-- | src/share/classes/java/util/concurrent/CompletableFuture.java | 1861 | ||||
-rw-r--r-- | src/share/classes/java/util/concurrent/CompletionStage.java | 760 |
2 files changed, 1543 insertions, 1078 deletions
diff --git a/src/share/classes/java/util/concurrent/CompletableFuture.java b/src/share/classes/java/util/concurrent/CompletableFuture.java index 7926f2e63b..5e2fb134fc 100644 --- a/src/share/classes/java/util/concurrent/CompletableFuture.java +++ b/src/share/classes/java/util/concurrent/CompletableFuture.java @@ -48,13 +48,16 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; /** * A {@link Future} that may be explicitly completed (setting its - * value and status), and may include dependent functions and actions - * that trigger upon its completion. + * value and status), and may be used as a {@link CompletionStage}, + * supporting dependent functions and actions that trigger upon its + * completion. * * <p>When two or more threads attempt to * {@link #complete complete}, @@ -62,64 +65,50 @@ import java.util.concurrent.locks.LockSupport; * {@link #cancel cancel} * a CompletableFuture, only one of them succeeds. * - * <p>Methods are available for adding dependents based on - * user-provided Functions, Consumers, or Runnables. The appropriate - * form to use depends on whether actions require arguments and/or - * produce results. Completion of a dependent action will trigger the - * completion of another CompletableFuture. Actions may also be - * triggered after either or both the current and another - * CompletableFuture complete. Multiple CompletableFutures may also - * be grouped as one using {@link #anyOf(CompletableFuture...)} and - * {@link #allOf(CompletableFuture...)}. + * <p>In addition to these and related methods for directly + * manipulating status and results, CompletableFuture implements + * interface {@link CompletionStage} with the following policies: <ul> * - * <p>CompletableFutures themselves do not execute asynchronously. - * However, actions supplied for dependent completions of another - * CompletableFuture may do so, depending on whether they are provided - * via one of the <em>async</em> methods (that is, methods with names - * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em> - * methods provide a way to commence asynchronous processing of an - * action using either a given {@link Executor} or by default the - * {@link ForkJoinPool#commonPool()}. To simplify monitoring, + * <li>Actions supplied for dependent completions of + * <em>non-async</em> methods may be performed by the thread that + * completes the current CompletableFuture, or by any other caller of + * a completion method.</li> + * + * <li>All <em>async</em> methods without an explicit Executor + * argument are performed using the {@link ForkJoinPool#commonPool()} + * (unless it does not support a parallelism level of at least two, in + * which case, a new Thread is used). To simplify monitoring, * debugging, and tracking, all generated asynchronous tasks are - * instances of the marker interface {@link AsynchronousCompletionTask}. + * instances of the marker interface {@link + * AsynchronousCompletionTask}. </li> * - * <p>Actions supplied for dependent completions of <em>non-async</em> - * methods may be performed by the thread that completes the current - * CompletableFuture, or by any other caller of these methods. There - * are no guarantees about the order of processing completions unless - * constrained by these methods. + * <li>All CompletionStage methods are implemented independently of + * other public methods, so the behavior of one method is not impacted + * by overrides of others in subclasses. </li> </ul> * - * <p>Since (unlike {@link FutureTask}) this class has no direct - * control over the computation that causes it to be completed, - * cancellation is treated as just another form of exceptional completion. - * Method {@link #cancel cancel} has the same effect as - * {@code completeExceptionally(new CancellationException())}. + * <p>CompletableFuture also implements {@link Future} with the following + * policies: <ul> * - * <p>Upon exceptional completion (including cancellation), or when a - * completion entails an additional computation which terminates - * abruptly with an (unchecked) exception or error, then all of their - * dependent completions (and their dependents in turn) generally act - * as {@code completeExceptionally} with a {@link CompletionException} - * holding that exception as its cause. However, the {@link - * #exceptionally exceptionally} and {@link #handle handle} - * completions <em>are</em> able to handle exceptional completions of - * the CompletableFutures they depend on. + * <li>Since (unlike {@link FutureTask}) this class has no direct + * control over the computation that causes it to be completed, + * cancellation is treated as just another form of exceptional + * completion. Method {@link #cancel cancel} has the same effect as + * {@code completeExceptionally(new CancellationException())}. Method + * {@link #isCompletedExceptionally} can be used to determine if a + * CompletableFuture completed in any exceptional fashion.</li> * - * <p>In case of exceptional completion with a CompletionException, + * <li>In case of exceptional completion with a CompletionException, * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an * {@link ExecutionException} with the same cause as held in the - * corresponding CompletionException. However, in these cases, - * methods {@link #join()} and {@link #getNow} throw the - * CompletionException, which simplifies usage. - * - * <p>Arguments used to pass a completion result (that is, for parameters - * of type {@code T}) may be null, but passing a null value for any other - * parameter will result in a {@link NullPointerException} being thrown. + * corresponding CompletionException. To simplify usage in most + * contexts, this class also defines methods {@link #join()} and + * {@link #getNow} that instead throw the CompletionException directly + * in these cases.</li> </ul> * * @author Doug Lea * @since 1.8 */ -public class CompletableFuture<T> implements Future<T> { +public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { /* * Overview: @@ -438,6 +427,19 @@ public class CompletableFuture<T> implements Future<T> { public final void run() { exec(); } } + /** + * Starts the given async task using the given executor, unless + * the executor is ForkJoinPool.commonPool and it has been + * disabled, in which case starts a new thread. + */ + static void execAsync(Executor e, Async r) { + if (e == ForkJoinPool.commonPool() && + ForkJoinPool.getCommonPoolParallelism() <= 1) + new Thread(r).start(); + else + e.execute(r); + } + static final class AsyncRun extends Async { final Runnable fn; final CompletableFuture<Void> dst; @@ -538,13 +540,13 @@ public class CompletableFuture<T> implements Future<T> { static final class AsyncAccept<T> extends Async { final T arg; final Consumer<? super T> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; AsyncAccept(T arg, Consumer<? super T> fn, - CompletableFuture<Void> dst) { + CompletableFuture<?> dst) { this.arg = arg; this.fn = fn; this.dst = dst; } public final boolean exec() { - CompletableFuture<Void> d; Throwable ex; + CompletableFuture<?> d; Throwable ex; if ((d = this.dst) != null && d.result == null) { try { fn.accept(arg); @@ -563,14 +565,14 @@ public class CompletableFuture<T> implements Future<T> { final T arg1; final U arg2; final BiConsumer<? super T,? super U> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; AsyncAcceptBoth(T arg1, U arg2, BiConsumer<? super T,? super U> fn, - CompletableFuture<Void> dst) { + CompletableFuture<?> dst) { this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; } public final boolean exec() { - CompletableFuture<Void> d; Throwable ex; + CompletableFuture<?> d; Throwable ex; if ((d = this.dst) != null && d.result == null) { try { fn.accept(arg1, arg2); @@ -587,10 +589,10 @@ public class CompletableFuture<T> implements Future<T> { static final class AsyncCompose<T,U> extends Async { final T arg; - final Function<? super T, CompletableFuture<U>> fn; + final Function<? super T, ? extends CompletionStage<U>> fn; final CompletableFuture<U> dst; AsyncCompose(T arg, - Function<? super T, CompletableFuture<U>> fn, + Function<? super T, ? extends CompletionStage<U>> fn, CompletableFuture<U> dst) { this.arg = arg; this.fn = fn; this.dst = dst; } @@ -598,7 +600,8 @@ public class CompletableFuture<T> implements Future<T> { CompletableFuture<U> d, fr; U u; Throwable ex; if ((d = this.dst) != null && d.result == null) { try { - fr = fn.apply(arg); + CompletionStage<U> cs = fn.apply(arg); + fr = (cs == null) ? null : cs.toCompletableFuture(); ex = (fr == null) ? new NullPointerException() : null; } catch (Throwable rex) { ex = rex; @@ -626,6 +629,33 @@ public class CompletableFuture<T> implements Future<T> { private static final long serialVersionUID = 5232453952276885070L; } + static final class AsyncWhenComplete<T> extends Async { + final T arg1; + final Throwable arg2; + final BiConsumer<? super T,? super Throwable> fn; + final CompletableFuture<T> dst; + AsyncWhenComplete(T arg1, Throwable arg2, + BiConsumer<? super T,? super Throwable> fn, + CompletableFuture<T> dst) { + this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst; + } + public final boolean exec() { + CompletableFuture<T> d; + if ((d = this.dst) != null && d.result == null) { + Throwable ex = arg2; + try { + fn.accept(arg1, ex); + } catch (Throwable rex) { + if (ex == null) + ex = rex; + } + d.internalComplete(arg1, ex); + } + return true; + } + private static final long serialVersionUID = 5232453952276885070L; + } + /* ------------- Completions -------------- */ /** @@ -680,7 +710,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); else u = fn.apply(t); } catch (Throwable rex) { @@ -697,11 +727,11 @@ public class CompletableFuture<T> implements Future<T> { static final class ThenAccept<T> extends Completion { final CompletableFuture<? extends T> src; final Consumer<? super T> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; final Executor executor; ThenAccept(CompletableFuture<? extends T> src, Consumer<? super T> fn, - CompletableFuture<Void> dst, + CompletableFuture<?> dst, Executor executor) { this.src = src; this.fn = fn; this.dst = dst; this.executor = executor; @@ -709,7 +739,7 @@ public class CompletableFuture<T> implements Future<T> { public final void run() { final CompletableFuture<? extends T> a; final Consumer<? super T> fn; - final CompletableFuture<Void> dst; + final CompletableFuture<?> dst; Object r; T t; Throwable ex; if ((dst = this.dst) != null && (fn = this.fn) != null && @@ -729,7 +759,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); + execAsync(e, new AsyncAccept<T>(t, fn, dst)); else fn.accept(t); } catch (Throwable rex) { @@ -773,7 +803,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(fn, dst)); + execAsync(e, new AsyncRun(fn, dst)); else fn.run(); } catch (Throwable rex) { @@ -839,7 +869,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst)); + execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); else v = fn.apply(t, u); } catch (Throwable rex) { @@ -904,7 +934,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst)); + execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); else fn.accept(t, u); } catch (Throwable rex) { @@ -956,7 +986,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(fn, dst)); + execAsync(e, new AsyncRun(fn, dst)); else fn.run(); } catch (Throwable rex) { @@ -1042,7 +1072,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); else u = fn.apply(t); } catch (Throwable rex) { @@ -1095,7 +1125,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); + execAsync(e, new AsyncAccept<T>(t, fn, dst)); else fn.accept(t); } catch (Throwable rex) { @@ -1143,7 +1173,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(fn, dst)); + execAsync(e, new AsyncRun(fn, dst)); else fn.run(); } catch (Throwable rex) { @@ -1226,6 +1256,54 @@ public class CompletableFuture<T> implements Future<T> { private static final long serialVersionUID = 5232453952276885070L; } + static final class WhenCompleteCompletion<T> extends Completion { + final CompletableFuture<? extends T> src; + final BiConsumer<? super T, ? super Throwable> fn; + final CompletableFuture<T> dst; + final Executor executor; + WhenCompleteCompletion(CompletableFuture<? extends T> src, + BiConsumer<? super T, ? super Throwable> fn, + CompletableFuture<T> dst, + Executor executor) { + this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; + } + public final void run() { + final CompletableFuture<? extends T> a; + final BiConsumer<? super T, ? super Throwable> fn; + final CompletableFuture<T> dst; + Object r; T t; Throwable ex; + if ((dst = this.dst) != null && + (fn = this.fn) != null && + (a = this.src) != null && + (r = a.result) != null && + compareAndSet(0, 1)) { + if (r instanceof AltResult) { + ex = ((AltResult)r).ex; + t = null; + } + else { + ex = null; + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + Executor e = executor; + Throwable dx = null; + try { + if (e != null) + execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); + else + fn.accept(t, ex); + } catch (Throwable rex) { + dx = rex; + } + if (e == null || dx != null) + dst.internalComplete(t, ex != null ? ex : dx); + } + } + private static final long serialVersionUID = 5232453952276885070L; + } + static final class ThenCopy<T> extends Completion { final CompletableFuture<?> src; final CompletableFuture<T> dst; @@ -1286,10 +1364,13 @@ public class CompletableFuture<T> implements Future<T> { final CompletableFuture<? extends T> src; final BiFunction<? super T, Throwable, ? extends U> fn; final CompletableFuture<U> dst; + final Executor executor; HandleCompletion(CompletableFuture<? extends T> src, BiFunction<? super T, Throwable, ? extends U> fn, - CompletableFuture<U> dst) { + CompletableFuture<U> dst, + Executor executor) { this.src = src; this.fn = fn; this.dst = dst; + this.executor = executor; } public final void run() { final CompletableFuture<? extends T> a; @@ -1310,13 +1391,19 @@ public class CompletableFuture<T> implements Future<T> { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; } - U u = null; Throwable dx = null; + Executor e = executor; + U u = null; + Throwable dx = null; try { - u = fn.apply(t, ex); + if (e != null) + execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); + else + u = fn.apply(t, ex); } catch (Throwable rex) { dx = rex; } - dst.internalComplete(u, dx); + if (e == null || dx != null) + dst.internalComplete(u, dx); } } private static final long serialVersionUID = 5232453952276885070L; @@ -1324,11 +1411,11 @@ public class CompletableFuture<T> implements Future<T> { static final class ThenCompose<T,U> extends Completion { final CompletableFuture<? extends T> src; - final Function<? super T, CompletableFuture<U>> fn; + final Function<? super T, ? extends CompletionStage<U>> fn; final CompletableFuture<U> dst; final Executor executor; ThenCompose(CompletableFuture<? extends T> src, - Function<? super T, CompletableFuture<U>> fn, + Function<? super T, ? extends CompletionStage<U>> fn, CompletableFuture<U> dst, Executor executor) { this.src = src; this.fn = fn; this.dst = dst; @@ -1336,7 +1423,7 @@ public class CompletableFuture<T> implements Future<T> { } public final void run() { final CompletableFuture<? extends T> a; - final Function<? super T, CompletableFuture<U>> fn; + final Function<? super T, ? extends CompletionStage<U>> fn; final CompletableFuture<U> dst; Object r; T t; Throwable ex; Executor e; if ((dst = this.dst) != null && @@ -1358,10 +1445,12 @@ public class CompletableFuture<T> implements Future<T> { boolean complete = false; if (ex == null) { if ((e = executor) != null) - e.execute(new AsyncCompose<T,U>(t, fn, dst)); + execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); else { try { - if ((c = fn.apply(t)) == null) + CompletionStage<U> cs = fn.apply(t); + c = (cs == null) ? null : cs.toCompletableFuture(); + if (c == null) ex = new NullPointerException(); } catch (Throwable rex) { ex = rex; @@ -1401,322 +1490,7 @@ public class CompletableFuture<T> implements Future<T> { private static final long serialVersionUID = 5232453952276885070L; } - // public methods - - /** - * Creates a new incomplete CompletableFuture. - */ - public CompletableFuture() { - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * by a task running in the {@link ForkJoinPool#commonPool()} with - * the value obtained by calling the given Supplier. - * - * @param supplier a function returning the value to be used - * to complete the returned CompletableFuture - * @return the new CompletableFuture - */ - public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { - if (supplier == null) throw new NullPointerException(); - CompletableFuture<U> f = new CompletableFuture<U>(); - ForkJoinPool.commonPool(). - execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f)); - return f; - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * by a task running in the given executor with the value obtained - * by calling the given Supplier. - * - * @param supplier a function returning the value to be used - * to complete the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, - Executor executor) { - if (executor == null || supplier == null) - throw new NullPointerException(); - CompletableFuture<U> f = new CompletableFuture<U>(); - executor.execute(new AsyncSupply<U>(supplier, f)); - return f; - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * by a task running in the {@link ForkJoinPool#commonPool()} after - * it runs the given action. - * - * @param runnable the action to run before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public static CompletableFuture<Void> runAsync(Runnable runnable) { - if (runnable == null) throw new NullPointerException(); - CompletableFuture<Void> f = new CompletableFuture<Void>(); - ForkJoinPool.commonPool(). - execute((ForkJoinTask<?>)new AsyncRun(runnable, f)); - return f; - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * by a task running in the given executor after it runs the given - * action. - * - * @param runnable the action to run before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public static CompletableFuture<Void> runAsync(Runnable runnable, - Executor executor) { - if (executor == null || runnable == null) - throw new NullPointerException(); - CompletableFuture<Void> f = new CompletableFuture<Void>(); - executor.execute(new AsyncRun(runnable, f)); - return f; - } - - /** - * Returns a new CompletableFuture that is already completed with - * the given value. - * - * @param value the value - * @return the completed CompletableFuture - */ - public static <U> CompletableFuture<U> completedFuture(U value) { - CompletableFuture<U> f = new CompletableFuture<U>(); - f.result = (value == null) ? NIL : value; - return f; - } - - /** - * Returns {@code true} if completed in any fashion: normally, - * exceptionally, or via cancellation. - * - * @return {@code true} if completed - */ - public boolean isDone() { - return result != null; - } - - /** - * Waits if necessary for this future to complete, and then - * returns its result. - * - * @return the result value - * @throws CancellationException if this future was cancelled - * @throws ExecutionException if this future completed exceptionally - * @throws InterruptedException if the current thread was interrupted - * while waiting - */ - public T get() throws InterruptedException, ExecutionException { - Object r; Throwable ex, cause; - if ((r = result) == null && (r = waitingGet(true)) == null) - throw new InterruptedException(); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); - } - - /** - * Waits if necessary for at most the given time for this future - * to complete, and then returns its result, if available. - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return the result value - * @throws CancellationException if this future was cancelled - * @throws ExecutionException if this future completed exceptionally - * @throws InterruptedException if the current thread was interrupted - * while waiting - * @throws TimeoutException if the wait timed out - */ - public T get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - Object r; Throwable ex, cause; - long nanos = unit.toNanos(timeout); - if (Thread.interrupted()) - throw new InterruptedException(); - if ((r = result) == null) - r = timedAwaitDone(nanos); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if ((ex instanceof CompletionException) && - (cause = ex.getCause()) != null) - ex = cause; - throw new ExecutionException(ex); - } - - /** - * Returns the result value when complete, or throws an - * (unchecked) exception if completed exceptionally. To better - * conform with the use of common functional forms, if a - * computation involved in the completion of this - * CompletableFuture threw an exception, this method throws an - * (unchecked) {@link CompletionException} with the underlying - * exception as its cause. - * - * @return the result value - * @throws CancellationException if the computation was cancelled - * @throws CompletionException if this future completed - * exceptionally or a completion computation threw an exception - */ - public T join() { - Object r; Throwable ex; - if ((r = result) == null) - r = waitingGet(false); - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); - } - - /** - * Returns the result value (or throws any encountered exception) - * if completed, else returns the given valueIfAbsent. - * - * @param valueIfAbsent the value to return if not completed - * @return the result value, if completed, else the given valueIfAbsent - * @throws CancellationException if the computation was cancelled - * @throws CompletionException if this future completed - * exceptionally or a completion computation threw an exception - */ - public T getNow(T valueIfAbsent) { - Object r; Throwable ex; - if ((r = result) == null) - return valueIfAbsent; - if (!(r instanceof AltResult)) { - @SuppressWarnings("unchecked") T tr = (T) r; - return tr; - } - if ((ex = ((AltResult)r).ex) == null) - return null; - if (ex instanceof CancellationException) - throw (CancellationException)ex; - if (ex instanceof CompletionException) - throw (CompletionException)ex; - throw new CompletionException(ex); - } - - /** - * If not already completed, sets the value returned by {@link - * #get()} and related methods to the given value. - * - * @param value the result value - * @return {@code true} if this invocation caused this CompletableFuture - * to transition to a completed state, else {@code false} - */ - public boolean complete(T value) { - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, - value == null ? NIL : value); - postComplete(); - return triggered; - } - - /** - * If not already completed, causes invocations of {@link #get()} - * and related methods to throw the given exception. - * - * @param ex the exception - * @return {@code true} if this invocation caused this CompletableFuture - * to transition to a completed state, else {@code false} - */ - public boolean completeExceptionally(Throwable ex) { - if (ex == null) throw new NullPointerException(); - boolean triggered = result == null && - UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); - postComplete(); - return triggered; - } - - /** - * Returns a new CompletableFuture that is completed - * when this CompletableFuture completes, with the result of the - * given function of this CompletableFuture's result. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied function throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) { - return doThenApply(fn, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, with the result of the - * given function of this CompletableFuture's result from a - * task running in the {@link ForkJoinPool#commonPool()}. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied function throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> thenApplyAsync - (Function<? super T,? extends U> fn) { - return doThenApply(fn, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, with the result of the - * given function of this CompletableFuture's result from a - * task running in the given executor. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied function throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> thenApplyAsync - (Function<? super T,? extends U> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenApply(fn, executor); - } + // Implementations of stage methods with (plain, async, Executor) forms private <U> CompletableFuture<U> doThenApply (Function<? super T,? extends U> fn, @@ -1749,7 +1523,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); else u = fn.apply(t); } catch (Throwable rex) { @@ -1763,65 +1537,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when this CompletableFuture completes, after performing the given - * action with this CompletableFuture's result. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenAccept(Consumer<? super T> block) { - return doThenAccept(block, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action with this CompletableFuture's result from a task running - * in the {@link ForkJoinPool#commonPool()}. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) { - return doThenAccept(block, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action with this CompletableFuture's result from a task running - * in the given executor. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param block the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAccept(block, executor); - } - private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn, Executor e) { if (fn == null) throw new NullPointerException(); @@ -1851,7 +1566,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); + execAsync(e, new AsyncAccept<T>(t, fn, dst)); else fn.accept(t); } catch (Throwable rex) { @@ -1865,63 +1580,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when this CompletableFuture completes, after performing the given - * action. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenRun(Runnable action) { - return doThenRun(action, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action from a task running in the {@link ForkJoinPool#commonPool()}. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenRunAsync(Runnable action) { - return doThenRun(action, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when this CompletableFuture completes, after performing the given - * action from a task running in the given executor. - * - * <p>If this CompletableFuture completes exceptionally, or the - * supplied action throws an exception, then the returned - * CompletableFuture completes exceptionally with a - * CompletionException holding the exception as its cause. - * - * @param action the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> thenRunAsync(Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenRun(action, executor); - } - private CompletableFuture<Void> doThenRun(Runnable action, Executor e) { if (action == null) throw new NullPointerException(); @@ -1946,7 +1604,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(action, dst)); + execAsync(e, new AsyncRun(action, dst)); else action.run(); } catch (Throwable rex) { @@ -1960,76 +1618,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when both this and the other given CompletableFuture complete, - * with the result of the given function of the results of the two - * CompletableFutures. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied function throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U,V> CompletableFuture<V> thenCombine - (CompletableFuture<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn) { - return doThenCombine(other, fn, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * with the result of the given function of the results of the two - * CompletableFutures from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied function throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U,V> CompletableFuture<V> thenCombineAsync - (CompletableFuture<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn) { - return doThenCombine(other, fn, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * with the result of the given function of the results of the two - * CompletableFutures from a task running in the given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied function throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U,V> CompletableFuture<V> thenCombineAsync - (CompletableFuture<? extends U> other, - BiFunction<? super T,? super U,? extends V> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCombine(other, fn, executor); - } - private <U,V> CompletableFuture<V> doThenCombine (CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, @@ -2083,7 +1671,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst)); + execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst)); else v = fn.apply(t, u); } catch (Throwable rex) { @@ -2098,76 +1686,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when both this and the other given CompletableFuture complete, - * after performing the given action with the results of the two - * CompletableFutures. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<Void> thenAcceptBoth - (CompletableFuture<? extends U> other, - BiConsumer<? super T, ? super U> block) { - return doThenAcceptBoth(other, block, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action with the results of the two - * CompletableFutures from a task running in the {@link - * ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<Void> thenAcceptBothAsync - (CompletableFuture<? extends U> other, - BiConsumer<? super T, ? super U> block) { - return doThenAcceptBoth(other, block, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action with the results of the two - * CompletableFutures from a task running in the given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U> CompletableFuture<Void> thenAcceptBothAsync - (CompletableFuture<? extends U> other, - BiConsumer<? super T, ? super U> block, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenAcceptBoth(other, block, executor); - } - private <U> CompletableFuture<Void> doThenAcceptBoth (CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> fn, @@ -2220,7 +1738,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst)); + execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst)); else fn.accept(t, u); } catch (Throwable rex) { @@ -2235,71 +1753,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when both this and the other given CompletableFuture complete, - * after performing the given action. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, - Runnable action) { - return doRunAfterBoth(other, action, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, - Runnable action) { - return doRunAfterBoth(other, action, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when both this and the other given CompletableFuture complete, - * after performing the given action from a task running in the - * given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, or the supplied action throws an exception, - * then the returned CompletableFuture completes exceptionally - * with a CompletionException holding the exception as its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterBoth(other, action, executor); - } - private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other, Runnable action, Executor e) { @@ -2338,7 +1791,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(action, dst)); + execAsync(e, new AsyncRun(action, dst)); else action.run(); } catch (Throwable rex) { @@ -2353,89 +1806,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when either this or the other given CompletableFuture completes, - * with the result of the given function of either this or the other - * CompletableFuture's result. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied function - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> applyToEither - (CompletableFuture<? extends T> other, - Function<? super T, U> fn) { - return doApplyToEither(other, fn, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * with the result of the given function of either this or the other - * CompletableFuture's result from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied function - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> applyToEitherAsync - (CompletableFuture<? extends T> other, - Function<? super T, U> fn) { - return doApplyToEither(other, fn, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * with the result of the given function of either this or the other - * CompletableFuture's result from a task running in the - * given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied function - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param fn the function to use to compute the value of - * the returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> applyToEitherAsync - (CompletableFuture<? extends T> other, - Function<? super T, U> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doApplyToEither(other, fn, executor); - } - private <U> CompletableFuture<U> doApplyToEither (CompletableFuture<? extends T> other, Function<? super T, U> fn, @@ -2473,7 +1843,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncApply<T,U>(t, fn, dst)); + execAsync(e, new AsyncApply<T,U>(t, fn, dst)); else u = fn.apply(t); } catch (Throwable rex) { @@ -2488,89 +1858,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when either this or the other given CompletableFuture completes, - * after performing the given action with the result of either this - * or the other CompletableFuture's result. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> acceptEither - (CompletableFuture<? extends T> other, - Consumer<? super T> block) { - return doAcceptEither(other, block, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action with the result of either this - * or the other CompletableFuture's result from a task running in - * the {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> acceptEitherAsync - (CompletableFuture<? extends T> other, - Consumer<? super T> block) { - return doAcceptEither(other, block, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action with the result of either this - * or the other CompletableFuture's result from a task running in - * the given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param block the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> acceptEitherAsync - (CompletableFuture<? extends T> other, - Consumer<? super T> block, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doAcceptEither(other, block, executor); - } - private CompletableFuture<Void> doAcceptEither (CompletableFuture<? extends T> other, Consumer<? super T> fn, @@ -2607,7 +1894,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncAccept<T>(t, fn, dst)); + execAsync(e, new AsyncAccept<T>(t, fn, dst)); else fn.accept(t); } catch (Throwable rex) { @@ -2622,85 +1909,6 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed - * when either this or the other given CompletableFuture completes, - * after performing the given action. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, - Runnable action) { - return doRunAfterEither(other, action, null); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action from a task running in the - * {@link ForkJoinPool#commonPool()}. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterEitherAsync - (CompletableFuture<?> other, - Runnable action) { - return doRunAfterEither(other, action, ForkJoinPool.commonPool()); - } - - /** - * Returns a new CompletableFuture that is asynchronously completed - * when either this or the other given CompletableFuture completes, - * after performing the given action from a task running in the - * given executor. - * - * <p>If this and/or the other CompletableFuture complete - * exceptionally, then the returned CompletableFuture may also do so, - * with a CompletionException holding one of these exceptions as its - * cause. No guarantees are made about which result or exception is - * used in the returned CompletableFuture. If the supplied action - * throws an exception, then the returned CompletableFuture completes - * exceptionally with a CompletionException holding the exception as - * its cause. - * - * @param other the other CompletableFuture - * @param action the action to perform before completing the - * returned CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the new CompletableFuture - */ - public CompletableFuture<Void> runAfterEitherAsync - (CompletableFuture<?> other, - Runnable action, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doRunAfterEither(other, action, executor); - } - private CompletableFuture<Void> doRunAfterEither (CompletableFuture<?> other, Runnable action, @@ -2732,7 +1940,7 @@ public class CompletableFuture<T> implements Future<T> { if (ex == null) { try { if (e != null) - e.execute(new AsyncRun(action, dst)); + execAsync(e, new AsyncRun(action, dst)); else action.run(); } catch (Throwable rex) { @@ -2747,69 +1955,8 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a CompletableFuture that upon completion, has the same - * value as produced by the given function of the result of this - * CompletableFuture. - * - * <p>If this CompletableFuture completes exceptionally, then the - * returned CompletableFuture also does so, with a - * CompletionException holding this exception as its cause. - * Similarly, if the computed CompletableFuture completes - * exceptionally, then so does the returned CompletableFuture. - * - * @param fn the function returning a new CompletableFuture - * @return the CompletableFuture - */ - public <U> CompletableFuture<U> thenCompose - (Function<? super T, CompletableFuture<U>> fn) { - return doThenCompose(fn, null); - } - - /** - * Returns a CompletableFuture that upon completion, has the same - * value as that produced asynchronously using the {@link - * ForkJoinPool#commonPool()} by the given function of the result - * of this CompletableFuture. - * - * <p>If this CompletableFuture completes exceptionally, then the - * returned CompletableFuture also does so, with a - * CompletionException holding this exception as its cause. - * Similarly, if the computed CompletableFuture completes - * exceptionally, then so does the returned CompletableFuture. - * - * @param fn the function returning a new CompletableFuture - * @return the CompletableFuture - */ - public <U> CompletableFuture<U> thenComposeAsync - (Function<? super T, CompletableFuture<U>> fn) { - return doThenCompose(fn, ForkJoinPool.commonPool()); - } - - /** - * Returns a CompletableFuture that upon completion, has the same - * value as that produced asynchronously using the given executor - * by the given function of this CompletableFuture. - * - * <p>If this CompletableFuture completes exceptionally, then the - * returned CompletableFuture also does so, with a - * CompletionException holding this exception as its cause. - * Similarly, if the computed CompletableFuture completes - * exceptionally, then so does the returned CompletableFuture. - * - * @param fn the function returning a new CompletableFuture - * @param executor the executor to use for asynchronous execution - * @return the CompletableFuture - */ - public <U> CompletableFuture<U> thenComposeAsync - (Function<? super T, CompletableFuture<U>> fn, - Executor executor) { - if (executor == null) throw new NullPointerException(); - return doThenCompose(fn, executor); - } - private <U> CompletableFuture<U> doThenCompose - (Function<? super T, CompletableFuture<U>> fn, + (Function<? super T, ? extends CompletionStage<U>> fn, Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture<U> dst = null; @@ -2840,11 +1987,13 @@ public class CompletableFuture<T> implements Future<T> { if (e != null) { if (dst == null) dst = new CompletableFuture<U>(); - e.execute(new AsyncCompose<T,U>(t, fn, dst)); + execAsync(e, new AsyncCompose<T,U>(t, fn, dst)); } else { try { - if ((dst = fn.apply(t)) == null) + CompletionStage<U> cs = fn.apply(t); + if (cs == null || + (dst = cs.toCompletableFuture()) == null) ex = new NullPointerException(); } catch (Throwable rex) { ex = rex; @@ -2861,28 +2010,17 @@ public class CompletableFuture<T> implements Future<T> { return dst; } - /** - * Returns a new CompletableFuture that is completed when this - * CompletableFuture completes, with the result of the given - * function of the exception triggering this CompletableFuture's - * completion when it completes exceptionally; otherwise, if this - * CompletableFuture completes normally, then the returned - * CompletableFuture also completes normally with the same value. - * - * @param fn the function to use to compute the value of the - * returned CompletableFuture if this CompletableFuture completed - * exceptionally - * @return the new CompletableFuture - */ - public CompletableFuture<T> exceptionally - (Function<Throwable, ? extends T> fn) { + private CompletableFuture<T> doWhenComplete + (BiConsumer<? super T, ? super Throwable> fn, + Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture<T> dst = new CompletableFuture<T>(); - ExceptionCompletion<T> d = null; + WhenCompleteCompletion<T> d = null; Object r; if ((r = result) == null) { CompletionNode p = - new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst)); + new CompletionNode(d = new WhenCompleteCompletion<T> + (this, fn, dst, e)); while ((r = result) == null) { if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, p.next = completions, p)) @@ -2890,47 +2028,43 @@ public class CompletableFuture<T> implements Future<T> { } } if (r != null && (d == null || d.compareAndSet(0, 1))) { - T t = null; Throwable ex, dx = null; + T t; Throwable ex; if (r instanceof AltResult) { - if ((ex = ((AltResult)r).ex) != null) { - try { - t = fn.apply(ex); - } catch (Throwable rex) { - dx = rex; - } - } + ex = ((AltResult)r).ex; + t = null; } else { + ex = null; @SuppressWarnings("unchecked") T tr = (T) r; t = tr; } - dst.internalComplete(t, dx); + Throwable dx = null; + try { + if (e != null) + execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst)); + else + fn.accept(t, ex); + } catch (Throwable rex) { + dx = rex; + } + if (e == null || dx != null) + dst.internalComplete(t, ex != null ? ex : dx); } helpPostComplete(); return dst; } - /** - * Returns a new CompletableFuture that is completed when this - * CompletableFuture completes, with the result of the given - * function of the result and exception of this CompletableFuture's - * completion. The given function is invoked with the result (or - * {@code null} if none) and the exception (or {@code null} if none) - * of this CompletableFuture when complete. - * - * @param fn the function to use to compute the value of the - * returned CompletableFuture - * @return the new CompletableFuture - */ - public <U> CompletableFuture<U> handle - (BiFunction<? super T, Throwable, ? extends U> fn) { + private <U> CompletableFuture<U> doHandle + (BiFunction<? super T, Throwable, ? extends U> fn, + Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture<U> dst = new CompletableFuture<U>(); HandleCompletion<T,U> d = null; Object r; if ((r = result) == null) { CompletionNode p = - new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst)); + new CompletionNode(d = new HandleCompletion<T,U> + (this, fn, dst, e)); while ((r = result) == null) { if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, p.next = completions, p)) @@ -2948,21 +2082,577 @@ public class CompletableFuture<T> implements Future<T> { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; } - U u; Throwable dx; + U u = null; + Throwable dx = null; try { - u = fn.apply(t, ex); - dx = null; + if (e != null) + execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst)); + else { + u = fn.apply(t, ex); + dx = null; + } } catch (Throwable rex) { dx = rex; u = null; } - dst.internalComplete(u, dx); + if (e == null || dx != null) + dst.internalComplete(u, dx); } helpPostComplete(); return dst; } + // public methods + + /** + * Creates a new incomplete CompletableFuture. + */ + public CompletableFuture() { + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the {@link ForkJoinPool#commonPool()} with + * the value obtained by calling the given Supplier. + * + * @param supplier a function returning the value to be used + * to complete the returned CompletableFuture + * @param <U> the function's return type + * @return the new CompletableFuture + */ + public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { + if (supplier == null) throw new NullPointerException(); + CompletableFuture<U> f = new CompletableFuture<U>(); + execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the given executor with the value obtained + * by calling the given Supplier. + * + * @param supplier a function returning the value to be used + * to complete the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletableFuture + */ + public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, + Executor executor) { + if (executor == null || supplier == null) + throw new NullPointerException(); + CompletableFuture<U> f = new CompletableFuture<U>(); + execAsync(executor, new AsyncSupply<U>(supplier, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the {@link ForkJoinPool#commonPool()} after + * it runs the given action. + * + * @param runnable the action to run before completing the + * returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture<Void> runAsync(Runnable runnable) { + if (runnable == null) throw new NullPointerException(); + CompletableFuture<Void> f = new CompletableFuture<Void>(); + execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the given executor after it runs the given + * action. + * + * @param runnable the action to run before completing the + * returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + */ + public static CompletableFuture<Void> runAsync(Runnable runnable, + Executor executor) { + if (executor == null || runnable == null) + throw new NullPointerException(); + CompletableFuture<Void> f = new CompletableFuture<Void>(); + execAsync(executor, new AsyncRun(runnable, f)); + return f; + } + + /** + * Returns a new CompletableFuture that is already completed with + * the given value. + * + * @param value the value + * @param <U> the type of the value + * @return the completed CompletableFuture + */ + public static <U> CompletableFuture<U> completedFuture(U value) { + CompletableFuture<U> f = new CompletableFuture<U>(); + f.result = (value == null) ? NIL : value; + return f; + } + + /** + * Returns {@code true} if completed in any fashion: normally, + * exceptionally, or via cancellation. + * + * @return {@code true} if completed + */ + public boolean isDone() { + return result != null; + } + + /** + * Waits if necessary for this future to complete, and then + * returns its result. + * + * @return the result value + * @throws CancellationException if this future was cancelled + * @throws ExecutionException if this future completed exceptionally + * @throws InterruptedException if the current thread was interrupted + * while waiting + */ + public T get() throws InterruptedException, ExecutionException { + Object r; Throwable ex, cause; + if ((r = result) == null && (r = waitingGet(true)) == null) + throw new InterruptedException(); + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if ((ex instanceof CompletionException) && + (cause = ex.getCause()) != null) + ex = cause; + throw new ExecutionException(ex); + } + + /** + * Waits if necessary for at most the given time for this future + * to complete, and then returns its result, if available. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return the result value + * @throws CancellationException if this future was cancelled + * @throws ExecutionException if this future completed exceptionally + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the wait timed out + */ + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + Object r; Throwable ex, cause; + long nanos = unit.toNanos(timeout); + if (Thread.interrupted()) + throw new InterruptedException(); + if ((r = result) == null) + r = timedAwaitDone(nanos); + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if ((ex instanceof CompletionException) && + (cause = ex.getCause()) != null) + ex = cause; + throw new ExecutionException(ex); + } + + /** + * Returns the result value when complete, or throws an + * (unchecked) exception if completed exceptionally. To better + * conform with the use of common functional forms, if a + * computation involved in the completion of this + * CompletableFuture threw an exception, this method throws an + * (unchecked) {@link CompletionException} with the underlying + * exception as its cause. + * + * @return the result value + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed + * exceptionally or a completion computation threw an exception + */ + public T join() { + Object r; Throwable ex; + if ((r = result) == null) + r = waitingGet(false); + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if (ex instanceof CompletionException) + throw (CompletionException)ex; + throw new CompletionException(ex); + } + + /** + * Returns the result value (or throws any encountered exception) + * if completed, else returns the given valueIfAbsent. + * + * @param valueIfAbsent the value to return if not completed + * @return the result value, if completed, else the given valueIfAbsent + * @throws CancellationException if the computation was cancelled + * @throws CompletionException if this future completed + * exceptionally or a completion computation threw an exception + */ + public T getNow(T valueIfAbsent) { + Object r; Throwable ex; + if ((r = result) == null) + return valueIfAbsent; + if (!(r instanceof AltResult)) { + @SuppressWarnings("unchecked") T tr = (T) r; + return tr; + } + if ((ex = ((AltResult)r).ex) == null) + return null; + if (ex instanceof CancellationException) + throw (CancellationException)ex; + if (ex instanceof CompletionException) + throw (CompletionException)ex; + throw new CompletionException(ex); + } + + /** + * If not already completed, sets the value returned by {@link + * #get()} and related methods to the given value. + * + * @param value the result value + * @return {@code true} if this invocation caused this CompletableFuture + * to transition to a completed state, else {@code false} + */ + public boolean complete(T value) { + boolean triggered = result == null && + UNSAFE.compareAndSwapObject(this, RESULT, null, + value == null ? NIL : value); + postComplete(); + return triggered; + } + + /** + * If not already completed, causes invocations of {@link #get()} + * and related methods to throw the given exception. + * + * @param ex the exception + * @return {@code true} if this invocation caused this CompletableFuture + * to transition to a completed state, else {@code false} + */ + public boolean completeExceptionally(Throwable ex) { + if (ex == null) throw new NullPointerException(); + boolean triggered = result == null && + UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex)); + postComplete(); + return triggered; + } + + // CompletionStage methods + + public <U> CompletableFuture<U> thenApply + (Function<? super T,? extends U> fn) { + return doThenApply(fn, null); + } + + public <U> CompletableFuture<U> thenApplyAsync + (Function<? super T,? extends U> fn) { + return doThenApply(fn, ForkJoinPool.commonPool()); + } + + public <U> CompletableFuture<U> thenApplyAsync + (Function<? super T,? extends U> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenApply(fn, executor); + } + + public CompletableFuture<Void> thenAccept + (Consumer<? super T> action) { + return doThenAccept(action, null); + } + + public CompletableFuture<Void> thenAcceptAsync + (Consumer<? super T> action) { + return doThenAccept(action, ForkJoinPool.commonPool()); + } + + public CompletableFuture<Void> thenAcceptAsync + (Consumer<? super T> action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenAccept(action, executor); + } + + public CompletableFuture<Void> thenRun + (Runnable action) { + return doThenRun(action, null); + } + + public CompletableFuture<Void> thenRunAsync + (Runnable action) { + return doThenRun(action, ForkJoinPool.commonPool()); + } + + public CompletableFuture<Void> thenRunAsync + (Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenRun(action, executor); + } + + public <U,V> CompletableFuture<V> thenCombine + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn) { + return doThenCombine(other.toCompletableFuture(), fn, null); + } + + public <U,V> CompletableFuture<V> thenCombineAsync + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn) { + return doThenCombine(other.toCompletableFuture(), fn, + ForkJoinPool.commonPool()); + } + + public <U,V> CompletableFuture<V> thenCombineAsync + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenCombine(other.toCompletableFuture(), fn, executor); + } + + public <U> CompletableFuture<Void> thenAcceptBoth + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + return doThenAcceptBoth(other.toCompletableFuture(), action, null); + } + + public <U> CompletableFuture<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + return doThenAcceptBoth(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); + } + + public <U> CompletableFuture<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenAcceptBoth(other.toCompletableFuture(), action, executor); + } + + public CompletableFuture<Void> runAfterBoth + (CompletionStage<?> other, + Runnable action) { + return doRunAfterBoth(other.toCompletableFuture(), action, null); + } + + public CompletableFuture<Void> runAfterBothAsync + (CompletionStage<?> other, + Runnable action) { + return doRunAfterBoth(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); + } + + public CompletableFuture<Void> runAfterBothAsync + (CompletionStage<?> other, + Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doRunAfterBoth(other.toCompletableFuture(), action, executor); + } + + + public <U> CompletableFuture<U> applyToEither + (CompletionStage<? extends T> other, + Function<? super T, U> fn) { + return doApplyToEither(other.toCompletableFuture(), fn, null); + } + + public <U> CompletableFuture<U> applyToEitherAsync + (CompletionStage<? extends T> other, + Function<? super T, U> fn) { + return doApplyToEither(other.toCompletableFuture(), fn, + ForkJoinPool.commonPool()); + } + + public <U> CompletableFuture<U> applyToEitherAsync + (CompletionStage<? extends T> other, + Function<? super T, U> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doApplyToEither(other.toCompletableFuture(), fn, executor); + } + + public CompletableFuture<Void> acceptEither + (CompletionStage<? extends T> other, + Consumer<? super T> action) { + return doAcceptEither(other.toCompletableFuture(), action, null); + } + + public CompletableFuture<Void> acceptEitherAsync + (CompletionStage<? extends T> other, + Consumer<? super T> action) { + return doAcceptEither(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); + } + + public CompletableFuture<Void> acceptEitherAsync + (CompletionStage<? extends T> other, + Consumer<? super T> action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doAcceptEither(other.toCompletableFuture(), action, executor); + } + + public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, + Runnable action) { + return doRunAfterEither(other.toCompletableFuture(), action, null); + } + + public CompletableFuture<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action) { + return doRunAfterEither(other.toCompletableFuture(), action, + ForkJoinPool.commonPool()); + } + + public CompletableFuture<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doRunAfterEither(other.toCompletableFuture(), action, executor); + } + + public <U> CompletableFuture<U> thenCompose + (Function<? super T, ? extends CompletionStage<U>> fn) { + return doThenCompose(fn, null); + } + + public <U> CompletableFuture<U> thenComposeAsync + (Function<? super T, ? extends CompletionStage<U>> fn) { + return doThenCompose(fn, ForkJoinPool.commonPool()); + } + + public <U> CompletableFuture<U> thenComposeAsync + (Function<? super T, ? extends CompletionStage<U>> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doThenCompose(fn, executor); + } + + public CompletableFuture<T> whenComplete + (BiConsumer<? super T, ? super Throwable> action) { + return doWhenComplete(action, null); + } + + public CompletableFuture<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action) { + return doWhenComplete(action, ForkJoinPool.commonPool()); + } + + public CompletableFuture<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doWhenComplete(action, executor); + } + + public <U> CompletableFuture<U> handle + (BiFunction<? super T, Throwable, ? extends U> fn) { + return doHandle(fn, null); + } + + public <U> CompletableFuture<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn) { + return doHandle(fn, ForkJoinPool.commonPool()); + } + + public <U> CompletableFuture<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn, + Executor executor) { + if (executor == null) throw new NullPointerException(); + return doHandle(fn, executor); + } + + /** + * Returns this CompletableFuture + * + * @return this CompletableFuture + */ + public CompletableFuture<T> toCompletableFuture() { + return this; + } + + // not in interface CompletionStage + + /** + * Returns a new CompletableFuture that is completed when this + * CompletableFuture completes, with the result of the given + * function of the exception triggering this CompletableFuture's + * completion when it completes exceptionally; otherwise, if this + * CompletableFuture completes normally, then the returned + * CompletableFuture also completes normally with the same value. + * Note: More flexible versions of this functionality are + * available using methods {@code whenComplete} and {@code handle}. + * + * @param fn the function to use to compute the value of the + * returned CompletableFuture if this CompletableFuture completed + * exceptionally + * @return the new CompletableFuture + */ + public CompletableFuture<T> exceptionally + (Function<Throwable, ? extends T> fn) { + if (fn == null) throw new NullPointerException(); + CompletableFuture<T> dst = new CompletableFuture<T>(); + ExceptionCompletion<T> d = null; + Object r; + if ((r = result) == null) { + CompletionNode p = + new CompletionNode(d = new ExceptionCompletion<T> + (this, fn, dst)); + while ((r = result) == null) { + if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, + p.next = completions, p)) + break; + } + } + if (r != null && (d == null || d.compareAndSet(0, 1))) { + T t = null; Throwable ex, dx = null; + if (r instanceof AltResult) { + if ((ex = ((AltResult)r).ex) != null) { + try { + t = fn.apply(ex); + } catch (Throwable rex) { + dx = rex; + } + } + } + else { + @SuppressWarnings("unchecked") T tr = (T) r; + t = tr; + } + dst.internalComplete(t, dx); + } + helpPostComplete(); + return dst; + } + /* ------------- Arbitrary-arity constructions -------------- */ /* @@ -3215,6 +2905,21 @@ public class CompletableFuture<T> implements Future<T> { } /** + * Returns {@code true} if this CompletableFuture completed + * exceptionally, in any way. Possible causes include + * cancellation, explicit invocation of {@code + * completeExceptionally}, and abrupt termination of a + * CompletionStage action. + * + * @return {@code true} if this CompletableFuture completed + * exceptionally + */ + public boolean isCompletedExceptionally() { + Object r; + return ((r = result) instanceof AltResult) && r != NIL; + } + + /** * Forcibly sets or resets the value subsequently returned by * method {@link #get()} and related methods, whether or not * already completed. This method is designed for use only in diff --git a/src/share/classes/java/util/concurrent/CompletionStage.java b/src/share/classes/java/util/concurrent/CompletionStage.java new file mode 100644 index 0000000000..6de60980cf --- /dev/null +++ b/src/share/classes/java/util/concurrent/CompletionStage.java @@ -0,0 +1,760 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; +import java.util.function.Supplier; +import java.util.function.Consumer; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.BiFunction; +import java.util.concurrent.Executor; + +/** + * A stage of a possibly asynchronous computation, that performs an + * action or computes a value when another CompletionStage completes. + * A stage completes upon termination of its computation, but this may + * in turn trigger other dependent stages. The functionality defined + * in this interface takes only a few basic forms, which expand out to + * a larger set of methods to capture a range of usage styles: <ul> + * + * <li>The computation performed by a stage may be expressed as a + * Function, Consumer, or Runnable (using methods with names including + * <em>apply</em>, <em>accept</em>, or <em>run</em>, respectively) + * depending on whether it requires arguments and/or produces results. + * For example, {@code stage.thenApply(x -> square(x)).thenAccept(x -> + * System.out.print(x)).thenRun(() -> System.out.println())}. An + * additional form (<em>compose</em>) applies functions of stages + * themselves, rather than their results. </li> + * + * <li> One stage's execution may be triggered by completion of a + * single stage, or both of two stages, or either of two stages. + * Dependencies on a single stage are arranged using methods with + * prefix <em>then</em>. Those triggered by completion of + * <em>both</em> of two stages may <em>combine</em> their results or + * effects, using correspondingly named methods. Those triggered by + * <em>either</em> of two stages make no guarantees about which of the + * results or effects are used for the dependent stage's + * computation.</li> + * + * <li> Dependencies among stages control the triggering of + * computations, but do not otherwise guarantee any particular + * ordering. Additionally, execution of a new stage's computations may + * be arranged in any of three ways: default execution, default + * asynchronous execution (using methods with suffix <em>async</em> + * that employ the stage's default asynchronous execution facility), + * or custom (via a supplied {@link Executor}). The execution + * properties of default and async modes are specified by + * CompletionStage implementations, not this interface. Methods with + * explicit Executor arguments may have arbitrary execution + * properties, and might not even support concurrent execution, but + * are arranged for processing in a way that accommodates asynchrony. + * + * <li> Two method forms support processing whether the triggering + * stage completed normally or exceptionally: Method {@link + * #whenComplete whenComplete} allows injection of an action + * regardless of outcome, otherwise preserving the outcome in its + * completion. Method {@link #handle handle} additionally allows the + * stage to compute a replacement result that may enable further + * processing by other dependent stages. In all other cases, if a + * stage's computation terminates abruptly with an (unchecked) + * exception or error, then all dependent stages requiring its + * completion complete exceptionally as well, with a {@link + * CompletionException} holding the exception as its cause. If a + * stage is dependent on <em>both</em> of two stages, and both + * complete exceptionally, then the CompletionException may correspond + * to either one of these exceptions. If a stage is dependent on + * <em>either</em> of two others, and only one of them completes + * exceptionally, no guarantees are made about whether the dependent + * stage completes normally or exceptionally. In the case of method + * {@code whenComplete}, when the supplied action itself encounters an + * exception, then the stage exceptionally completes with this + * exception if not already completed exceptionally.</li> + * + * </ul> + * + * <p>All methods adhere to the above triggering, execution, and + * exceptional completion specifications (which are not repeated in + * individual method specifications). Additionally, while arguments + * used to pass a completion result (that is, for parameters of type + * {@code T}) for methods accepting them may be null, passing a null + * value for any other parameter will result in a {@link + * NullPointerException} being thrown. + * + * <p>This interface does not define methods for initially creating, + * forcibly completing normally or exceptionally, probing completion + * status or results, or awaiting completion of a stage. + * Implementations of CompletionStage may provide means of achieving + * such effects, as appropriate. Method {@link #toCompletableFuture} + * enables interoperability among different implementations of this + * interface by providing a common conversion type. + * + * @author Doug Lea + * @since 1.8 + */ +public interface CompletionStage<T> { + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed with this stage's result as the argument + * to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using this stage's default asynchronous + * execution facility, with this stage's result as the argument to + * the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> thenApplyAsync + (Function<? super T,? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using the supplied Executor, with this + * stage's result as the argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> thenApplyAsync + (Function<? super T,? extends U> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed with this stage's result as the argument + * to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenAccept(Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using this stage's default asynchronous + * execution facility, with this stage's result as the argument to + * the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using the supplied Executor, with this + * stage's result as the argument to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, + Executor executor); + /** + * Returns a new CompletionStage that, when this stage completes + * normally, executes the given action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenRun(Runnable action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, executes the given action using this stage's default + * asynchronous execution facility. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> thenRunAsync(Runnable action); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, executes the given action using the supplied Executor. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> thenRunAsync(Runnable action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage both complete normally, is executed with the two + * results as arguments to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @param <V> the function's return type + * @return the new CompletionStage + */ + public <U,V> CompletionStage<V> thenCombine + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using this stage's + * default asynchronous execution facility, with the two results + * as arguments to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @param <V> the function's return type + * @return the new CompletionStage + */ + public <U,V> CompletionStage<V> thenCombineAsync + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using the supplied + * executor, with the two results as arguments to the supplied + * function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the type of the other CompletionStage's result + * @param <V> the function's return type + * @return the new CompletionStage + */ + public <U,V> CompletionStage<V> thenCombineAsync + (CompletionStage<? extends U> other, + BiFunction<? super T,? super U,? extends V> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage both complete normally, is executed with the two + * results as arguments to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @return the new CompletionStage + */ + public <U> CompletionStage<Void> thenAcceptBoth + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using this stage's + * default asynchronous execution facility, with the two results + * as arguments to the supplied action. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param <U> the type of the other CompletionStage's result + * @return the new CompletionStage + */ + public <U> CompletionStage<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, is executed using the supplied + * executor, with the two results as arguments to the supplied + * function. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the type of the other CompletionStage's result + * @return the new CompletionStage + */ + public <U> CompletionStage<Void> thenAcceptBothAsync + (CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage both complete normally, executes the given action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, + Runnable action); + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, executes the given action using + * this stage's default asynchronous execution facility. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, + Runnable action); + + /** + * Returns a new CompletionStage that, when this and the other + * given stage complete normally, executes the given action using + * the supplied executor + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, + Runnable action, + Executor executor); + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed with the + * corresponding result as argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> applyToEither + (CompletionStage<? extends T> other, + Function<? super T, U> fn); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using this + * stage's default asynchronous execution facility, with the + * corresponding result as argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> applyToEitherAsync + (CompletionStage<? extends T> other, + Function<? super T, U> fn); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using the + * supplied executor, with the corresponding result as argument to + * the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param fn the function to use to compute the value of + * the returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> applyToEitherAsync + (CompletionStage<? extends T> other, + Function<? super T, U> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed with the + * corresponding result as argument to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> acceptEither + (CompletionStage<? extends T> other, + Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using this + * stage's default asynchronous execution facility, with the + * corresponding result as argument to the supplied action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> acceptEitherAsync + (CompletionStage<? extends T> other, + Consumer<? super T> action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, is executed using the + * supplied executor, with the corresponding result as argument to + * the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> acceptEitherAsync + (CompletionStage<? extends T> other, + Consumer<? super T> action, + Executor executor); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, executes the given action. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterEither(CompletionStage<?> other, + Runnable action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, executes the given action + * using this stage's default asynchronous execution facility. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action); + + /** + * Returns a new CompletionStage that, when either this or the + * other given stage complete normally, executes the given action + * using supplied executor. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param other the other CompletionStage + * @param action the action to perform before completing the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<Void> runAfterEitherAsync + (CompletionStage<?> other, + Runnable action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed with this stage as the argument + * to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function returning a new CompletionStage + * @param <U> the type of the returned CompletionStage's result + * @return the CompletionStage + */ + public <U> CompletionStage<U> thenCompose + (Function<? super T, ? extends CompletionStage<U>> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using this stage's default asynchronous + * execution facility, with this stage as the argument to the + * supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function returning a new CompletionStage + * @param <U> the type of the returned CompletionStage's result + * @return the CompletionStage + */ + public <U> CompletionStage<U> thenComposeAsync + (Function<? super T, ? extends CompletionStage<U>> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * normally, is executed using the supplied Executor, with this + * stage's result as the argument to the supplied function. + * + * See the {@link CompletionStage} documentation for rules + * covering exceptional completion. + * + * @param fn the function returning a new CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the type of the returned CompletionStage's result + * @return the CompletionStage + */ + public <U> CompletionStage<U> thenComposeAsync + (Function<? super T, ? extends CompletionStage<U>> fn, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * exceptionally, is executed with this stage's exception as the + * argument to the supplied function. Otherwise, if this stage + * completes normally, then the returned stage also completes + * normally with the same value. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage if this CompletionStage completed + * exceptionally + * @return the new CompletionStage + */ + public CompletionStage<T> exceptionally + (Function<Throwable, ? extends T> fn); + + /** + * Returns a new CompletionStage with the same result or exception + * as this stage, and when this stage completes, executes the + * given action with the result (or {@code null} if none) and the + * exception (or {@code null} if none) of this stage. + * + * @param action the action to perform + * @return the new CompletionStage + */ + public CompletionStage<T> whenComplete + (BiConsumer<? super T, ? super Throwable> action); + + /** + * Returns a new CompletionStage with the same result or exception + * as this stage, and when this stage completes, executes the + * given action executes the given action using this stage's + * default asynchronous execution facility, with the result (or + * {@code null} if none) and the exception (or {@code null} if + * none) of this stage as arguments. + * + * @param action the action to perform + * @return the new CompletionStage + */ + public CompletionStage<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action); + + /** + * Returns a new CompletionStage with the same result or exception + * as this stage, and when this stage completes, executes using + * the supplied Executor, the given action with the result (or + * {@code null} if none) and the exception (or {@code null} if + * none) of this stage as arguments. + * + * @param action the action to perform + * @param executor the executor to use for asynchronous execution + * @return the new CompletionStage + */ + public CompletionStage<T> whenCompleteAsync + (BiConsumer<? super T, ? super Throwable> action, + Executor executor); + + /** + * Returns a new CompletionStage that, when this stage completes + * either normally or exceptionally, is executed with this stage's + * result and exception as arguments to the supplied function. + * The given function is invoked with the result (or {@code null} + * if none) and the exception (or {@code null} if none) of this + * stage when complete as arguments. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> handle + (BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * either normally or exceptionally, is executed using this stage's + * default asynchronous execution facility, with this stage's + * result and exception as arguments to the supplied function. + * The given function is invoked with the result (or {@code null} + * if none) and the exception (or {@code null} if none) of this + * stage when complete as arguments. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn); + + /** + * Returns a new CompletionStage that, when this stage completes + * either normally or exceptionally, is executed using the + * supplied executor, with this stage's result and exception as + * arguments to the supplied function. The given function is + * invoked with the result (or {@code null} if none) and the + * exception (or {@code null} if none) of this stage when complete + * as arguments. + * + * @param fn the function to use to compute the value of the + * returned CompletionStage + * @param executor the executor to use for asynchronous execution + * @param <U> the function's return type + * @return the new CompletionStage + */ + public <U> CompletionStage<U> handleAsync + (BiFunction<? super T, Throwable, ? extends U> fn, + Executor executor); + + /** + * Returns a {@link CompletableFuture} maintaining the same + * completion properties as this stage. If this stage is already a + * CompletableFuture, this method may return this stage itself. + * Otherwise, invocation of this method may be equivalent in + * effect to {@code thenApply(x -> x)}, but returning an instance + * of type {@code CompletableFuture}. A CompletionStage + * implementation that does not choose to interoperate with others + * may throw {@code UnsupportedOperationException}. + * + * @return the CompletableFuture + * @throws UnsupportedOperationException if this implementation + * does not interoperate with CompletableFuture + */ + public CompletableFuture<T> toCompletableFuture(); + +} |