diff options
author | psandoz <none@none> | 2015-06-23 09:49:55 +0200 |
---|---|---|
committer | psandoz <none@none> | 2015-06-23 09:49:55 +0200 |
commit | 59f508a0cb69a703a4734f1c6c7602091c4d1b5e (patch) | |
tree | 80efd530bd728878ea5c6e7756dc92d247bf0d0b /src/share/classes/java/util/stream | |
parent | 2e238ba726b80b6c16e1bd682838e52f16843626 (diff) | |
download | jdk8u_jdk-59f508a0cb69a703a4734f1c6c7602091c4d1b5e.tar.gz |
8129120: Terminal operation properties should not be back-propagated to upstream operations
Reviewed-by: briangoetz, chegar
Diffstat (limited to 'src/share/classes/java/util/stream')
-rw-r--r-- | src/share/classes/java/util/stream/AbstractPipeline.java | 97 |
1 files changed, 33 insertions, 64 deletions
diff --git a/src/share/classes/java/util/stream/AbstractPipeline.java b/src/share/classes/java/util/stream/AbstractPipeline.java index dd60c2520e..d3ccdacbf0 100644 --- a/src/share/classes/java/util/stream/AbstractPipeline.java +++ b/src/share/classes/java/util/stream/AbstractPipeline.java @@ -249,6 +249,11 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) { + // Set the depth of this, last, pipeline stage to zero to slice the + // pipeline such that this operation will not be included in the + // upstream slice and upstream operations will not be included + // in this slice + depth = 0; return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator); } else { @@ -379,60 +384,6 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> } /** - * Prepare the pipeline for a parallel execution. As the pipeline is built, - * the flags and depth indicators are set up for a sequential execution. - * If the execution is parallel, and there are any stateful operations, then - * some of these need to be adjusted, as well as adjusting for flags from - * the terminal operation (such as back-propagating UNORDERED). - * Need not be called for a sequential execution. - * - * @param terminalFlags Operation flags for the terminal operation - */ - private void parallelPrepare(int terminalFlags) { - @SuppressWarnings("rawtypes") - AbstractPipeline backPropagationHead = sourceStage; - if (sourceStage.sourceAnyStateful) { - int depth = 1; - for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage; - p != null; - u = p, p = p.nextStage) { - int thisOpFlags = p.sourceOrOpFlags; - if (p.opIsStateful()) { - // If the stateful operation is a short-circuit operation - // then move the back propagation head forwards - // NOTE: there are no size-injecting ops - if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { - backPropagationHead = p; - // Clear the short circuit flag for next pipeline stage - // This stage encapsulates short-circuiting, the next - // stage may not have any short-circuit operations, and - // if so spliterator.forEachRemaining should be be used - // for traversal - thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; - } - - depth = 0; - // The following injects size, it is equivalent to: - // StreamOpFlag.combineOpFlags(StreamOpFlag.IS_SIZED, p.combinedFlags); - thisOpFlags = (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED; - } - p.depth = depth++; - p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); - } - } - - // Apply the upstream terminal flags - if (terminalFlags != 0) { - int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK; - for ( @SuppressWarnings("rawtypes") AbstractPipeline p = backPropagationHead; p.nextStage != null; p = p.nextStage) { - p.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, p.combinedFlags); - } - - combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); - } - } - - /** * Get the source spliterator for this pipeline stage. For a sequential or * stateless parallel pipeline, this is the source spliterator. For a * stateful parallel pipeline, this is a spliterator describing the results @@ -455,31 +406,49 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> throw new IllegalStateException(MSG_CONSUMED); } - if (isParallel()) { - // @@@ Merge parallelPrepare with the loop below and use the - // spliterator characteristics to determine if SIZED - // should be injected - parallelPrepare(terminalFlags); - + if (isParallel() && sourceStage.sourceAnyStateful) { // Adapt the source spliterator, evaluating each stateful op - // in the pipeline up to and including this pipeline stage - for ( @SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; + // in the pipeline up to and including this pipeline stage. + // The depth and flags of each pipeline stage are adjusted accordingly. + int depth = 1; + for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { + int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { + depth = 0; + + if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { + // Clear the short circuit flag for next pipeline stage + // This stage encapsulates short-circuiting, the next + // stage may not have any short-circuit operations, and + // if so spliterator.forEachRemaining should be used + // for traversal + thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; + } + spliterator = p.opEvaluateParallelLazy(u, spliterator); + + // Inject or clear SIZED on the source pipeline stage + // based on the stage's spliterator + thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) + ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED + : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } + p.depth = depth++; + p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } - else if (terminalFlags != 0) { + + if (terminalFlags != 0) { + // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; } - // PipelineHelper @Override |