- 函數式接口
函數式接口
初識lambda呢,函數式接口肯定是繞不過去的,函數式接口就是一個有且僅有一個抽象方法,但是可以有多個非抽象方法的接口。函數式接口可以被隱式轉換為lambda表達式。
@FunctionalInterface
publicinterfaceCloseable{
voidclose();
}
在java.util.function
它包含了很多類,用來支持Java的函數式編程,該包中的函數式接口有:
操作
流程
Stream相關接口繼承圖:
Stream流水線組織結構示意圖(圖是盜的):
Collection
類路徑java.util.colltction
@Override
defaultSpliteratorspliterator() {
returnSpliterators.spliterator(this,0);
}
//常用Stream流轉換
defaultStreamstream() {
returnStreamSupport.stream(spliterator(),false);
}
//并行流
defaultStreamparallelStream() {
returnStreamSupport.stream(spliterator(),true);
}
//java.util.stream.StreamSupport#stream(java.util.Spliterator,boolean)
publicstaticStreamstream(Spliteratorspliterator,booleanparallel) {
Objects.requireNonNull(spliterator);
returnnewReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}
AbstractPipeline
類路徑java.util.stream.AbstractPipeline
//反向鏈接到管道鏈的頭部(如果是源階段,則為自身)。
privatefinalAbstractPipelinesourceStage;
//“上游”管道,如果這是源階段,則為null。
privatefinalAbstractPipelinepreviousStage;
//此管道對象表示的中間操作的操作標志。
protectedfinalintsourceOrOpFlags;
//管道中的下一個階段;如果這是最后一個階段,則為null。在鏈接到下一個管道時有效地結束。
privateAbstractPipelinenextStage;
//如果是順序的,則此管道對象與流源之間的中間操作數;如果是并行的,則為先前有狀態的中間操作數。在管道準備進行評估時有效。
privateintdepth;
//源和所有操作的組合源標志和操作標志,直到此流水線對象表示的操作為止(包括該流水線對象所代表的操作)。在管道準備進行評估時有效。
privateintcombinedFlags;
//源拆分器。僅對頭管道有效。如果管道使用非null值,那么在使用管道之前,sourceSupplier必須為null。在使用管道之后,如果非null,則將其設置為null。
privateSpliterator>sourceSpliterator;
//來源供應商。僅對頭管道有效。如果非null,則在使用管道之前,sourceSpliterator必須為null。在使用管道之后,如果非null,則將其設置為null。
privateSupplier?extends?Spliterator>>sourceSupplier;
//如果已鏈接或使用此管道,則為True
privatebooleanlinkedOrConsumed;
//如果正在執行任何有狀態操作,則為true;否則為true。僅對源階段有效。
privatebooleansourceAnyStateful;
privateRunnablesourceCloseAction;
//如果管道是并行的,則為true;否則,管道為順序的;否則為true。僅對源階段有效。
privatebooleanparallel;
ReferencePipeline
類路徑:java.util.stream.ReferencePipeline
filter
//java.util.stream.ReferencePipeline#filter
@Override
publicfinalStreamfilter(Predicate?superP_OUT>predicate) {
Objects.requireNonNull(predicate);
//返回一個匿名無狀態的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED){
//下游生產線所需要的回調接口
@Override
SinkopWrapSink(intflags,Sinksink) {
returnnewSink.ChainedReference(sink){
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
//真正執行操作的方法,依靠ChainedReference內置ReferencePipeline引用下游的回調
@Override
publicvoidaccept(P_OUTu){
//只有滿足條件的元素才能被下游執行
if(predicate.test(u))
downstream.accept(u);
}
};
}
};
}
map
//java.util.stream.ReferencePipeline#map
publicfinalStreammap(Function?superP_OUT,?extendsR>mapper) {
Objects.requireNonNull(mapper);
//返回一個匿名無狀態的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT){
//下游生產線所需要的回調接口
@Override
SinkopWrapSink(intflags,Sinksink) {
returnnewSink.ChainedReference(sink){
//真正執行操作的方法,依靠ChainedReference內置ReferencePipeline引用下游的回調
@Override
publicvoidaccept(P_OUTu){
//執行轉換后提供給下游執行
downstream.accept(mapper.apply(u));
}
};
}
};
}
flatMap
//java.util.stream.ReferencePipeline#flatMap
@Override
publicfinalStreamflatMap(Function?superP_OUT,?extendsStream?extends?R>>mapper) {
Objects.requireNonNull(mapper);
//返回一個匿名無狀態的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT|StreamOpFlag.NOT_SIZED){
//下游生產線所需要的回調接口
@Override
SinkopWrapSink(intflags,Sinksink) {
returnnewSink.ChainedReference(sink){
@Override
publicvoidbegin(longsize){
downstream.begin(-1);
}
//真正執行操作的方法,依靠ChainedReference內置ReferencePipeline引用下游的回調
@Override
publicvoidaccept(P_OUTu){
try(Stream?extends?R>result=mapper.apply(u)){
//劃分為多個流執行下游(分流)
if(result!=null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
peek
//java.util.stream.ReferencePipeline#peek
@Override
publicfinalStreampeek(Consumer?superP_OUT>action) {
Objects.requireNonNull(action);
//返回一個匿名無狀態的管道
returnnewStatelessOp(this,StreamShape.REFERENCE,0){
//下游生產線所需要的回調接口
@Override
SinkopWrapSink(intflags,Sinksink) {
returnnewSink.ChainedReference(sink){
//真正執行操作的方法,依靠ChainedReference內置ReferencePipeline引用下游的回調
@Override
publicvoidaccept(P_OUTu){
//先執行自定義方法,在執行下游方法
action.accept(u);
downstream.accept(u);
}
};
}
};
}
sorted
@Override
publicfinalStreamsorted() {
//不提供Comparator,會使用元素自實現Comparator的compareTo方法
returnSortedOps.makeRef(this);
}
@Override
publicfinalStreamsorted(Comparator?superP_OUT>comparator) {
returnSortedOps.makeRef(this,comparator);
}
//Sorted.makeRef
staticStreammakeRef(AbstractPipeline,?T,??>upstream,
Comparator?superT>comparator) {
returnnewOfRef<>(upstream,comparator);
}
//ofRef類
privatestaticfinalclassOfRef<T>extendsReferencePipeline.StatefulOp<T,T>{
privatefinalbooleanisNaturalSort;
privatefinalComparator?superT>comparator;
@Override
publicSinkopWrapSink(intflags,Sinksink) {
Objects.requireNonNull(sink);
//根據不同的flag進行不同排序
if(StreamOpFlag.SORTED.isKnown(flags)&&isNaturalSort)
returnsink;
elseif(StreamOpFlag.SIZED.isKnown(flags))
returnnewSizedRefSortingSink<>(sink,comparator);
else
returnnewRefSortingSink<>(sink,comparator);
}
}
distinct
@Override
publicfinalStreamdistinct() {
returnDistinctOps.makeRef(this);
}
staticReferencePipelinemakeRef(AbstractPipeline,?T,??>upstream) {
//返回一個匿名有狀態的管道
returnnewReferencePipeline.StatefulOp(upstream,StreamShape.REFERENCE,StreamOpFlag.IS_DISTINCT|StreamOpFlag.NOT_SIZED){
@Override
SinkopWrapSink(intflags,Sinksink) {
Objects.requireNonNull(sink);
if(StreamOpFlag.DISTINCT.isKnown(flags)){
//已經是去重過了
returnsink;
}elseif(StreamOpFlag.SORTED.isKnown(flags)){
//有序流
returnnewSink.ChainedReference(sink){
booleanseenNull;
//這個為先執行的前序元素
TlastSeen;
@Override
publicvoidbegin(longsize){
seenNull=false;
lastSeen=null;
downstream.begin(-1);
}
@Override
publicvoidend(){
seenNull=false;
lastSeen=null;
downstream.end();
}
//這里通過有序的特性,前序元素與后序元素比較,如果相等則跳過執行后序的元素
@Override
publicvoidaccept(Tt){
if(t==null){
//這里控制元素為null只有一個
if(!seenNull){
seenNull=true;
downstream.accept(lastSeen=null);
}
}elseif(lastSeen==null||!t.equals(lastSeen)){
//這里將前序元素賦值給lastSeen
downstream.accept(lastSeen=t);
}
}
};
}else{
//底層通過Set進行去重,所以該元素需要重寫hashCode和equals方法
returnnewSink.ChainedReference(sink){
Setseen;
@Override
publicvoidbegin(longsize){
seen=newHashSet<>();
downstream.begin(-1);
}
@Override
publicvoidend(){
seen=null;
downstream.end();
}
@Override
publicvoidaccept(Tt){
if(!seen.contains(t)){
seen.add(t);
downstream.accept(t);
}
}
};
}
}
};
}
skip、limit
publicstaticStreammakeRef(AbstractPipeline,?T,??>upstream,
longskip,longlimit) {
if(skip0)
thrownewIllegalArgumentException("Skipmustbenon-negative:"+skip);
//返回一個匿名有狀態的管道
returnnewReferencePipeline.StatefulOp(upstream,StreamShape.REFERENCE,flags(limit)){
SpliteratorunorderedSkipLimitSpliterator(Spliterators,longskip,longlimit,longsizeIfKnown) {
if(skip<=?sizeIfKnown)?{
????????????????????limit?=?limit?>=0?Math.min(limit,sizeIfKnown-skip):sizeIfKnown-skip;
skip=0;
}
returnnewStreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s,skip,limit);
}
//自己實現真正操作的方法
@Override
SinkopWrapSink(intflags,Sinksink) {
returnnewSink.ChainedReference(sink){
longn=skip;
longm=limit>=0?limit:Long.MAX_VALUE;
@Override
publicvoidbegin(longsize){
downstream.begin(calcSize(size,skip,m));
}
@Override
publicvoidaccept(Tt){
if(n==0){
//limit
if(m>0){
m--;
downstream.accept(t);
}
}
//skip
else{
n--;
}
}
@Override
publicbooleancancellationRequested(){
returnm==0||downstream.cancellationRequested();
}
};
}
};
}
reduce
//java.util.stream.ReferencePipeline#reduce(P_OUT,java.util.function.BinaryOperator)
@Override
publicfinalP_OUTreduce(finalP_OUTidentity,finalBinaryOperatoraccumulator) {
returnevaluate(ReduceOps.makeRef(identity,accumulator,accumulator));
}
//java.util.stream.ReferencePipeline#reduce(java.util.function.BinaryOperator)
@Override
publicfinalOptionalreduce(BinaryOperatoraccumulator) {
returnevaluate(ReduceOps.makeRef(accumulator));
}
//java.util.stream.ReferencePipeline#reduce(R,java.util.function.BiFunction,java.util.function.BinaryOperator)
@Override
publicfinalRreduce(Ridentity,BiFunctionsuper P_OUT,R>accumulator,BinaryOperatorcombiner) {
returnevaluate(ReduceOps.makeRef(identity,accumulator,combiner));
}
//java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)
finalRevaluate(TerminalOpterminalOp) {
assertgetOutputShape()==terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed=true;
returnisParallel()
?terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
collect
//java.util.stream.ReferencePipeline#collect(java.util.stream.Collector?super?P_OUT,A,R>)
@Override
@SuppressWarnings("unchecked")
publicfinalRcollect(Collector?superP_OUT,A,R>collector){
Acontainer;
if(isParallel()
&&(collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&&(!isOrdered()||collector.characteristics().contains(Collector.Characteristics.UNORDERED))){
container=collector.supplier().get();
BiConsumersuperP_OUT>accumulator=collector.accumulator();
forEach(u->accumulator.accept(container,u));
}
else{
container=evaluate(ReduceOps.makeRef(collector));
}
//具有特定轉換的使用finisher處理
returncollector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
?(R)container
:collector.finisher().apply(container);
}
//java.util.stream.ReferencePipeline#collect(java.util.function.Supplier,java.util.function.BiConsumer,java.util.function.BiConsumer)
@Override
publicfinalRcollect(Suppliersupplier,BiConsumersuper P_OUT>accumulator,BiConsumercombiner) {
returnevaluate(ReduceOps.makeRef(supplier,accumulator,combiner));
}
//java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp)
finalRevaluate(TerminalOpterminalOp) {
assertgetOutputShape()==terminalOp.inputShape();
if(linkedOrConsumed)
thrownewIllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed=true;
returnisParallel()
?terminalOp.evaluateParallel(this,sourceSpliterator(terminalOp.getOpFlags()))
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
forEach
//java.util.stream.ReferencePipeline#forEach
@Override
publicvoidforEach(Consumer?superP_OUT>action){
evaluate(ForEachOps.makeRef(action,false));
}
//java.util.stream.ForEachOps#makeRef
publicstaticTerminalOpmakeRef(Consumer?superT>action,booleanordered) {
Objects.requireNonNull(action);
returnnewForEachOp.OfRef<>(action,ordered);
}
//java.util.stream.ForEachOps.ForEachOp.OfRef
staticfinalclassOfRef<T>extendsForEachOp<T>{
finalConsumer?superT>consumer;
OfRef(Consumer?superT>consumer,booleanordered){
super(ordered);
this.consumer=consumer;
}
//只是簡單的消費
@Override
publicvoidaccept(Tt){
consumer.accept(t);
}
}
Head
流的數據元的頭,類路徑java.util.stream.ReferencePipeline.Head
//java.util.stream.ReferencePipeline.Head
staticclassHead<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{
Head(Supplier?extends?Spliterator>>source,intsourceFlags,booleanparallel){
super(source,sourceFlags,parallel);
}
Head(Spliterator>source,intsourceFlags,booleanparallel){
super(source,sourceFlags,parallel);
}
@Override
finalbooleanopIsStateful(){
thrownewUnsupportedOperationException();
}
@Override
finalSinkopWrapSink(intflags,Sinksink) {
thrownewUnsupportedOperationException();
}
//Optimizedsequentialterminaloperationsfortheheadofthepipeline
@Override
publicvoidforEach(Consumer?superE_OUT>action){
if(!isParallel()){
sourceStageSpliterator().forEachRemaining(action);
}
else{
super.forEach(action);
}
}
@Override
publicvoidforEachOrdered(Consumer?superE_OUT>action){
if(!isParallel()){
sourceStageSpliterator().forEachRemaining(action);
}
else{
super.forEachOrdered(action);
}
}
}
StatelessOp
無狀態的中間管道,類路徑java.util.stream.ReferencePipeline.StatelessOp
//java.util.stream.ReferencePipeline.StatelessOp
abstractstaticclassStatelessOp<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{
StatelessOp(AbstractPipeline,?E_IN,??>upstream,StreamShapeinputShape,intopFlags){
super(upstream,opFlags);
assertupstream.getOutputShape()==inputShape;
}
@Override
finalbooleanopIsStateful(){
returnfalse;
}
}
StatefulOp
有狀態的中間管道,類路徑java.util.stream.ReferencePipeline.StatefulOp
//java.util.stream.ReferencePipeline.StatefulOp
abstractstaticclassStatefulOp<E_IN,E_OUT>extendsReferencePipeline<E_IN,E_OUT>{
StatefulOp(AbstractPipeline,?E_IN,??>upstream,StreamShapeinputShape,intopFlags){
super(upstream,opFlags);
assertupstream.getOutputShape()==inputShape;
}
@Override
finalbooleanopIsStateful(){
returntrue;
}
@Override
abstractNodeopEvaluateParallel(PipelineHelperhelper,
Spliteratorspliterator,
IntFunctiongenerator) ;
TerminalOp
管道流的結束操作,類路徑java.util.stream.TerminalOp
interfaceTerminalOp<E_IN,R>{
//獲取此操作的輸入類型的形狀
defaultStreamShapeinputShape(){returnStreamShape.REFERENCE;}
//獲取操作的流標志。終端操作可以設置StreamOpFlag定義的流標志的有限子集,并且這些標志與管道的先前組合的流和中間操作標志組合在一起。
defaultintgetOpFlags(){return0;}
//使用指定的PipelineHelper對操作執行并行評估,該操作描述上游中間操作。
defaultRevaluateParallel(PipelineHelperhelper,Spliteratorspliterator) {
if(Tripwire.ENABLED)
Tripwire.trip(getClass(),"{0}triggeringTerminalOp.evaluateParallelserialdefault");
returnevaluateSequential(helper,spliterator);
}
//使用指定的PipelineHelper對操作執行順序評估,該操作描述上游中間操作。
RevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) ;
}
ReduceOp
類路徑java.util.stream.ReduceOps.ReduceOp
privatestaticabstractclassReduceOp<T,R,SextendsAccumulatingSink<T,R,S>>implementsTerminalOp<T,R>{
privatefinalStreamShapeinputShape;
ReduceOp(StreamShapeshape){
inputShape=shape;
}
publicabstractSmakeSink();
@Override
publicStreamShapeinputShape(){
returninputShape;
}
//通過匿名子類實現makeSink()獲取Sink
@Override
publicRevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
returnhelper.wrapAndCopyInto(makeSink(),spliterator).get();
}
@Override
publicRevaluateParallel(PipelineHelperhelper,Spliteratorspliterator) {
returnnewReduceTask<>(this,helper,spliterator).invoke().get();
}
}
MatchOp
類路徑java.util.stream.MatchOps.MatchOp
privatestaticfinalclassMatchOp<T>implementsTerminalOp<T,Boolean>{
privatefinalStreamShapeinputShape;
finalMatchKindmatchKind;
finalSupplier>sinkSupplier;
MatchOp(StreamShapeshape,MatchKindmatchKind,Supplier>sinkSupplier){
this.inputShape=shape;
this.matchKind=matchKind;
this.sinkSupplier=sinkSupplier;
}
@Override
publicintgetOpFlags(){
returnStreamOpFlag.IS_SHORT_CIRCUIT|StreamOpFlag.NOT_ORDERED;
}
@Override
publicStreamShapeinputShape(){
returninputShape;
}
//使用內置的sinkSupplier獲取Sink
@Override
publicBooleanevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
returnhelper.wrapAndCopyInto(sinkSupplier.get(),spliterator).getAndClearState();
}
@Override
publicBooleanevaluateParallel(PipelineHelperhelper,Spliteratorspliterator) {
returnnewMatchTask<>(this,helper,spliterator).invoke();
}
}
FindOp
類路徑java.util.stream.FindOps.FindOp
privatestaticfinalclassFindOp<T,O>implementsTerminalOp<T,O>{
privatefinalStreamShapeshape;
finalbooleanmustFindFirst;
finalOemptyValue;
finalPredicatepresentPredicate;
finalSupplier>sinkSupplier;
FindOp(booleanmustFindFirst,
StreamShapeshape,
OemptyValue,
PredicatepresentPredicate,
Supplier>sinkSupplier){
this.mustFindFirst=mustFindFirst;
this.shape=shape;
this.emptyValue=emptyValue;
this.presentPredicate=presentPredicate;
this.sinkSupplier=sinkSupplier;
}
@Override
publicintgetOpFlags(){
returnStreamOpFlag.IS_SHORT_CIRCUIT|(mustFindFirst?0:StreamOpFlag.NOT_ORDERED);
}
@Override
publicStreamShapeinputShape(){
returnshape;
}
//通過內置sinkSupplier獲取Sink
@Override
publicOevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
Oresult=helper.wrapAndCopyInto(sinkSupplier.get(),spliterator).get();
returnresult!=null?result:emptyValue;
}
@Override
publicOevaluateParallel(PipelineHelperhelper,Spliteratorspliterator) {
returnnewFindTask<>(this,helper,spliterator).invoke();
}
}
ForEachOp
類路徑java.util.stream.ForEachOps.ForEachOp
staticabstractclassForEachOp<T>implementsTerminalOp<T,Void>,TerminalSink<T,Void>{
privatefinalbooleanordered;
protectedForEachOp(booleanordered){
this.ordered=ordered;
}
@Override
publicintgetOpFlags(){
returnordered?0:StreamOpFlag.NOT_ORDERED;
}
//自己實現了Sink
@Override
publicVoidevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
returnhelper.wrapAndCopyInto(this,spliterator).get();
}
@Override
publicVoidevaluateParallel(PipelineHelperhelper,Spliteratorspliterator) {
if(ordered)
newForEachOrderedTask<>(helper,spliterator,this).invoke();
else
newForEachTask<>(helper,spliterator,helper.wrapSink(this)).invoke();
returnnull;
}
@Override
publicVoidget(){
returnnull;
}
staticfinalclassOfRef<T>extendsForEachOp<T>{
finalConsumer?superT>consumer;
OfRef(Consumer?superT>consumer,booleanordered){
super(ordered);
this.consumer=consumer;
}
@Override
publicvoidaccept(Tt){
consumer.accept(t);
}
}
...
}
Sink
類路徑java.util.stream.Sink
interfaceSink<T>extendsConsumer<T>{
//開始遍歷元素之前調用該方法,通知Sink做好準備。
defaultvoidbegin(longsize){}
//所有元素遍歷完成之后調用,通知Sink沒有更多的元素了。
defaultvoidend(){}
//是否可以結束操作,可以讓短路操作盡早結束。
defaultbooleancancellationRequested(){
returnfalse;
}
//遍歷元素時調用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法里,前一個Stage只需要調用當前Stage.accept(Tt)方法就行了。
voidaccept(Tt);
}
這里Sink的子類實現中分為兩種:中間操作匿名實現ChainedReference
和TerminalOp
子類所提供的Sink。
ChainedReference
類路徑java.util.stream.Sink.ChainedReference
,這里是中間操作的默認模板父類
staticabstractclassChainedReference<T,E_OUT>implementsSink<T>{
protectedfinalSink?superE_OUT>downstream;
publicChainedReference(Sink?superE_OUT>downstream){
this.downstream=Objects.requireNonNull(downstream);
}
@Override
publicvoidbegin(longsize){
downstream.begin(size);
}
@Override
publicvoidend(){
downstream.end();
}
@Override
publicbooleancancellationRequested(){
returndownstream.cancellationRequested();
}
}
在上述的中間操作管道流中都是通過匿名類繼承ChainedReference
實現onWrapSink(int, Sink)
返回一個指定操作的Sink。
TerminalSink
這里為什么講提供呢?這是因為不同的實現TerminalOp的子類中在實現java.util.stream.TerminalOp#evaluateSequential
中都是通過helper.wrapAndCopyInto(TerminalOp子類實現提供的Sink, spliterator)
中通過參數傳遞的方式提供的,不同的子類傳遞的方式不一樣所以此處用了一個提供Sink
由ReduceOps中實現TerminalOp
所提供的ReducingSink
,它是由匿名類實現java.util.stream.ReduceOps.ReduceOp#makeSink
來交付給helper.wrapAndCopyInto(makeSink(), spliterator)
的。
publicstaticTerminalOpmakeRef(Useed,BiFunctionsuperT,U>reducer,BinaryOperatorcombiner) {
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
classReducingSinkextendsBox<U>implementsAccumulatingSink<T,U,ReducingSink>{
@Override
publicvoidbegin(longsize){
state=seed;
}
@Override
publicvoidaccept(Tt){
state=reducer.apply(state,t);
}
@Override
publicvoidcombine(ReducingSinkother){
state=combiner.apply(state,other.state);
}
}
returnnewReduceOp(StreamShape.REFERENCE){
@Override
publicReducingSinkmakeSink(){
returnnewReducingSink();
}
};
}
由ForEachOps
中實現TerminalOp
所提供的是this,它的提供方式就是通過this交付給helper.wrapAndCopyInto(this, spliterator)
。
//這里ForEachOp自己通過TerminalSink間接的實現了Sink
staticabstractclassForEachOp<T>implementsTerminalOp<T,Void>,TerminalSink<T,Void>{
@Override
publicVoidevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
returnhelper.wrapAndCopyInto(this,spliterator).get();
}
}
由MatchOps中實現TerminalOp
所提供的sinkSupplier
通過構造函數由外部賦值,通過Supplier接口的get()
來交付給helper.wrapAndCopyInto(sinkSupplier.get(), spliterator)
。
privatestaticfinalclassMatchOp<T>implementsTerminalOp<T,Boolean>{
finalSupplier>sinkSupplier;
@Override
publicBooleanevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
returnhelper.wrapAndCopyInto(sinkSupplier.get(),spliterator).getAndClearState();
}
}
由FindOps中實現TerminalOp
所提供的與上述MatchOps
是一致的
privatestaticfinalclassFindOp<T,O>implementsTerminalOp<T,O>{
finalSupplier>sinkSupplier;
@Override
publicOevaluateSequential(PipelineHelperhelper,Spliteratorspliterator) {
Oresult=helper.wrapAndCopyInto(sinkSupplier.get(),spliterator).get();
returnresult!=null?result:emptyValue;
}
}
Collector
在Collector中有以下幾個實現接口:
-
Supplier
:結果類型的提供器。 -
BiConsumer
:將元素放入結果的累加器。 -
BinaryOperator
:合并部分結果的組合器。 -
Function
:對結果類型轉換為最終結果類型的轉換器。 -
Set
:保存Collector特征的集合
并行流
前述都是基于串行流的講解,其實并行流也是基于上述的helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator)
這個方法上面做的一層基于ForkJoinTask
多線程框架的封裝。
ForkJoinTask
ForkJoin框架的思想就是分而治之,它將一個大任務切割為多個小任務這個過程稱為fork,將每個任務的執行的結果進行匯總的過程稱為join。ForkJoin框架相關的接口關系圖如下(圖是盜的):
AbstractTask
類路徑java.util.stream.AbstractTask
,AbstractTask繼承了在JUC中已經封裝好的ForkJoinTask
抽象子類java.util.concurrent.CountedCompleter
。
此類基于CountedCompleter
,它是fork-join
任務的一種形式,其中每個任務都有未完成子代的信號量計數,并且該任務隱式完成并在其最后一個子代完成時得到通知。 內部節點任務可能會覆蓋CountedCompleter
的onCompletion
方法,以將子任務的結果合并到當前任務的結果中。
拆分和設置子任務鏈接是由內部節點的compute()
完成的。 在葉節點的compute()
時間,可以確保將為所有子代設置父代的子代相關字段(包括父代子代的同級鏈接)。
例如,執行減少任務的任務將覆蓋doLeaf()
以使用Spliterator
對該葉節點的塊執行減少Spliterator
,并覆蓋onCompletion()
以合并內部節點的子任務的結果:
@Override
protectedReduceTaskmakeChild(Spliteratorspliterator) {
//返回一個ForkJoinTask任務
returnnewReduceTask<>(this,spliterator);
}
@Override
protectedSdoLeaf(){
//其他實現大同小異
returnhelper.wrapAndCopyInto(op.makeSink(),spliterator);
}
@Override
publicvoidonCompletion(CountedCompleter>caller){
//非葉子節點進行結果組合
if(!isLeaf()){
SleftResult=leftChild.getLocalResult();
leftResult.combine(rightChild.getLocalResult());
setLocalResult(leftResult);
}
//GCspliterator,leftandrightchild
super.onCompletion(caller);
}
AbstractTask
封裝了分片任務的算法模板,通過是Spliterator
的trySplit()
方法來實現分片的細節,詳細算法源碼如下(類路徑:java.util.stream.AbstractTask#compute
):
@Override
publicvoidcompute(){
//將當前這個spliterator作為右節點(此時為root節點)
Spliteratorrs=spliterator,ls;
//評估任務的大小
longsizeEstimate=rs.estimateSize();
//獲取任務閾值
longsizeThreshold=getTargetSize(sizeEstimate);
booleanforkRight=false;
@SuppressWarnings("unchecked")Ktask=(K)this;
//細節不多贅述,下面我用圖來講解算法
/**
*根節點指定為:右邊節點
*root
*split()
*leftright
*left.fork()
*split()
*lr
*rs=ls
*right.fork()
*split()
*lr
*l.fork()
*/
while(sizeEstimate>sizeThreshold&&(ls=rs.trySplit())!=null){
KleftChild,rightChild,taskToFork;
task.leftChild=leftChild=task.makeChild(ls);
task.rightChild=rightChild=task.makeChild(rs);
task.setPendingCount(1);
if(forkRight){
forkRight=false;
//左右節點切換進行fork和split
rs=ls;
task=leftChild;
taskToFork=rightChild;
}
else{
forkRight=true;
task=rightChild;
taskToFork=leftChild;
}
//fork任務加入隊列中去
taskToFork.fork();
sizeEstimate=rs.estimateSize();
}
//將執行doLeaf底層就是單個串行流的操作
task.setLocalResult(task.doLeaf());
//將結果組合成一個最終結果
task.tryComplete();
}
AbstractTask
執行與分片流程圖如下:
到這里Stream流的相關知識介紹到這,這里附上一副總體圖來加深下印象
審核編輯 :李倩
-
接口
+關注
關注
33文章
8526瀏覽量
150862 -
JAVA
+關注
關注
19文章
2960瀏覽量
104563 -
函數
+關注
關注
3文章
4308瀏覽量
62445
原文標題:還有人不知道 Java 8 Stream流底層原理?
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論