• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

java8 stream学习笔记

Java基础 来源:绝情谷 3次浏览

pipeline

为了更好地对集合进行并行操作,java8中加入了stream API。以前对集合的操作策略由客户端提供,有了stream API后,对集合的操作集成到了集合内部,客户端只需要按需调用即可。stream API支持函数式编程,它对集合的操作一般分为三个阶段:

(1)source:即集合的开始状态。

(2)intermedia operations :0个或者多个中间阶段,比如Stream.filter,Stream.map等,中间阶段通常都是lazy,比如filter操作并不会马上开始过滤,而是返回一个新的stream对象。直到遇到terminal operation时才会真正地执行。

(3)terminal operation:一个终结操作,比如foreach,IntStream.sum。

以代码为例分析stream的执行过程:

public static void main(String[] args) {
	List<String> words = asList("aboutjava", "accessibility", "addressing", "addshortcut", "about", "all", "become","bacteriumparatyphosum");
	// 以a开头的字符串的最大长度
	int result = words.stream().filter(s -> s.startsWith("a")).mapToInt(s -> s.length()).max().getAsInt();
	System.out.println(result);
}

以上代码是计算字符串列表中以a开头的字符串的最大长度,一共包括两个intermedia operations:filter和mapToInt,一个terminal operation:max。

如下图所示,每一个操作都会创建一个Stream Pipeline,每个Pipeline包含一个Sink(也就是具体操作),max实际上是由reduce操作来实现的,Stream Pipeline通过upstream字段形成一个链表。

当调用max()方法时,PipelineHelper.wrapAndCopyInto()方法会被调用,这个方法的实现如下:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
	copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
	return sink;
}

主要做了两件事:

1、将所有的Sink(Ops)进行合并,合并的代码如下:

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
	Objects.requireNonNull(sink);

	for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
		sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
	}
	return (Sink<P_IN>) sink;
}

这些Sink最终会通过downstream形成一个链表。

举个栗子,要从豆腐中过滤出豆浆,通常的做法是准备好几张过滤网(每张过滤网的功能不一样,有的是过滤豆腐渣的,有的可能是过滤沙,有的过滤其他更小的杂质),将豆腐倒入第一个网中,用桶接住,将桶中的豆腐再倒入第二张网中。。。依次类推,最后得到豆浆。

这里合并Sink的过程,就是把所有过滤网叠在一起,看起来像一张网但兼有以上几张网的功能,以后只需要过滤一次就可以得到豆浆了。

2、遍历集合,代码如下:

if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
	wrappedSink.begin(spliterator.getExactSizeIfKnown());
	spliterator.forEachRemaining(wrappedSink);
	wrappedSink.end();
}
else {
	copyIntoWithCancel(wrappedSink, spliterator);
}

wrappedSink.begin会依次调用所有Sink的begin方法,相当于叠好网,放到指定位置。

spliterator.forEachRemaining会依赖调用Sink的accept方法,相当于把豆腐倒在网上进行过滤。

这里有对Short_circuit类型的操作进行特殊处理,比如findFirst只需要找一个满足要求的,当找到一个后,就不再找了。好比桶中装满一杯豆浆后,就不倒豆腐了。

wrappedSink.end表明已处理完,直接出结果。

状态

在Stream的这些操作中,有的是无状态的,有的是有状态的,有状态的操作包括:distinct、sorted、limit、skip,比如distince依赖之前的结果,否则不知道当前结果是否唯一,这种情况下线程之前会有竞争,从而影响并发的性能,因此并行情况下要谨慎使用有状态的操作。

short-circuiting

有时候需要在遍历中途停止操作,比如查找第一个满足条件的元素或者limit操作。在Stream中short-circuiting操作有:anyMatch、allMatch、noneMatch、findFirst、findAny、limit,这些操作在Sink中都有一个变量来判断是否短路,比如limit用的是m,match用的是stop,find用的是hasValue。Sink中的cancellationRequested方法就是用来针对short-circuiting操作的,一旦发现是短路操作,就会调用AbstractPipeline的copyIntoWithCancel方法:

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
	@SuppressWarnings({"rawtypes","unchecked"})
	AbstractPipeline p = AbstractPipeline.this;
	while (p.depth > 0) {
		p = p.previousStage;
	}
	wrappedSink.begin(spliterator.getExactSizeIfKnown());
	p.forEachWithCancel(spliterator, wrappedSink);
	wrappedSink.end();
}

其中forEachWithCancel方法会跳过后面所有的Sink:

final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
	Spliterator.OfInt spl = adapt(spliterator);
	IntConsumer adaptedSink = adapt(sink);
	do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
}

 

顺序

在Stream中存在两种顺序: (1)encounter order:如果集合本身有序(比如:list),返回的结果就按集合的顺序; (2)thread order :在并行情况下,集合被分成几部分分别在不同的线程中执行,有可能处于后面的元素先处理,这种线程顺序也称为时间顺序。 以下代码中forEach打印的结果是乱序的,forEachOrdered是有顺序的。

public static void main(String[] args) {
	List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
	list.stream().parallel().forEach(x -> System.out.print(x));
	list.stream().parallel().forEachOrdered(x -> System.out.print(x));
}

除了使用forEachOrdered保证顺序外,Collectors.toList()也可以保证顺序,二都最终都是通过ForEachOrderedTask类来实现的,具体可以参看ForEachOp.ForEachOrderedTask类中的代码。


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)