diff options
author | psandoz <none@none> | 2014-05-06 10:29:59 +0200 |
---|---|---|
committer | psandoz <none@none> | 2014-05-06 10:29:59 +0200 |
commit | dfbfbb4e75dc98033a02497aca374711d1d2c558 (patch) | |
tree | 35fc46e261875a1e1300c50ddcdf35babd9d3128 /src/share/classes/java/util/stream | |
parent | 39c685c3cd8f816d4ac29c34a9566c8dd46b8e93 (diff) | |
download | jdk8u_jdk-dfbfbb4e75dc98033a02497aca374711d1d2c558.tar.gz |
8042355: stream with sorted() causes downstream ops not to be lazy
Reviewed-by: mduigou
Diffstat (limited to 'src/share/classes/java/util/stream')
-rw-r--r-- | src/share/classes/java/util/stream/SortedOps.java | 207 |
1 files changed, 178 insertions, 29 deletions
diff --git a/src/share/classes/java/util/stream/SortedOps.java b/src/share/classes/java/util/stream/SortedOps.java index 8dcabb4914..810de1ca20 100644 --- a/src/share/classes/java/util/stream/SortedOps.java +++ b/src/share/classes/java/util/stream/SortedOps.java @@ -278,16 +278,60 @@ final class SortedOps { } /** + * Abstract {@link Sink} for implementing sort on reference streams. + * + * <p> + * Note: documentation below applies to reference and all primitive sinks. + * <p> + * Sorting sinks first accept all elements, buffering then into an array + * or a re-sizable data structure, if the size of the pipeline is known or + * unknown respectively. At the end of the sink protocol those elements are + * sorted and then pushed downstream. + * This class records if {@link #cancellationRequested} is called. If so it + * can be inferred that the source pushing source elements into the pipeline + * knows that the pipeline is short-circuiting. In such cases sub-classes + * pushing elements downstream will preserve the short-circuiting protocol + * by calling {@code downstream.cancellationRequested()} and checking the + * result is {@code false} before an element is pushed. + * <p> + * Note that the above behaviour is an optimization for sorting with + * sequential streams. It is not an error that more elements, than strictly + * required to produce a result, may flow through the pipeline. This can + * occur, in general (not restricted to just sorting), for short-circuiting + * parallel pipelines. + */ + private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> { + protected final Comparator<? super T> comparator; + // @@@ could be a lazy final value, if/when support is added + protected boolean cancellationWasRequested; + + AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { + super(downstream); + this.comparator = comparator; + } + + /** + * Records is cancellation is requested so short-circuiting behaviour + * can be preserved when the sorted elements are pushed downstream. + * + * @return false, as this sink never short-circuits. + */ + @Override + public final boolean cancellationRequested() { + cancellationWasRequested = true; + return false; + } + } + + /** * {@link Sink} for implementing sort on SIZED reference streams. */ - private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> { - private final Comparator<? super T> comparator; + private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> { private T[] array; private int offset; SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { - super(sink); - this.comparator = comparator; + super(sink, comparator); } @Override @@ -301,8 +345,14 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset, comparator); downstream.begin(offset); - for (int i = 0; i < offset; i++) - downstream.accept(array[i]); + if (!cancellationWasRequested) { + for (int i = 0; i < offset; i++) + downstream.accept(array[i]); + } + else { + for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) + downstream.accept(array[i]); + } downstream.end(); array = null; } @@ -316,13 +366,11 @@ final class SortedOps { /** * {@link Sink} for implementing sort on reference streams. */ - private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> { - private final Comparator<? super T> comparator; + private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { private ArrayList<T> list; RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { - super(sink); - this.comparator = comparator; + super(sink, comparator); } @Override @@ -336,7 +384,15 @@ final class SortedOps { public void end() { list.sort(comparator); downstream.begin(list.size()); - list.forEach(downstream::accept); + if (!cancellationWasRequested) { + list.forEach(downstream::accept); + } + else { + for (T t : list) { + if (downstream.cancellationRequested()) break; + downstream.accept(t); + } + } downstream.end(); list = null; } @@ -348,9 +404,26 @@ final class SortedOps { } /** + * Abstract {@link Sink} for implementing sort on int streams. + */ + private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> { + protected boolean cancellationWasRequested; + + AbstractIntSortingSink(Sink<? super Integer> downstream) { + super(downstream); + } + + @Override + public final boolean cancellationRequested() { + cancellationWasRequested = true; + return false; + } + } + + /** * {@link Sink} for implementing sort on SIZED int streams. */ - private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> { + private static final class SizedIntSortingSink extends AbstractIntSortingSink { private int[] array; private int offset; @@ -369,8 +442,14 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset); downstream.begin(offset); - for (int i = 0; i < offset; i++) - downstream.accept(array[i]); + if (!cancellationWasRequested) { + for (int i = 0; i < offset; i++) + downstream.accept(array[i]); + } + else { + for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) + downstream.accept(array[i]); + } downstream.end(); array = null; } @@ -384,7 +463,7 @@ final class SortedOps { /** * {@link Sink} for implementing sort on int streams. */ - private static final class IntSortingSink extends Sink.ChainedInt<Integer> { + private static final class IntSortingSink extends AbstractIntSortingSink { private SpinedBuffer.OfInt b; IntSortingSink(Sink<? super Integer> sink) { @@ -403,8 +482,16 @@ final class SortedOps { int[] ints = b.asPrimitiveArray(); Arrays.sort(ints); downstream.begin(ints.length); - for (int anInt : ints) - downstream.accept(anInt); + if (!cancellationWasRequested) { + for (int anInt : ints) + downstream.accept(anInt); + } + else { + for (int anInt : ints) { + if (downstream.cancellationRequested()) break; + downstream.accept(anInt); + } + } downstream.end(); } @@ -415,9 +502,26 @@ final class SortedOps { } /** + * Abstract {@link Sink} for implementing sort on long streams. + */ + private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> { + protected boolean cancellationWasRequested; + + AbstractLongSortingSink(Sink<? super Long> downstream) { + super(downstream); + } + + @Override + public final boolean cancellationRequested() { + cancellationWasRequested = true; + return false; + } + } + + /** * {@link Sink} for implementing sort on SIZED long streams. */ - private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> { + private static final class SizedLongSortingSink extends AbstractLongSortingSink { private long[] array; private int offset; @@ -436,8 +540,14 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset); downstream.begin(offset); - for (int i = 0; i < offset; i++) - downstream.accept(array[i]); + if (!cancellationWasRequested) { + for (int i = 0; i < offset; i++) + downstream.accept(array[i]); + } + else { + for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) + downstream.accept(array[i]); + } downstream.end(); array = null; } @@ -451,7 +561,7 @@ final class SortedOps { /** * {@link Sink} for implementing sort on long streams. */ - private static final class LongSortingSink extends Sink.ChainedLong<Long> { + private static final class LongSortingSink extends AbstractLongSortingSink { private SpinedBuffer.OfLong b; LongSortingSink(Sink<? super Long> sink) { @@ -470,8 +580,16 @@ final class SortedOps { long[] longs = b.asPrimitiveArray(); Arrays.sort(longs); downstream.begin(longs.length); - for (long aLong : longs) - downstream.accept(aLong); + if (!cancellationWasRequested) { + for (long aLong : longs) + downstream.accept(aLong); + } + else { + for (long aLong : longs) { + if (downstream.cancellationRequested()) break; + downstream.accept(aLong); + } + } downstream.end(); } @@ -482,9 +600,26 @@ final class SortedOps { } /** + * Abstract {@link Sink} for implementing sort on long streams. + */ + private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> { + protected boolean cancellationWasRequested; + + AbstractDoubleSortingSink(Sink<? super Double> downstream) { + super(downstream); + } + + @Override + public final boolean cancellationRequested() { + cancellationWasRequested = true; + return false; + } + } + + /** * {@link Sink} for implementing sort on SIZED double streams. */ - private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> { + private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink { private double[] array; private int offset; @@ -503,8 +638,14 @@ final class SortedOps { public void end() { Arrays.sort(array, 0, offset); downstream.begin(offset); - for (int i = 0; i < offset; i++) - downstream.accept(array[i]); + if (!cancellationWasRequested) { + for (int i = 0; i < offset; i++) + downstream.accept(array[i]); + } + else { + for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) + downstream.accept(array[i]); + } downstream.end(); array = null; } @@ -518,7 +659,7 @@ final class SortedOps { /** * {@link Sink} for implementing sort on double streams. */ - private static final class DoubleSortingSink extends Sink.ChainedDouble<Double> { + private static final class DoubleSortingSink extends AbstractDoubleSortingSink { private SpinedBuffer.OfDouble b; DoubleSortingSink(Sink<? super Double> sink) { @@ -537,8 +678,16 @@ final class SortedOps { double[] doubles = b.asPrimitiveArray(); Arrays.sort(doubles); downstream.begin(doubles.length); - for (double aDouble : doubles) - downstream.accept(aDouble); + if (!cancellationWasRequested) { + for (double aDouble : doubles) + downstream.accept(aDouble); + } + else { + for (double aDouble : doubles) { + if (downstream.cancellationRequested()) break; + downstream.accept(aDouble); + } + } downstream.end(); } |