Java8 Lambda实现源码解析Java语言

  • 34

Java8 Lambda实现源码解析

前言

Java8的lambda应该大家都比较熟悉了,本文主要从源码层面探讨一下lambda的设计和实现。

基础示例与解析

先看下面的示例代码:
    static class A {@
Getter private String a;@
Getter private Integer b;
public A(String a, Integer b) {
this.a = a;
this.b = b;
}
}
public static void main(String[] args) {
List ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3)).stream().map(A::getB).filter(b - > b >= 2).collect(Collectors.toList());
System.out.println(ret);
}
上面代码中,其实主要就是几步:
  1. ArrayList.stream

  2. .map

  3. .filter

  4. .collect

一步步来看,ArrayList.stream 实际调用的是Collector.stream方法:
 

 default Streamstream() {
return StreamSupport.stream(spliterator(), false);
}
spliterator()方法生成的是 IteratorSpliterator 对象,spliterator的意思就是可以split的iterator,这个主要是用于lambda中的parallelStream中的并行操作,上面的例子中由于调用的是stream,所以parallel=false。
StreamSupport.stream最后生成的是一个ReferencePipeline.Head对象:
 public static Streamstream(Spliterator spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);
}
Head类是从ReferencePipeline派生的,表示lambda的pipeline中的头节点。
有了这个Head对象之后,在它之上调用.map,实际上就是调用了基类ReferencePipeline.map方法:
 

    public final Streammap(Function super P_OUT, ? extends R > mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {@
Override SinkopWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference(sink) {@
Override public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
返回的是一个StatelessOp,表示一个无状态的算子,这个类也是ReferencePipeline的子类,可以看到它的构造函数,第一个参数this,表示把Head对象作为StatelessOp对象的upstream,也就是它的上游。StatelessOp.opWrapSink方法先不讲,后面会讲到。
接着调用StatelessOp.filter方法,也还是会回到ReferencePipeline.filter方法:
 public final Streamfilter(Predicate super P_OUT > predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {@
Override SinkopWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference(sink) {@
Override public void begin(long size) {
downstream.begin(-1);
}@
Override public void accept(P_OUT u) {
if (predicate.test(u)) downstream.accept(u);
}
};
}
};
}
可以看到,仍然生成的是一个StatelessOp对象,只是它的upstream变了而已。
最后调用StatelessOp.collect,继续回到ReferencePipeline.collect方法:

 

  public final R, A > R collect(Collector super P_OUT, A, R > collector) {
A container;
if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumerA, ? super P_OUT > accumulator = collector.accumulator();
forEach(u - > accumulator.accept(container, u));
} else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);
}
在前面几步,.map, .filter方法其实都只是创建StatelessOp对象,但是到collect就不一样了,了解spark/flink的就知道,collect其实是个action/sink,调用了collect,就会真实地触发这个stream上各个operator的执行。这也就是我们经常听到的lazy execution,所有的操作,只有碰到action的算子才会开始执行。
之前讲到这个stream的parallel=false,所以上面的实际执行逻辑是:
A container = evaluate(ReduceOps.makeRef(collector));
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container);
}
在进入evaluate方法之前,先看一下ReduceOps.makeRef(collector),它实际上就是基于Collectors.toList生成的CollectorImpl实例包装了一层,返回了一个 TerminalOp对象(实际是ReduceOp)。
 

  public static TerminalOp makeRef(Collector super T, I, ?> collector) {
Supplier supplier = Objects.requireNonNull(collector).supplier();
BiConsumersuper T > accumulator = collector.accumulator();
BinaryOperator combiner = collector.combiner();
class ReducingSink extends BoxI > implements AccumulatingSinkT, I, ReducingSink > {@
Override public void begin(long size) {
state = supplier.get();
}@
Override public void accept(T t) {
accumulator.accept(state, t);
}@
Override public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp(StreamShape.REFERENCE) {@
Override public ReducingSink makeSink() {
return new ReducingSink();
}@
Override public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0;
}
};
}
上面代码可以看到,基本也就是直接调用了collector的实现,稍微需要注意的是,ReducingSink从Box派生,Box的意思就是盒子,它里面有个state成员,表示一个计算的状态。ReducingSink就是通过这个state,进行combine, accumulate操作(实际就是一个List)。
回到evaluate方法,它实际调用了:
 

terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
这里this就是最后阶段的ReferencePipeline,即StatelessOp,这里我们称它为 ReferencePipeline$2,即经过两个算子操作的pipeline。
sourceSpliterator 则会取到sourceStage的spliterator,即最上面Head的spliterator。
ReduceOp.evaluateSequential:
      public R evaluateSequential(PipelineHelper helper, Spliterator spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
helper即ReferencePipeline$2,这里makeSink即上面返回的ReducingSink重载的方法。
ReferencePipeline.wrapAndCopyInto,在其父类AbstractPipeline中实现:
 

       copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
wrapSink代码:
 final Sink wrapSink(Sink sink) {
Objects.requireNonNull(sink);
for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink) sink;
}
可以看到,这里就是将pipeline从后至前,分别调用每个pipeline的opWrapSink方法,就是一个责任链的模式。opWrapSink可以看上面map的opWrapSink的filter的opWrapSink实现,map的很简单,直接调用mapper.apply,实际上就是A::getB方法,filter的也很简单,调用的是 predicate.test 方法。
接下来到copyInto方法,到这里才会有真正的执行逻辑:
   final void copyInto(Sink wrappedSink, Spliterator spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
} else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
它会走入到这部分的逻辑中:
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
这里面最重要的是就是中间这行了,由于spliterator持有的Collection引用,是ArrayList,因此它会调用ArrayList.forEachRemaining方法:
public void forEachRemaining(Consumer super E > action) {
if ((i = index) >= 0 && (index = hi) for (; i@ SuppressWarnings("unchecked") E e = (E) a[i]; action.accept(e);
}
if (lst.modCount == mc) return;
}
这里的action参数,就是上面经过责任链封装的Sink(它也是Consumer的子类)。
而这里调用action.accept,就会通过责任链来一层层调用每个算子的accept,我们从map的accept开始:
@
OverrideSinkopWrapSink(int flags, Sink sink) {
return new Sink.ChainedReference(sink) {@
Override public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
可以看到,它先调用mapper.apply,然后把结果直接传给downstream.accept,也就是调用filter的accept,接着来到ReducingSink.accept,也就是往state中添加一个结果元素,这样forEach执行完之后,结果自然就有了。
看完上面的流程,接下来看一下lambda里面部分类设计,首先来看一下Stream,它的基类是BaseStream,提供以下接口:

 

public interface BaseStreamT, S extends BaseStreamT, S >> extends AutoCloseable {
/** * 返回stream中元素的迭代器 */
Iteratoriterator();
/** * 返回stream中元素的spliterator,用于并行执行 */
Spliteratorspliterator();
/** * 是否并行 */
boolean isParallel();
/** * 返回串行的stream,即强制parallel=false */
S sequential();
/** * 返回并行的stream,即强制parallel=true */
S parallel();
// ...}
直接继承此接口的,是如IntStream, LongStream,DoubleStream等,这些是在BaseStream基础上,提供了filter, map, mapToObj, distinct等算子的接口,但是这些算子,是限定类型的,如IntStream.filter, 它接受的就是 IntPredicate, 而不是常规的Predicate;map方法也是,接受的是 IntUnaryOperator。
IntStream, LongStream这些都是接口,也就是仅仅用来描述算子的。它们的实现都是基于Pipeline的,基类为 AbstractPipeline,它的几个关键成员变量:
    /** * 最顶上的pipeline,即Head */
private final AbstractPipeline sourceStage;
/** * 直接上游pipeline */
private final AbstractPipeline previousStage;
/** * 直接下游pipeline */
@
SuppressWarnings("rawtypes") private AbstractPipeline nextStage;
/** * pipeline深度 */
private int depth;
/** * head的spliterator */
private Spliterator > sourceSpliterator;
// ...
这个基类还提供了pipeline的基础实现,以及对BaseStream和PipelineHelper接口的实现,如evaluate, sourceStageSpliterator, wrapAndCopyInto, wrapSink等。
类似地,从AbstractPipeline派生的子类有:IntPipeline, LongPipeline, DoublePipeline, ReferencePipeline等。前面三种比较容易理解,提供的是基于原始类型的lambda操作(且都实现了对应的XXStream接口),而ReferencePipeline提供的是基于对象的lambda操作。
类层次如下:

Java8 Lambda实现源码解析

注意这些子类,也都是abstract的,每一种pipeline下面,都有Head, StatelessOp, StatefulOp三个子类。分别用于描述pipeline的头节点,无状态中间算子,有状态中间算子。
Head是非抽象类,StatelessOp也是抽象类,它在map、filter、mapToObj等算子中,会动态创建它的匿名子类,并实现opWrapSink方法。
通过这种设计,除了collect之外,所有算子的返回结果都是Stream的子类,在IntPipeline中,map, flatMap, filter等都返回IntStream,即使它们的实现可能是StatelessOp, Head等,都对外提供了统一的接口。同时由于lambda中每个算子的实现是动态的,如最上面例子中A::getB, b -> b>=2等,那就通过每个算子重载 opWrapSink 方法来动态封装这些逻辑。
同时,通过将XXStream和XXPipeline分开的设计,可以保持Stream接口的简洁(对用户透出的接口)。否则如果将BaseStream做成抽象类,将AbstractPipeline相关的逻辑移到这里面,会导致Stream变得非常臃肿,在API层面用户使用的时候也会很困惑。
创建Pipeline的地方,则统一收口到了StreamSupport类中,这是一个大的工厂类。虽然ArrayList, Arrays等类中都提供了stream的方法,但是最后都统一调用了StreamSupport里来创建Pipeline的实例,通常也就是创建 XXPipeline.Head对象,然后通过这个对象进行其他lambda算子的添加。

双流concat的场景示例及解析

接下来看一个相对比较复杂的例子,双流concat的场景,代码如下:
 

  static class Mapper1 implements IntUnaryOperator {@
Override public int applyAsInt(int operand) {
return operand * operand;
}
}
static class Filter1 implements IntPredicate {@
Override public boolean test(int value) {
return value >= 2;
}
}
static class Mapper2 implements IntUnaryOperator {@
Override public int applyAsInt(int operand) {
return operand + operand;
}
}
static class Filter2 implements IntPredicate {@
Override public boolean test(int value) {
return value >= 10;
}
}
static class Mapper3 implements IntUnaryOperator {@
Override public int applyAsInt(int operand) {
return operand * operand;
}
}
static class Filter3 implements IntPredicate {@
Override public boolean test(int value) {
return value >= 10;
}
}
public static void main(String[] args) {
IntStream s1 = Arrays.stream(new int[] {
1, 2, 3
}).map(new Mapper1()).filter(new Filter1());
IntStream s2 = Arrays.stream(new int[] {
4, 5, 6
}).map(new Mapper2()).filter(new Filter2());
IntStream s3 = IntStream.concat(s1, s2).map(new Mapper3()).filter(new Filter3());
int sum = s3.sum();
}
上面代码中,先分别创建两个IntStream:s1, s2。然后进行concat操作,生成s2,最后调用sum操作做reduce。
代码分析还是从sink开始,reduce跟前面的collect类似,实际会基于s3这个stream, 在AbstractPipeline.evaluate方法中执行:
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
这里terminalOp即为sum这个ReduceOp,sourceSpliterator为Streams.ConcatSpliterator,也即调用s3这个pipeline的wrapAndCopyInto方法:
final > S wrapAndCopyInto(S sink, Spliterator spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
这里的wrapSink,就会将s3中的算子与最后的reduce串在一起,大致如下:
Head(concated s1 + s2 stream) -> Mapper3 -> Filter3 -> ReduceOp(sum)
到目前为止,我们还只看到s3的逻辑,那么s1和s2两个stream的mapper和filter逻辑在哪里呢,接着看下面的copyInto方法:
   final void copyInto(Sink wrappedSink, Spliterator spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end(); // ...
上面讲到,这里的spliterator是Streams.ConcatSpliterator对象,看下Streams.ConcatSpliterator.forEachRemaining实现:
       public void forEachRemaining(Consumer super T > consumer) {
if (beforeSplit) aSpliterator.forEachRemaining(consumer);
bSpliterator.forEachRemaining(consumer);
}
这里就区分出了两个不同的流,对每个流的spliterator分别调用forEachRemaining方法,这里的spliterator是IntWrappingSpliterator, 它是对s1/s2的一个封装,它有两个关键成员:
     // 包装的原始pipeline final PipelineHelper ph;
// 原始pipeline的spliterator Spliterator spliterator;
所以就走到了 
IntWrappingSpliterator.foreachMaining方法中:
      public void forEachRemaining(IntConsumer consumer) {
if (buffer == null && !finished) {
Objects.requireNonNull(consumer);
init();
ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
finished = true;
} // ...

可以看到,又调用了原始pipeline的wrapAndCopyInto方法中,而这里的consumer即为上面s3的逻辑。这样又递归回到了:

AbstractPipeline.wrapAndCopyInto -> AbstractPipeline.wrapSink-> AbstractPipeline.copyInto

方法中,而在这时的wrapSink中,现在的pipeline就是s1/s2了,这时就会对s1/s2下面的所有算子,调用AbstractPipeline.opWrapSink串联起来,以s1为例就变成:

Head(array[1,2,3]) -> Mapper1 -> Filter1 -> Mapper3 -> Filter3 -> ReduceOp(sum)
这样s1流跟s3流就串起来执行完成了,然后就是s2和s3流串起来执行。

📚文章说明:
1、本文章链接失效后,请通过右下角企鹅QQ告知管理员,24小时内补链接,谢谢。点击下载说明了解。
2、天天精品分享的资源均通过网络公开合法渠道获取的,仅阅读交流测试使用,请在下载后24小时内删除。
3、版权归作者或出版社方所有,本站不对涉及的版权问题负法律责任。点击免责申明了解详情。
4、若版权方认为天天精品侵权,请联系客服QQ或发送邮件myttjp@163.com处理。
5、会员和精品豆系对搜集搬运、整理及网站运营做的友情赞助,非购买文件费用,敬请谅解。点击关于本站了解本站。
6、每位访客应尊重版权方的知识产权,支持版权方和出版社。

发表评论