aboutsummaryrefslogtreecommitdiff
path: root/src/share/classes/java/util/stream
diff options
context:
space:
mode:
authorpsandoz <none@none>2015-06-23 09:49:55 +0200
committerpsandoz <none@none>2015-06-23 09:49:55 +0200
commit59f508a0cb69a703a4734f1c6c7602091c4d1b5e (patch)
tree80efd530bd728878ea5c6e7756dc92d247bf0d0b /src/share/classes/java/util/stream
parent2e238ba726b80b6c16e1bd682838e52f16843626 (diff)
downloadjdk8u_jdk-59f508a0cb69a703a4734f1c6c7602091c4d1b5e.tar.gz
8129120: Terminal operation properties should not be back-propagated to upstream operations
Reviewed-by: briangoetz, chegar
Diffstat (limited to 'src/share/classes/java/util/stream')
-rw-r--r--src/share/classes/java/util/stream/AbstractPipeline.java97
1 files changed, 33 insertions, 64 deletions
diff --git a/src/share/classes/java/util/stream/AbstractPipeline.java b/src/share/classes/java/util/stream/AbstractPipeline.java
index dd60c2520e..d3ccdacbf0 100644
--- a/src/share/classes/java/util/stream/AbstractPipeline.java
+++ b/src/share/classes/java/util/stream/AbstractPipeline.java
@@ -249,6 +249,11 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
// If the last intermediate operation is stateful then
// evaluate directly to avoid an extra collection step
if (isParallel() && previousStage != null && opIsStateful()) {
+ // Set the depth of this, last, pipeline stage to zero to slice the
+ // pipeline such that this operation will not be included in the
+ // upstream slice and upstream operations will not be included
+ // in this slice
+ depth = 0;
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
}
else {
@@ -379,60 +384,6 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
}
/**
- * Prepare the pipeline for a parallel execution. As the pipeline is built,
- * the flags and depth indicators are set up for a sequential execution.
- * If the execution is parallel, and there are any stateful operations, then
- * some of these need to be adjusted, as well as adjusting for flags from
- * the terminal operation (such as back-propagating UNORDERED).
- * Need not be called for a sequential execution.
- *
- * @param terminalFlags Operation flags for the terminal operation
- */
- private void parallelPrepare(int terminalFlags) {
- @SuppressWarnings("rawtypes")
- AbstractPipeline backPropagationHead = sourceStage;
- if (sourceStage.sourceAnyStateful) {
- int depth = 1;
- for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage;
- p != null;
- u = p, p = p.nextStage) {
- int thisOpFlags = p.sourceOrOpFlags;
- if (p.opIsStateful()) {
- // If the stateful operation is a short-circuit operation
- // then move the back propagation head forwards
- // NOTE: there are no size-injecting ops
- if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
- backPropagationHead = p;
- // Clear the short circuit flag for next pipeline stage
- // This stage encapsulates short-circuiting, the next
- // stage may not have any short-circuit operations, and
- // if so spliterator.forEachRemaining should be be used
- // for traversal
- thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
- }
-
- depth = 0;
- // The following injects size, it is equivalent to:
- // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags);
- thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED;
- }
- p.depth = depth++;
- p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
- }
- }
-
- // Apply the upstream terminal flags
- if (terminalFlags != 0) {
- int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
- for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) {
- p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags);
- }
-
- combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
- }
- }
-
- /**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
* stateful parallel pipeline, this is a spliterator describing the results
@@ -455,31 +406,49 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
throw new IllegalStateException(MSG_CONSUMED);
}
- if (isParallel()) {
- // @@@ Merge parallelPrepare with the loop below and use the
- // spliterator characteristics to determine if SIZED
- // should be injected
- parallelPrepare(terminalFlags);
-
+ if (isParallel() && sourceStage.sourceAnyStateful) {
// Adapt the source spliterator, evaluating each stateful op
- // in the pipeline up to and including this pipeline stage
- for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
+ // in the pipeline up to and including this pipeline stage.
+ // The depth and flags of each pipeline stage are adjusted accordingly.
+ int depth = 1;
+ for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
+ int thisOpFlags = p.sourceOrOpFlags;
if (p.opIsStateful()) {
+ depth = 0;
+
+ if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
+ // Clear the short circuit flag for next pipeline stage
+ // This stage encapsulates short-circuiting, the next
+ // stage may not have any short-circuit operations, and
+ // if so spliterator.forEachRemaining should be used
+ // for traversal
+ thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
+ }
+
spliterator = p.opEvaluateParallelLazy(u, spliterator);
+
+ // Inject or clear SIZED on the source pipeline stage
+ // based on the stage's spliterator
+ thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
+ ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
+ : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
}
+ p.depth = depth++;
+ p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
}
}
- else if (terminalFlags != 0) {
+
+ if (terminalFlags != 0) {
+ // Apply flags from the terminal operation to last pipeline stage
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
}
return spliterator;
}
-
// PipelineHelper
@Override