Java并行流
在本教程中,我们将看到Java中的并行流。
Java并行流介绍
Java 8介绍了并行流的概念来进行并行处理。
由于我们现在拥有更多廉价的硬件成本,因此,并行处理可用于更快地执行操作。
让我们看个简单的例子
package org.igi.theitroad.java8; import java.util.Arrays; import java.util.stream.IntStream; public class Java8ParallelStreamMain { public static void main(String[] args) { System.out.println("================================="); System.out.println("Using Sequential Stream"); System.out.println("================================="); int[] array= {1,2,3,4,5,6,7,8,9,10}; IntStream intArrStream=Arrays.stream(array); intArrStream.forEach(s-> { System.out.println(s+" "+Thread.currentThread().getName()); } ); System.out.println("================================="); System.out.println("Using Parallel Stream"); System.out.println("================================="); IntStream intParallelStream=Arrays.stream(array).parallel(); intParallelStream.forEach(s-> { System.out.println(s+" "+Thread.currentThread().getName()); } ); } }
运行上面的程序时,我们将得到以下输出
================================= Using Sequential Stream ================================= 1 main 2 main 3 main 4 main 5 main 6 main 7 main 8 main 9 main 10 main ================================= Using Parallel Stream ================================= 7 main 6 ForkJoinPool.commonPool-worker-3 3 ForkJoinPool.commonPool-worker-1 9 ForkJoinPool.commonPool-worker-2 2 ForkJoinPool.commonPool-worker-3 5 ForkJoinPool.commonPool-worker-1 10 ForkJoinPool.commonPool-worker-2 1 ForkJoinPool.commonPool-worker-3 8 ForkJoinPool.commonPool-worker-2 4 ForkJoinPool.commonPool-worker-1
如果我们注意到输出,则主线程在顺序流时执行所有工作。
它等待当前的迭代完成,然后在下一次迭代时进行工作。
在并行流的情况下,将同时生成4个线程,并使用Fork并加入池在内部生成和管理线程.Parallel Streams Create ForkJoinPool
实例通过静态 ForkJoinPool.commonPool()
方法。
并行流占据所有可用的好处 CPU cores
并并行处理任务。
如果任务数超出核心的数量,则剩余任务等待当前运行的任务完成。
并行流很酷,所以你应该总是用吗?
不是,只需添加即可轻松地将顺序流转换为并行流。
并行,并不意味着我们应该始终使用它。
使用并行流时需要考虑许多因素,否则我们将遭受并行流的负面影响。
并行流的开销比顺序流更高,并且在线程之间坐标需要良好的时间。
如果且仅当才能考虑并行流,则只需:
- 我们有大型数据集要处理。
- 如我们所知,Java使用
ForkJoinPool
要实现并行性,ForkjoInpool叉源流并提交执行,因此源流应分类。
例如:ArrayList非常易于拆分,因为我们可以通过其索引找到一个中间元素并将其拆分,但LinkedList非常困难拆分,并且在大多数情况下都不会很好地表现得很好。 - 我们实际上患有性能问题。
- 我们需要确保线程之间的所有共享资源都需要正确同步,否则可能会产生意外结果。
衡量并行性的最简单公式是Brian Goetz在他的演示文稿中提供的"NQ"模型。
NQ模型:
n x q> 10000
其中,n =数据集中的项目数q =每个项目的工作量
这意味着如果我们有大量数据集和每个项目的工作(例如:和),则并行性可能会更快运行程序,反之亦然也是如此。
因此,如果数据集数量少以及每件商品的更多工作(做一些计算工作),那么也是如此 parallelism
可能会更快地实现结果。
让我们在另一个例子的帮助下看看。
在此示例中,我们将在并行流和顺序流的情况下执行长度计算时,我们将如何表现为多个计算。
我们正在进行一些arbit计算以使CPU忙。
package org.igi.theitroad.java8; import java.util.ArrayList; import java.util.List; public class PerformanceComparisonMain { public static void main(String[] args) { long currentTime=System.currentTimeMillis(); List<Integer> data=new ArrayList<Integer>(); for (int i = 0; i < 100000; i++) { data.add(i); } long sum=data.stream() .map(i ->(int)Math.sqrt(i)) .map(number->performComputation(number)) .reduce(0,Integer::sum); System.out.println(sum); long endTime=System.currentTimeMillis(); System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes"); } public static int performComputation(int number) { int sum=0; for (int i = 1; i < 1000000; i++) { int div=(number/i); sum+=div; } return sum; } }
但我们对此处的输出不感兴趣,但在执行上述操作时CPU如何表现。
正如我们在顺序流的情况下,可以看到CPU未充分利用。
让我们在16行没有改变。
并使流并行并再次运行程序。
long sum=data.stream() .parallel() .map(i ->(int)Math.sqrt(i)) .map(number->performComputation(number)) .reduce(0,Integer::sum);
我们使用并行流运行程序时检查CPU历史记录。
如我们所见,并行流使用所有4个CPU核心以执行计算。
并行流中的自定义线程池
默认使用并行流 ForkJoinPool.commonPool
它具有比处理器数量更少的线程。
这意味着并行流使用所有可用的处理器,因为它也使用主线程。
如果我们使用多个并行流,然后它们将共享相同 ForkJoinPool.commonPool
这意味着我们可能无法使用分配给每个并行流的所有处理器。
要解决此问题,我们可以在处理流时创建自己的线程池。
ForkJoinPool fjp = new ForkJoinPool(parallelism);
这将是创造的 ForkJoinPool
与目标 parallelism
等级。
如果我们不通过并行性,则默认情况下它将等于处理器的数量。
现在,我们可以将并行流提交到此自定义forkjoinpool。
ForkJoinPool fjp1 = new ForkJoinPool(5); Callable<Integer> callable1 = () -> data.parallelStream() .map(i -> (int) Math.sqrt(i)) .map(number -> performComputation(number)) .peek( (i) -> { System.out.println("Processing with "+Thread.currentThread()+" "+ i); }) .reduce(0, Integer::sum); try { sumFJ1 = fjp1.submit(callable1).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
例子如下:
package org.igi.theitroad; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; public class PerformanceComparisonMain { public static void main(String[] args) { List<Integer> data = new ArrayList<Integer>(); for (int i = 0; i < 10; i++) { data.add(i); } System.out.println("================"); System.out.println("Parallel stream 1"); System.out.println("================"); long sum1 =data.parallelStream() .map(i -> (int) Math.sqrt(i)) .map(number -> performComputation(number)) .peek( (i) -> { System.out.println("Processing with "+Thread.currentThread()+" "+ i); }) .reduce(0, Integer::sum); System.out.println("Sum: "+sum1); System.out.println("================"); System.out.println("Parallel stream 2"); System.out.println("================"); long sum2 = data.parallelStream() .map(i -> ((int) Math.sqrt(i)*10)) .map(number -> performComputation(number)) .peek( (i) -> { System.out.println("Processing with "+Thread.currentThread()+" "+ i); }) .reduce(0, Integer::sum); System.out.println("Sum: "+sum2); System.out.println("================"); System.out.println("Parallel stream with custom thread pool 1"); System.out.println("================"); ForkJoinPool fjp1 = new ForkJoinPool(5); long sumFJ1 = 0; Callable<Integer> callable1 = () -> data.parallelStream() .map(i -> (int) Math.sqrt(i)) .map(number -> performComputation(number)) .peek( (i) -> { System.out.println("Processing with "+Thread.currentThread()+" "+ i); }) .reduce(0, Integer::sum); try { sumFJ1 = fjp1.submit(callable1).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println("Sum: "+sumFJ1); System.out.println("================"); System.out.println("Parallel stream with custom thread pool 2"); System.out.println("================"); Callable<Integer> callable2 = () -> data.parallelStream() .map(i -> (int) Math.sqrt(i)*10) .map(number -> performComputation(number)) .peek( (i) -> { System.out.println("Processing with "+Thread.currentThread()+" "+ i); }) .reduce(0, Integer::sum); long sumFJ2 = 0; ForkJoinPool fjp2 = new ForkJoinPool(4); try { sumFJ2 = fjp2.submit(callable2).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println("Sum: "+sumFJ2); } public static int performComputation(int number) { int sum = 0; for (int i = 1; i < 100000; i++) { int div = (number/i); sum += div; } return sum; } }
如我们所见,前两个并行流正在使用 ForkJoinPool.commonPool
和接下来的2是使用自定义线程池,例如: ForkJoinPool-1
和 ForkJoinPool-2
使用并行流时,我们应该记住的事情
有状态的lambda表达
我们应该避免使用Stream操作中的状态Lambda表达式。
一个状态lambda表达式是一个输出取决于在执行流操作期间可能会改变的任何状态。
package org.igi.theitroad; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; public class ListOfIntegersStatefulLambda { public static void main(String[] args) { List<Integer> listOfIntegers = Arrays.asList(new Integer[] {40,34,21,37,20}); List<Integer> syncList = Collections.synchronizedList(new ArrayList<>()); listOfIntegers.parallelStream() //You shou! It uses a stateful lambda expression. .map(e -> { syncList.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); syncList.stream().forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); } }
.map(e -> {syncList.add(e); return e;})
是有状态的lambda和订单 .map(e -> {syncList.add(e); return e;})
添加元素到 syncList
可以不同,因此我们不应该在使用并行流时使用状态lambda操作。
干涉
在流操作lambda表达式不应修改流的源。
以下代码尝试将元素添加到整数列表并抛出并发映射异常。
package org.igi.theitroad; import java.util.ArrayList; import java.util.List; public class ListOfIntegersStatefulLambda { public static void main(String[] args) { List<Integer> listOfIntegers = new ArrayList<>(); Integer[] intArray =new Integer[] {40,34,21,37,20}; for(Integer in:intArray) { listOfIntegers.add(in); } listOfIntegers.parallelStream() .peek( i -> listOfIntegers.add(7)) .forEach(e -> System.out.print(e + " ")); System.out.println(""); } }