• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Java 8 Stream 完全教程

Java基础 来源:tinysakurac 1次浏览

转载自浮梁翁的博客Java 8 Stream 教程

文章目录

序言

本文采用实例驱动的方式,对JAVA8的stream API进行一个深入的介绍。虽然JAVA8中的stream API与JAVA I/O中的InputStream和OutputStream在名字上比较类似,但是其实是另外一个东西,Stream API是JAVA函数式编程中的一个重要组成部分。

本文描述如何使用JAVA8的Stream API。通过本文,你可以了解Stream API的执行顺序,不同的执行顺序会对stream api的执行效率有较大的影响。本文会详细描述Stream API中的reduce,collect,flatMap等操作,结尾部分会深入讲解parallel streams。

如果你对JAVA8中新增的概念:lambda表达式,函数式接口,方法引用不熟悉。可以从:Java 8 Tutorial一文中获取相关的知识。

Stream是如何工作的

stream是一个可以对个序列中的元素执行各种计算操作的一个元素序列。

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2

stream包含中间(intermediate operations)最终(terminal operation)两种形式的操作。中间操作(intermediate operations)的返回值还是一个stream,因此可以通过链式调用将中间操作(intermediate operations)串联起来。最终操作(terminal operation)只能返回void或者一个非stream的结果。在上述例子中:filter, map ,sorted是中间操作,而forEach是一个最终操作。更多关于stream的中可用的操作可以查看java doc。上面例子中的链式调用也被称为操作管道流。

大多stream操作接受某种形式的lambda表达式作为参数,通过方法接口的形式指定操作的具体行为,这些方法接口的行为基本上都是无干扰(non-interfering)和无状态(stateless)。无干扰(non-interfering)的方法的定义是:该方法不修改stream的底层数据源,比如上述例子中:没有lambda表达式添加或者删除myList中的元素。无状态(stateless)方法的定义:操作的执行是独立的,比如上述例子中,没有lambda表达式在执行中依赖可能发生变化的外部变量或状态。

streams分类

可以从不同的数据源创建stream。java collection包中的Collections,Lists,Sets这些类中新增stream()和parallelStream()方法,通过这些方法可以创建一个顺序stream(sequential streams)或者一个并发的stream(Parallel streams)。并发stream(Parallel streams)更适合在多线程中使用,本文先介绍顺序流(sequential streams)在结尾会描述并发stream(Parallel streams),

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

List对象上调用stream()方法可以返回一个常规的对象流。在下面的例子中我们不需要创建一个collection对象也可以使用stream:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

直接使用Stream.of()方法就能从一组对象创建一个stream对象,

除了常规的对象流,JAVA 8中的IntStream,LongStream,DoubleStream这些流能够处理基本数据类型如:int,long,double。比如:IntStream可以使用range()方法能够替换掉传统的for循环

IntStream.range(1, 4)
    .forEach(System.out::println);

基本类型流(primitive streams)使用方式与常规对象流类型(regular object streams)大部分相同,但是基本类型流(primitive streams)能使用一些特殊的lambda表达式,比如:用IntFunction代替Function,用IntPredicate代替Predicate,同时基本类型流(primitive streams)中可以支持一些聚合方法,如:sum(),average()等。

Arrays.stream(new int[] { 1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

可以通过常规对象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本类型对象流(primitive streams)中的mapToObj()等方法完成常规对象流和基本类型流之间的相互转换

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

下面这个例子中doubles stream先被映射成int stream,然后又被映射成String类型的对象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

处理顺序

前面描述了如何创建和使用各种stream,现在开始深入了解stream执行引擎的工作原理。

Laziness(延迟加载)是中间操作(intermediate operations)的一个重要特性。如下面这个例子:中间操作(terminal operation)缺失,当执行这个代码片段的时候,并不会在控制台打印相应的内容,这是因为只有最终操作(terminal operation)存在的时候,中间操作(intermediate operations)才会执行。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> { 
        System.out.println("filter: " + s);
        return true;
    });

给上面的例子添加最终操作(terminal operation)forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> { 
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

执行结果如下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

执行结果比较让人惊奇,想当然的做法是水平执行此流上的所有元素。但是实际上是每一个元素沿着链垂直移动,第一个字符串”d2″执行完filter和forEach后第二个元素”a2″才开始执行。

这种沿着链垂直移动的行为可以降低每一个元素上进行操作的数量,如我们在下面的例子中所示:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> { 
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> { 最终操作
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2

当对给定元素执行判断为真时anyMatch操作会立刻返回true,在上面例子中执行到元素“A2”的时候,元素判断为真anyMatch立刻返回true,由于流是沿着链垂直移动的,因此上面的map操作只会执行两次。

注:stream的执行流程类似shell中管道:ps xxx | grep “sss” | grep “ccc”,是按照输入行的形式进行处理。

执行效率与steream执行链顺序的关系

下面的例子由两个中间操作(intermediate operations)map和filter以及一个最终操作(terminal operation)forEach构成,我们观察这些动作是如何执行的。

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> { 
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> { 
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));
// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

你可能已经猜想到:map和filter操作被执行了5次,但是forEach操作只被执行了1次。我们可以通过修改操作的执行顺序(如:将filter操作移到操作链的头部),大幅度降低执行次数

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> { 
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> { 
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1中间操作
// filter:  b3
// filter:  c

修改后map只被执行了1次,如果此时数据量比较大则操作管道的执行效率会有较大的提升,在处理复杂方法链的时候需要注意执行顺序对执行效率的影响。

给上面的例子添加sort操作。

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> { 
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> { 
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> { 
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

执行结果如下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

Sorting 是一种特殊的中间操作(intermediate operation),在对集合中元素进行排序过程中需要保存元素的状态,因此Sorting 是一种有状态的操作(stateful operation)。

首先,在整个输入集上执行排序操作(即先对集合进行水平操作),由于输入集合中的元素间存在多种组合,因此上面的例子中sorted操作被执行了8次。

可以通过对执行链重排序的方式,提升stream的执行效率。修改执行链顺序之后由于filter操作的过滤,导致sorted操作的输入集只有一个元素,在大数据量的情况下能够大幅度提高执行效率。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> { 
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> { 
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> { 
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

流复用

Java 8 streams不能被复用,当你执行完任何一个最终操作(terminal operation)的时候流就被关闭了

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));        

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

在同一个stream中执行完anyMatch后再执行noneMatch就会抛出如下异常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

可以通过为每个最终操作(terminal operation)创建一个新的stream链的方式来解决上面的重用问题,Stream api中已经提供了一个stream supplier类来在已经存在的中间操作(intermediate operations )的stream基础上构建一个新的stream。

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
streamSupplier的每个get()方法会构造一个新的stream,我们可以在这个stream上执行期望的最终操作(terminal operation)。

高级操作

Streams支持多种不同的操作(operations),我们已经了解过filter,map等比较重要的操作。你可以通过Stream Javadoc进一步了解更多的操作。现在我们开始深入探讨更复杂的操作:collect flatMap reduce。

假设存在如下的用户列表:

class Person { 
    String name;
    int age;

    Person(String name, int age) { 
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() { 
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect(收集)

Collect(收集)是一种是十分有用的最终操作,它可以把stream中的元素转换成另外一种形式,比如;list,set,map。Collect使用Collector作为参数,Collector包含四种不同的操作:supplier(初始构造器), accumulator(累加器), combiner(组合器), finisher(终结者)。这听起来很复杂,但是一个好消息是java 8通过Collectors类内置了各种复杂的收集操作,因此对于大部分常用的操作来说,你不需要自己去实现collector类。

从一个十分常见的用类开始:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

通过上面的demo可以看出,将stream转换为List十分简单,如果想转换为Set的话,只需使用Collectors.toSet()就可以了。

下面的例子暂时将用户按年龄分组:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors类功能繁多,你可以通过Collectors对stream中的元素进行汇聚,比如:计算所有用户的年纪。

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

可以通过summarizing collectors能返回一个内置的统计对象,通过这个对象能够获取更加全面的统计信息,比如用户年纪中的最大值,最小值,平均年纪等结果。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子展示如何将所有用户连接成一个字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

join collector的三个参数分别表示:连接符,字符串前缀,字符串后缀(可选)。

将一个stream转换为map,我们必须指定map的key和value如何映射。要注意的是key的值必须是唯一性的,否则会抛出IllegalStateException,但是可以通过使用合并函数(可选)绕过这个IllegalStateException异常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

前文已经介绍了jdk内置的一些很有用的collectors,接下来开始介绍如何构造我们自己所需的collector,我们的目标是将stream中所有用户的用户名变成大写并用”|”符号连接成一个字符串。为了达成这个目标我们通过Collector.of()方法创建了一个新的collector,我们必须给这个collector提供四种功能:supplier, accumulator, combiner,finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

由于JAVA中String是一个不可变对象,因此我们需要一个辅助类(比如StringJoiner)来帮助collect构造我们的字符串。supplier创建了一个包含适当分隔符的StringJoiner对象,accumulator用来将每个用户名转为大写并添加到supplier创建的StringJoiner中,combiner将两个StringJoiners对象连接成一个,最后一步的finisher从StringJoiner中构建出所希望的得到的string对象。

FlatMap

我们已经了解:通过map方法可以将stream中的一种对象转换成另外一种对象。但是map方法还是有使用场景限制,只能将一种对象映射为另外一种特定的已经存在的对象。是否能够将一个对象映射为多种对象,或者映射成一个根本不存在的对象呢。这就是flatMap方法出现的目的。

FlatMap方法可以将一个stream中的每一个元素对象转换为另一个stream中的另一种元素对象,因此可以将stream中的每个对象改造成零,一个或多个。flatMap操作的返回流包含这些改造后的对象。

为了演示flatMap,定义一个继承关系如下:

class Foo { 
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) { 
        this.name = name;
    }
}

class Bar { 
    String name;

    Bar(String name) { 
        this.name = name;
    }
}

通过流实例化一队对象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
完成上述操作之后我们得到三个foos,每个foos包含三个bars。

FlatMap接收一个返回值为stream的函数做参数,通过传递合适的函数,就可以解析每一个foo下对应的bar对象

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

正如所见,我们成功地将三个对象的stream转换成一个包含九个对象的stream

最后,上面的示例代码可以简化为一个单一管道流:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap也支持JAVA8中新引入的Optional类,Optionals flatMap能返回一个另外的类的optional包装类,可以用来减少对null的检查。

假设有如下这种多层级结构:

class Outer { 
    Nested nested;
}

class Nested { 
    Inner inner;
}

class Inner { 
    String foo;
}

为了获取内部outer实例的内部foo对象,需要添加一系列空指针判断

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) { 
    System.out.println(outer.nested.inner.foo);
}
可以采用optionals flatMap 操作获得相同的结果:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

上面的例子中flatMap的每次调用都会返回一个用Optional对象,如果有返回值则这个Optional对象是这个返回值的包装类,如果返回值不存在则返回null。

Reduce(减少)

reduce操作可以将stream中所有元素组合起来得到一个元素,JAVA8支持三中不同的reduce方法。

第一种能从stream元素序列中提取一个特定的元素。比如下面的从用户列表中选择年纪最大的用户操作:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

上面的实例中reduce方法接收一个二元累加计算函数(BinaryOperator accumulator function)作为参数,二元操作(BinaryOperator)实际就是上在两个操作数共享同一类型。示例中函数比较两人年龄,返回的最大年龄的人。

第二种reduce操作接收一个标识值和一个二元操作累加器作为参数,这个reduce方法可以把stream中所有用户的名字和年龄汇总得到一个新用户。

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> { 
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三种reduce方法,接收三个参数:一个标示值(identity value),一个二元操作累加器(BiFunction accumulator),一个二元组合方法。由于标识符参数未被严格限制为person类型,因此我们可以用这个reduce方法来获取用户的总年龄。

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

计算的结果是76,通过添加调试输出,我们可以详细地了解执行引擎中发生了什么。

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> { 
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> { 
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

从调试输出中可以看到,累加器做了所有的工作,它首先获取值为0的标示值和第一个用户Max,接下来的三步中持续sum值由于累加不断变大,在最后一步汇总的年纪增长到76。

注意,上面的调试输出中combiner没有执行,通过parallel执行上面相同stream。

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> { 
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> { 
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

通过并行的方式执行上面的stream操作,得到的是另外一种完全不相同的执行动作。在并行stream中combiner方法会被调用。这是由于累加器是被并行调用的,因此组合器需要对分开的累加操作进行求和。

下一章会详细描述并行stream。

Parallel Streams(并行流)

为了提高大量输入时的执行效率,stream可以采用并行的放行执行。并行流(Parallel Streams)通过ForkJoinPool.commonPool() 方法获取一个可用的ForkJoinPool。这个ForkJoinPool使用5个线程(实际上是由底层可用的物理cpu核数决定的)

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:

在我的机器上公共池初始化为每个默认3并行,这个值可以通过调整jvm参数来修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collections中包含parallelStream()方法,通过这个方法能够为Collections中的元素创建并行流。另外也可以调用stream的parallel()方法将一个顺序流转变为一个并行流的拷贝。

为了了解并行流的执行动作,下面的例子会打印当前线程的执行信息。

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> { 
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> { 
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

执行的结果如下:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

通过分析调试输出,我们可以更好地了解哪一个线程执行了哪些stream操作。从上面的输出中我们可以看到parallel stream使用了ForkJoinPool提供的所有可用的线程来执行流的各种操作。由于不能确定哪个线程会执行并行流的哪个操作,因此反复执行上面的代码,打印的结果会不同。

扩充上面的例子,添加sort操作

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> { 
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> { 
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> { 
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

执行结果如下:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

这个执行结果看起来比较奇怪,看起来sort操作只是在main线程中顺序执行的。实际上,parallel stream中的sort操作使用了JAVA 8的一个新方法:Arrays.parallelSort()。JAVA doc中是这样描述Arrays.parallelSort()的:待排序数组的长度决定了排序操作是顺序执行还是并行执行。java doc 描述如下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一章的例子,我们已经了解combiner方法只能在parallel streams中调用,让我们来看下那些线程被实际调用:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> { 
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> { 
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

执行结果如下:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

从控制台输出可以看到accumulator和combiner操作都被可用的线程并行执行了。

总结起来:在大数据量输入的时候,parallel streams可以带来比较大的性能提升。但是应该记住,一些并行操作,比如:reduce,collect需要额外的计算(组合操作),但是在顺序流中,这些组合操作是不需要的。

另外,我们知道所有的parallel stream操作共享一个jvm范围内的ForkJoinPool,所以你应该注意避免在parallel stream上执行慢阻塞流操作,因为这些操作可能导致你应用中依赖parallel streams操作的其他部分也会响应变慢。

结尾

如果你想更多了解JAVA 8 的stream,你可以阅读stream的JAVA doc,如果你想更深入了解stream的底层机制,你可以阅读Martin Fowlers的文章Collection Pipelines。

如果你对js也感兴趣,你可以查看Stream.js(一个用js实现的java 8 stream api),你也可以查看我写的java8教程。

希望这个教程对你有帮助,你也喜欢阅读这个教程。这个教程的源码和例子在github上,你可以免费fork或者在twitter上给我反馈。


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)