• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Java8 并行流

java8 来源:Weison Wei 1次浏览
1 Java8 并行Stream
1.1 并行化流操作

Stream 可以并行化操作,迭代器只能命令式地、串行化操作;
串行方式遍历时,每个 item 读完后再读下一个 item;
并行方式遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出;
#是一个并行执行的流,它通过默认的ForkJoinPool,提高你的多线程任务的速度
#处理的过程会是将一个大任务切分成多个小任务,每个任务都是一个操作

1.2 并行流创建方式

1)已经存在Stream对象,调用parallel()方法获得
2)从集合创建一个流,调用parallelStream()获得

1.3 串行并行效果比对

1.3.1 无序性
因为并行化操作被拆分成不同的任务独立执行,所以结果会跟串行foreach有出入:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
System.out.println("-------------stream---------------");
numbers.stream().forEach(out::print);
System.out.println("");
System.out.println("----------parallelStream----------");
//并行处理 执行结果是乱序的
numbers.parallelStream().forEach(out::print);

执行结果:

-------------stream---------------
123456789
----------parallelStream----------
685793214

1.3.2 拆分成的子任务由不同线程执行,速度更快:

List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10000000; i++) {
    list.add(i);
}
System.out.println("-------------stream 10000000---------------");
long st = System.currentTimeMillis();
list.stream().map(element->element+"");
long so = System.currentTimeMillis();
System.out.println("-------------stream cost: "+(so-st));
System.out.println("----------parallelStream 10000000----------");
//并行处理 执行结果是乱序的
long st1 = System.currentTimeMillis();
list.parallelStream().map(element->element+"");
long so1 = System.currentTimeMillis();
System.out.println("-------------parallelStream cost: "+(so1-st1));

执行结果:

-------------stream 10000000---------------
-------------stream cost: 68
----------parallelStream 10000000----------
-------------parallelStream cost: 2
2 并行流规则

为了发挥并行流框架优势,在使用并行流需要遵守一些规则:
1)初始值必须为组合函数的恒等值;
2)组合操作必须符合结合律;

3 并行流性能

为了并行流框架的性能受以下因素影响:
数据大小:数据够大,每个管道处理时间够长,并行才有意义;
源数据结构:每个管道操作都是基于初始数据源,通常是集合,将不同的集合数据源分割会有一定消耗;
装箱:处理基本类型比装箱类型要快;
核的数量:默认情况下,核数量越多,底层fork/join线程池启动线程就越多;
单元处理开销:花在六中每个元素身上的时间越长,并行操作带来的性能提升越明显;

源数据结构分为以下3组:
性能好:ArrayLiast、数组或IntStream.range(数据支持随机读取,能轻易地被任意分割)
性能一般:HashSet、TreeSet(数据不易公平地分解,大部分也是可以的)
性能差:LinkedList(难以对半分解)、Stream.iterate和BuffferedReader.lines(长度未知,难以分解)

4 并行流原理

并行流底层采用ForkJoinPool线程池执行分段任务,fork递归式的分解任务,然后分段并行执行,最终由join合并结果,返回最后的值;

Fork/Join建立在ExecutorService之上,与传统的线程主要的区别在于如何在线程和支持多核的机器间分配工作; 用一个简单的
ExecutorService你能完全控制工作线程之间的负载分布,确立每个任务的大小以便线程来处理;
Fork/Join有个work-stealing算法用以分配线程间的负载,可以将大型任务可以被分成更小单元,并在不同的线程间处理;

// 构造三个100000个元素的集合
        List<Integer> list = new ArrayList<>();
        List<Integer> list1 = new ArrayList<>();
        List<Integer> list2 = new ArrayList<>();
        for (int i = 0; i < 100000; i++) {
            list.add(i);
            list1.add(i);
            list2.add(i);
        }

        // 1 统计并行执行list的线程
        // 1.1 list并行流操作线程集合threadSet
        Set<Thread> threadSet = new CopyOnWriteArraySet<>();
        // 1.2 并行遍历list,并获取当前执行线程
        list.parallelStream().forEach(integer -> {
            // 1.3 获取并统计并行执行list的线程
            Thread thread = Thread.currentThread();
            threadSet.add(thread);
        });

        System.out.println("系统一个有" + Runtime.getRuntime().availableProcessors() + "个cpu");
        System.out.println("threadSet一共有" + threadSet.size() + "个线程");

        // 2 统计并行执行list1的线程
        // 2.1 list并行流操作线程集合threadSetTwo
        Set<Thread> threadSetTwo = new CopyOnWriteArraySet<>();
        // 2.2 限制同时只能执行两个并行任务
        CountDownLatch countDownLatch = new CountDownLatch(2);
        // 2.3 遍历list1,并获取当前执行线程
        Thread threadA = new Thread(() -> {
            list1.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                threadSetTwo.add(thread);
            });
            //遍历完后计数器减一
            countDownLatch.countDown();
        });
        // 2.4 遍历list1,并获取当前执行线程
        Thread threadB = new Thread(() -> {
            list2.parallelStream().forEach(integer -> {
                Thread thread = Thread.currentThread();
                threadSetTwo.add(thread);
            });
            //遍历完后计数器减一
            countDownLatch.countDown();
        });

        threadA.start();
        threadB.start();
        //通知其他线程执行
        countDownLatch.await();
        System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "个线程");

        System.out.println("---------------------------");
        System.out.println("threadSet:" + threadSet);
        System.out.println("threadSetTwo:" + threadSetTwo);
        System.out.println("---------------------------");
        threadSetTwo.addAll(threadSet);
        System.out.println("threadSet合并后:" + threadSetTwo);
        System.out.println("threadSetTwo合并后一共有" + threadSetTwo.size() + "个线程");
        System.out.println("系统一个有" + Runtime.getRuntime().availableProcessors() + "个cpu");

执行结果:

系统一个有4个cpu
threadSet一共有4个线程
threadSetTwo一共有5个线程
---------------------------
threadSet:[Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[main,5,main], Thread[ForkJoinPool.commonPool-worker-3,5,main]]
threadSetTwo:[Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,]]
---------------------------
threadSet合并后:[Thread[ForkJoinPool.commonPool-worker-3,5,main], Thread[ForkJoinPool.commonPool-worker-2,5,main], Thread[ForkJoinPool.commonPool-worker-1,5,main], Thread[Thread-0,5,], Thread[Thread-1,5,], Thread[main,5,main]]
threadSetTwo合并后一共有6个线程

参考文献:
[ 1 ]Java8函数式编程/(英)Richard Warburton著;王群锋译。–北京:人民邮电出版社,2015-03-01.
[ 2 ]https://blog.csdn.net/u011001723/article/details/52794455


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)