对于java开发从业人员来说,并发编程是绕不开的话题,juc并发包下提供了一系列多线程场景解决方案。
随着jdk1.8的普及,多线程处理问题,除了使用使用 线程池(ExecutorService) ,很多人选择了 parallelStream() 并行流,底层使用 forkjoin 实现并行处理。
那么并行和并发又有什么区别?究竟改如何选择?滥用时又会有什么影响?
这些问题我分以下几篇文章进行详细说明:
1. 多线程并发和并行的区别
2. parallelStream()并行滥用的后果 (本文)
3. forkjoin和forkjoinpool讲解
4. 线程池正确用法

坑点挨个举例

线程不安全

随着java8 stream的普及,很多人开始使用并行流parallelStream(),进行并行计算,很多人会忽略一个问题:
parallelStream()加入这行代码,就成了多线程,HaashMap线程不安全,会导致各种多线程问题。

		// 线程不安全的错误例子
		HashMap<String, Object> hashMap = new HashMap<>();
        numList.parallelStream().forEach(curr -> {
            hashMap.put(curr);
        });
		// 应该使用ConcurrentHashMap
		ConcurrentHashMap<String, Object> concurrentHashMap = new ConcurrentHashMap<>();
        numList.parallelStream().forEach(curr -> {
            concurrentHashMap.put(curr);
        });

公共commonPool导致程序卡顿

 cpu和内存都正常的情况下,生产环境遇到一次,脚本卡住几个小时才执行完,线程堆栈分析发现在下面代码线程进入waiting状态,而且是偶发。
 parallelStream()底层用的ForkJoinPool.commonPool();进行并行计算。代码中多个脚本同时用到parallelStream时,会共用线程池,一个脚本io慢,其他脚本都等等,用到的脚本越多,卡的时间越长。
这一点非常重要:大部分人使用parallelStream不知道底层原理,把parallelStream当做多线程使用,这个非常危险,用的地方越多,程序卡顿时间越长。

		// 脚本a
		aaaList.parallelStream().forEach(curr -> {
            // 大量数据库查询等脚本代码
        });
		// 脚本b
		bbbList.parallelStream().forEach(curr -> {
            // 大量数据库查询等脚本代码
        });

并行代码中Threadlocal失效

 java开发中,我们经常会用到Threadlocal。parallelStream()是一个隐式线程池,比如:读写分离的连接名称、request通用获取等等,并行代码块中都将失效。

parallelStream()正确用法

  • cpu密集型的运算
    parallelStream底层使用和cpu核数一样多的ForkJoinPool并行计算,适合cpu密集型业务直接使用。
		// 正常使用即可,并行为计算而生
		aaaList.parallelStream().forEach(curr -> {
            // 正常编写业务代码
        });
  • io密集的多线程场景
    首先io密集场景适合使用线程池,不建议使用parallelStream()
    如果非要使用,可以做如下优化:
    io密集的业务,消耗cpu较少,出现慢sql等场景会导致线程等待,不能使用默认ForkJoinPool.commonPool(),自定义ForkJoinPool。
		// 并行代码放到ForkJoinPool中执行,就不再使用公共ForkJoinPool.commonPool()
 		ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.execute(() -> {
            aaaList.parallelStream().forEach(curr -> {
            // 正常编写业务代码
        });
        });

注意事项总结

  • 并行代码块中需要使用AtomicInteger、ConcurrentHashMap等线程安全类。
  • parallelStream默认使用的commonPool,在io密集场景下不可大量使用
  • 数量级小的计算就别用并行了,cpu切换耗时反而慢
  • 在用到threadlocal的情景下,谨慎使用parallelStream和线程池,多线程中无法获取主线程的threadlocal。
  • 如果完全不懂parallelStream底层原理,建议不要使用
背景 对于java开发从业人员来说,并发编程是绕不开的话题,juc并发包下提供了一系列多线程场景解决方案。 随着jdk1.8的普及,多线程处理问题,除了使用使用线程池(ExecutorService),很多人选择了parallelStream() 并行流,底层使用forkjoin实现并行处理。 那么并行和并发又有什么区别?究竟改如何选择?滥用时又会有什么影响?  这些问题我分以下几篇文章进行详细说明:  1. 多线程并发和并行的区别  2. parallelStream()并行滥用的后果 (本文 该软件包有助于提高命令行应用程序的性能,这些应用程序可以转换数据以及以面向行的方式读取数据。 注意:输入行的顺序不会保留在输出中。 主要类型是 ,它从读取,将一个函数应用于每个输入行(默认情况下用换行符分隔)并将结果写入 。 采用字节片,因此不采用任何特定格式,因此输入可以是普通行,CSV,换行分隔的JSON或类似的面向行的格式。 输出仅为字节,并且可以再次采用任何格式。 一个不执行任何操作的简单转换的示例: func Noop ( b [] byte ) ([] byte , error ) { return b , nil 我们可以将此功能连接到IO并使其运行: p := parallel . NewProcessor ( os . Stdin , os . Stdout , Noop ) if err := p . Run (); err
1.因为是并行流,所以所涉及到的数据结构,需要使用线程安全的,比如 listByPage.parallelStream().forEach(str-> { //使用线程安全的数据结构 //ConcurrentHashMap //CopyOnWriteArrayList //等等进行操作 2.默认优先用在CPU密集型计算中,这里有的人就说了,用在IO密集比如HTTP请求啊什么的这种耗时高的操
Parallel Stream(并行流) 以上介绍的创建Stream流对象的3种方法都是创建的串行流(Serial Stream),串行流就是将源数据转换为一个流对象,然后单线程下执行聚合操作的流(也就是单一管道流) 并行流(Parallel Stream)就是将数据分为多个子流对象进行多线程操作(也就是多个管道流) 如下图介绍: 简述:在创建Stream流对象时,默认创建都是串行流。两种方式创建Stream并行流:第1种方式是通过Collection集合接口中的parallelStream()方法直
Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。 Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。 Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。 这种风格将要处理的元素集合看作一种流, 流在
小白趟坑—慎用 parallelStream().forEach() ps: 本系列第一贴,也可能是最后一贴。 循环遍历集合,是日常开发中常用的功能,自从java8引入lambda之后,除了常规的foreach之外,可以将遍历转为流操作,然后进行遍历,提升效率。 parallelStream() 和 stream(),都可以讲集合转化为流,通过字面意思,可以看出 parallelStream() 是并行的操作,在大数据量下会优于 stream(),于是就有了以下代码片段(片段经过简化,会有不符合编程规范的地
一、parallelStream内部使用了哪些线程 Java8中提供了能够更方便处理集合数据的Stream类,其中parallelStream()方法能够充分利用多核CPU的优势,使用多线程加快对集合数据的处理速度。不熟悉Stream类的同学,可以先参考我的另外一篇文章Java8中Stream的常用方法 以一个简单的例子,来看看parallelStream内部到底使用了哪些线程 Integer[] array = new Integer[]{1, 2, 3, 4, 5};
Java中,如果在并行流中使用`parallelStream`,则主线程不会捕获子线程中的异常。相反,异常会被包装在`CompletionException`中,然后被抛出。这是因为并行流中的任务会在多个线程中并行执行,因此异常会在子线程中抛出,而主线程无法捕获这些异常。 如果您要处理并行流中的异常,可以使用`try-catch`块捕获`CompletionException`并使用`getCause()`方法获取原始的异常。您还可以使用`exceptionally`方法在流中处理异常,例如: ```java List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); numbers.parallelStream() .map(n -> n / 0) // Causes an ArithmeticException .map(n -> n + 1) // This line will not be executed .exceptionally(ex -> { System.out.println("Caught exception: " + ex.getCause()); return 0; .forEach(System.out::println); 在这个例子中,`map(n -> n / 0)`会抛出一个`ArithmeticException`,因为它试图将每个数字除以0。由于我们使用`parallelStream`,这个异常会被包装在`CompletionException`中并抛出。我们使用`exceptionally`方法处理这个异常,并在控制台上打印出错误消息,然后返回0,以避免后续的操作抛出异常。