前言
基础示例与解析
/**
 * Simple value holder used by the stream example below.
 * Lombok {@code @Getter} generates {@code getA()} / {@code getB()}.
 */
static class A {
    @Getter
    private String a;
    @Getter
    private Integer b;

    public A(String a, Integer b) {
        this.a = a;
        this.b = b;
    }
}
public static void main(String[] args) {
List ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream().map(A::getB).filter(b - > b >= 2).collect(Collectors.toList());
System.out.println(ret);
}
调用链:
ArrayList.stream
-> .map
-> .filter
-> .collect
default Streamstream() {
return StreamSupport.stream(spliterator(), false);
}
public static Streamstream(Spliterator spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
public final Streammap(Function super P_OUT, ? extends R > mapper) { Objects.requireNonNull(mapper); return new StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@ Override SinkopWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) {@ Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
public final Streamfilter(Predicate super P_OUT > predicate) { Objects.requireNonNull(predicate); return new StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {@ Override SinkopWrapSink(int flags, Sink sink) { return new Sink.ChainedReference(sink) {@ Override public void begin(long size) { downstream.begin(-1); }@ Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
public final R, A > R collect(Collector super P_OUT, A, R > collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumerA, ? super P_OUT > accumulator = collector.accumulator(); forEach(u - > accumulator.accept(container, u)); } else { container = evaluate(ReduceOps.makeRef(collector)); } return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); }
A container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);
}
public static TerminalOp makeRef(Collector super T, I, ?> collector) { Supplier supplier = Objects.requireNonNull(collector).supplier(); BiConsumersuper T > accumulator = collector.accumulator(); BinaryOperator combiner = collector.combiner(); class ReducingSink extends BoxI > implements AccumulatingSinkT, I, ReducingSink > {@ Override public void begin(long size) { state = supplier.get(); }@ Override public void accept(T t) { accumulator.accept(state, t); }@ Override public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp(StreamShape.REFERENCE) {@ Override public ReducingSink makeSink() { return new ReducingSink(); }@ Override public int getOpFlags() { return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } }; }
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
public R evaluateSequential(PipelineHelper helper, Spliterator spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get(); }
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink;
final Sink wrapSink(Sink sink) { Objects.requireNonNull(sink); for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink) sink; }
final void copyInto(Sink wrappedSink, Spliterator spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
// Phase 1: announce the element count to the sink chain (-1 if unknown).
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// Phase 2: push every source element through the wrapped sink chain.
spliterator.forEachRemaining(wrappedSink);
// Phase 3: signal end-of-data so stateful ops can flush their results.
wrappedSink.end();
public void forEachRemaining(Consumer super E > action) {
if ((i = index) >= 0 && (index = hi) for (; i@ SuppressWarnings("unchecked") E e = (E) a[i]; action.accept(e);
}
if (lst.modCount == mc) return;
}
而这里调用action.accept,就会通过责任链来一层层调用每个算子的accept,我们从map的accept开始:
@
OverrideSinkopWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference(sink) {@
Override public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    /** Returns an iterator over the elements of this stream. */
    Iterator<T> iterator();

    /** Returns a spliterator over the elements, used for parallel execution. */
    Spliterator<T> spliterator();

    /** Returns whether this stream would execute in parallel. */
    boolean isParallel();

    /** Returns an equivalent sequential stream (forces parallel = false). */
    S sequential();

    /** Returns an equivalent parallel stream (forces parallel = true). */
    S parallel();

    // ...
}
/** * 最顶上的pipeline,即Head */ private final AbstractPipeline sourceStage; /** * 直接上游pipeline */ private final AbstractPipeline previousStage; /** * 直接下游pipeline */ @ SuppressWarnings("rawtypes") private AbstractPipeline nextStage; /** * pipeline深度 */ private int depth; /** * head的spliterator */ private Spliterator > sourceSpliterator; // ...
双流concat的场景示例及解析
static class Mapper1 implements IntUnaryOperator {@ Override public int applyAsInt(int operand) { return operand * operand; } } static class Filter1 implements IntPredicate {@ Override public boolean test(int value) { return value >= 2; } } static class Mapper2 implements IntUnaryOperator {@ Override public int applyAsInt(int operand) { return operand + operand; } } static class Filter2 implements IntPredicate {@ Override public boolean test(int value) { return value >= 10; } } static class Mapper3 implements IntUnaryOperator {@ Override public int applyAsInt(int operand) { return operand * operand; } } static class Filter3 implements IntPredicate {@ Override public boolean test(int value) { return value >= 10; } } public static void main(String[] args) { IntStream s1 = Arrays.stream(new int[] { 1, 2, 3 }).map(new Mapper1()).filter(new Filter1()); IntStream s2 = Arrays.stream(new int[] { 4, 5, 6 }).map(new Mapper2()).filter(new Filter2()); IntStream s3 = IntStream.concat(s1, s2).map(new Mapper3()).filter(new Filter3()); int sum = s3.sum(); }
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
final > S wrapAndCopyInto(S sink, Spliterator spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; }
Head(concated s1 + s2 stream) -> Mapper3 -> Filter3 -> ReduceOp(sum)
final void copyInto(Sink wrappedSink, Spliterator spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); // ...
public void forEachRemaining(Consumer super T > consumer) { if (beforeSplit) aSpliterator.forEachRemaining(consumer); bSpliterator.forEachRemaining(consumer); }
// 包装的原始pipeline final PipelineHelper ph; // 原始pipeline的spliterator Spliterator spliterator;
public void forEachRemaining(IntConsumer consumer) { if (buffer == null && !finished) { Objects.requireNonNull(consumer); init(); ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator); finished = true; } // ...
可以看到,这里又调用了原始pipeline的wrapAndCopyInto方法,而这里传入的consumer即为上面s3的算子逻辑。这样又递归回到了:
AbstractPipeline.wrapAndCopyInto -> AbstractPipeline.wrapSink-> AbstractPipeline.copyInto
方法中,而在这时的wrapSink中,现在的pipeline就是s1/s2了,这时就会对s1/s2下面的所有算子,调用AbstractPipeline.opWrapSink串联起来,以s1为例就变成: