aboutsummaryrefslogtreecommitdiff
path: root/src/share/classes/java/util/stream
diff options
context:
space:
mode:
authorpsandoz <none@none>2014-05-06 10:29:59 +0200
committerpsandoz <none@none>2014-05-06 10:29:59 +0200
commitdfbfbb4e75dc98033a02497aca374711d1d2c558 (patch)
tree35fc46e261875a1e1300c50ddcdf35babd9d3128 /src/share/classes/java/util/stream
parent39c685c3cd8f816d4ac29c34a9566c8dd46b8e93 (diff)
downloadjdk8u_jdk-dfbfbb4e75dc98033a02497aca374711d1d2c558.tar.gz
8042355: stream with sorted() causes downstream ops not to be lazy
Reviewed-by: mduigou
Diffstat (limited to 'src/share/classes/java/util/stream')
-rw-r--r--src/share/classes/java/util/stream/SortedOps.java207
1 files changed, 178 insertions, 29 deletions
diff --git a/src/share/classes/java/util/stream/SortedOps.java b/src/share/classes/java/util/stream/SortedOps.java
index 8dcabb4914..810de1ca20 100644
--- a/src/share/classes/java/util/stream/SortedOps.java
+++ b/src/share/classes/java/util/stream/SortedOps.java
@@ -278,16 +278,60 @@ final class SortedOps {
}
/**
+ * Abstract {@link Sink} for implementing sort on reference streams.
+ *
+ * <p>
+ * Note: documentation below applies to reference and all primitive sinks.
+ * <p>
+ * Sorting sinks first accept all elements, buffering then into an array
+ * or a re-sizable data structure, if the size of the pipeline is known or
+ * unknown respectively. At the end of the sink protocol those elements are
+ * sorted and then pushed downstream.
+ * This class records if {@link #cancellationRequested} is called. If so it
+ * can be inferred that the source pushing source elements into the pipeline
+ * knows that the pipeline is short-circuiting. In such cases sub-classes
+ * pushing elements downstream will preserve the short-circuiting protocol
+ * by calling {@code downstream.cancellationRequested()} and checking the
+ * result is {@code false} before an element is pushed.
+ * <p>
+ * Note that the above behaviour is an optimization for sorting with
+ * sequential streams. It is not an error that more elements, than strictly
+ * required to produce a result, may flow through the pipeline. This can
+ * occur, in general (not restricted to just sorting), for short-circuiting
+ * parallel pipelines.
+ */
+ private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
+ protected final Comparator<? super T> comparator;
+ // @@@ could be a lazy final value, if/when support is added
+ protected boolean cancellationWasRequested;
+
+ AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
+ super(downstream);
+ this.comparator = comparator;
+ }
+
+ /**
+ * Records is cancellation is requested so short-circuiting behaviour
+ * can be preserved when the sorted elements are pushed downstream.
+ *
+ * @return false, as this sink never short-circuits.
+ */
+ @Override
+ public final boolean cancellationRequested() {
+ cancellationWasRequested = true;
+ return false;
+ }
+ }
+
+ /**
* {@link Sink} for implementing sort on SIZED reference streams.
*/
- private static final class SizedRefSortingSink<T> extends Sink.ChainedReference<T, T> {
- private final Comparator<? super T> comparator;
+ private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
private T[] array;
private int offset;
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
- super(sink);
- this.comparator = comparator;
+ super(sink, comparator);
}
@Override
@@ -301,8 +345,14 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
- for (int i = 0; i < offset; i++)
- downstream.accept(array[i]);
+ if (!cancellationWasRequested) {
+ for (int i = 0; i < offset; i++)
+ downstream.accept(array[i]);
+ }
+ else {
+ for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
+ downstream.accept(array[i]);
+ }
downstream.end();
array = null;
}
@@ -316,13 +366,11 @@ final class SortedOps {
/**
* {@link Sink} for implementing sort on reference streams.
*/
- private static final class RefSortingSink<T> extends Sink.ChainedReference<T, T> {
- private final Comparator<? super T> comparator;
+ private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
- super(sink);
- this.comparator = comparator;
+ super(sink, comparator);
}
@Override
@@ -336,7 +384,15 @@ final class SortedOps {
public void end() {
list.sort(comparator);
downstream.begin(list.size());
- list.forEach(downstream::accept);
+ if (!cancellationWasRequested) {
+ list.forEach(downstream::accept);
+ }
+ else {
+ for (T t : list) {
+ if (downstream.cancellationRequested()) break;
+ downstream.accept(t);
+ }
+ }
downstream.end();
list = null;
}
@@ -348,9 +404,26 @@ final class SortedOps {
}
/**
+ * Abstract {@link Sink} for implementing sort on int streams.
+ */
+ private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
+ protected boolean cancellationWasRequested;
+
+ AbstractIntSortingSink(Sink<? super Integer> downstream) {
+ super(downstream);
+ }
+
+ @Override
+ public final boolean cancellationRequested() {
+ cancellationWasRequested = true;
+ return false;
+ }
+ }
+
+ /**
* {@link Sink} for implementing sort on SIZED int streams.
*/
- private static final class SizedIntSortingSink extends Sink.ChainedInt<Integer> {
+ private static final class SizedIntSortingSink extends AbstractIntSortingSink {
private int[] array;
private int offset;
@@ -369,8 +442,14 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
- for (int i = 0; i < offset; i++)
- downstream.accept(array[i]);
+ if (!cancellationWasRequested) {
+ for (int i = 0; i < offset; i++)
+ downstream.accept(array[i]);
+ }
+ else {
+ for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
+ downstream.accept(array[i]);
+ }
downstream.end();
array = null;
}
@@ -384,7 +463,7 @@ final class SortedOps {
/**
* {@link Sink} for implementing sort on int streams.
*/
- private static final class IntSortingSink extends Sink.ChainedInt<Integer> {
+ private static final class IntSortingSink extends AbstractIntSortingSink {
private SpinedBuffer.OfInt b;
IntSortingSink(Sink<? super Integer> sink) {
@@ -403,8 +482,16 @@ final class SortedOps {
int[] ints = b.asPrimitiveArray();
Arrays.sort(ints);
downstream.begin(ints.length);
- for (int anInt : ints)
- downstream.accept(anInt);
+ if (!cancellationWasRequested) {
+ for (int anInt : ints)
+ downstream.accept(anInt);
+ }
+ else {
+ for (int anInt : ints) {
+ if (downstream.cancellationRequested()) break;
+ downstream.accept(anInt);
+ }
+ }
downstream.end();
}
@@ -415,9 +502,26 @@ final class SortedOps {
}
/**
+ * Abstract {@link Sink} for implementing sort on long streams.
+ */
+ private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
+ protected boolean cancellationWasRequested;
+
+ AbstractLongSortingSink(Sink<? super Long> downstream) {
+ super(downstream);
+ }
+
+ @Override
+ public final boolean cancellationRequested() {
+ cancellationWasRequested = true;
+ return false;
+ }
+ }
+
+ /**
* {@link Sink} for implementing sort on SIZED long streams.
*/
- private static final class SizedLongSortingSink extends Sink.ChainedLong<Long> {
+ private static final class SizedLongSortingSink extends AbstractLongSortingSink {
private long[] array;
private int offset;
@@ -436,8 +540,14 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
- for (int i = 0; i < offset; i++)
- downstream.accept(array[i]);
+ if (!cancellationWasRequested) {
+ for (int i = 0; i < offset; i++)
+ downstream.accept(array[i]);
+ }
+ else {
+ for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
+ downstream.accept(array[i]);
+ }
downstream.end();
array = null;
}
@@ -451,7 +561,7 @@ final class SortedOps {
/**
* {@link Sink} for implementing sort on long streams.
*/
- private static final class LongSortingSink extends Sink.ChainedLong<Long> {
+ private static final class LongSortingSink extends AbstractLongSortingSink {
private SpinedBuffer.OfLong b;
LongSortingSink(Sink<? super Long> sink) {
@@ -470,8 +580,16 @@ final class SortedOps {
long[] longs = b.asPrimitiveArray();
Arrays.sort(longs);
downstream.begin(longs.length);
- for (long aLong : longs)
- downstream.accept(aLong);
+ if (!cancellationWasRequested) {
+ for (long aLong : longs)
+ downstream.accept(aLong);
+ }
+ else {
+ for (long aLong : longs) {
+ if (downstream.cancellationRequested()) break;
+ downstream.accept(aLong);
+ }
+ }
downstream.end();
}
@@ -482,9 +600,26 @@ final class SortedOps {
}
/**
+ * Abstract {@link Sink} for implementing sort on long streams.
+ */
+ private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
+ protected boolean cancellationWasRequested;
+
+ AbstractDoubleSortingSink(Sink<? super Double> downstream) {
+ super(downstream);
+ }
+
+ @Override
+ public final boolean cancellationRequested() {
+ cancellationWasRequested = true;
+ return false;
+ }
+ }
+
+ /**
* {@link Sink} for implementing sort on SIZED double streams.
*/
- private static final class SizedDoubleSortingSink extends Sink.ChainedDouble<Double> {
+ private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
private double[] array;
private int offset;
@@ -503,8 +638,14 @@ final class SortedOps {
public void end() {
Arrays.sort(array, 0, offset);
downstream.begin(offset);
- for (int i = 0; i < offset; i++)
- downstream.accept(array[i]);
+ if (!cancellationWasRequested) {
+ for (int i = 0; i < offset; i++)
+ downstream.accept(array[i]);
+ }
+ else {
+ for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
+ downstream.accept(array[i]);
+ }
downstream.end();
array = null;
}
@@ -518,7 +659,7 @@ final class SortedOps {
/**
* {@link Sink} for implementing sort on double streams.
*/
- private static final class DoubleSortingSink extends Sink.ChainedDouble<Double> {
+ private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
private SpinedBuffer.OfDouble b;
DoubleSortingSink(Sink<? super Double> sink) {
@@ -537,8 +678,16 @@ final class SortedOps {
double[] doubles = b.asPrimitiveArray();
Arrays.sort(doubles);
downstream.begin(doubles.length);
- for (double aDouble : doubles)
- downstream.accept(aDouble);
+ if (!cancellationWasRequested) {
+ for (double aDouble : doubles)
+ downstream.accept(aDouble);
+ }
+ else {
+ for (double aDouble : doubles) {
+ if (downstream.cancellationRequested()) break;
+ downstream.accept(aDouble);
+ }
+ }
downstream.end();
}