Java并行流

时间:2020-02-23 14:34:53  来源:igfitidea点击:

在本教程中,我们将看到Java中的并行流。

Java并行流介绍

Java 8介绍了并行流的概念来进行并行处理。

由于我们现在拥有更多廉价的硬件成本,因此,并行处理可用于更快地执行操作。

让我们看个简单的例子

package org.igi.theitroad.java8;
 
import java.util.Arrays;
import java.util.stream.IntStream;
 
public class Java8ParallelStreamMain {
 
	public static void main(String[] args) {
 
		System.out.println("=================================");
		System.out.println("Using Sequential Stream");
		System.out.println("=================================");
		int[] array= {1,2,3,4,5,6,7,8,9,10};
		IntStream intArrStream=Arrays.stream(array);
		intArrStream.forEach(s->
		{
			System.out.println(s+" "+Thread.currentThread().getName());
		}
				);
 
		System.out.println("=================================");
		System.out.println("Using Parallel Stream");
		System.out.println("=================================");
		IntStream intParallelStream=Arrays.stream(array).parallel();
		intParallelStream.forEach(s->
		{
			System.out.println(s+" "+Thread.currentThread().getName());
		}
				);
	}
}

运行上面的程序时,我们将得到以下输出

=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 ForkJoinPool.commonPool-worker-3
3 ForkJoinPool.commonPool-worker-1
9 ForkJoinPool.commonPool-worker-2
2 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-2
1 ForkJoinPool.commonPool-worker-3
8 ForkJoinPool.commonPool-worker-2
4 ForkJoinPool.commonPool-worker-1

如果我们注意到输出,则主线程在顺序流时执行所有工作。
它等待当前的迭代完成,然后在下一次迭代时进行工作。
在并行流的情况下,将同时生成4个线程,并使用Fork并加入池在内部生成和管理线程.Parallel Streams Create ForkJoinPool实例通过静态 ForkJoinPool.commonPool()方法。

并行流占据所有可用的好处 CPU cores并并行处理任务。
如果任务数超出核心的数量,则剩余任务等待当前运行的任务完成。

并行流很酷,所以你应该总是用吗?

不是,只需添加即可轻松地将顺序流转换为并行流。

并行,并不意味着我们应该始终使用它。
使用并行流时需要考虑许多因素,否则我们将遭受并行流的负面影响。

并行流的开销比顺序流更高,并且在线程之间坐标需要良好的时间。
如果且仅当才能考虑并行流,则只需:

  • 我们有大型数据集要处理。
  • 如我们所知,Java使用 ForkJoinPool要实现并行性,ForkjoInpool叉源流并提交执行,因此源流应分类。
    例如:ArrayList非常易于拆分,因为我们可以通过其索引找到一个中间元素并将其拆分,但LinkedList非常困难拆分,并且在大多数情况下都不会很好地表现得很好。
  • 我们实际上患有性能问题。
  • 我们需要确保线程之间的所有共享资源都需要正确同步,否则可能会产生意外结果。

衡量并行性的最简单公式是Brian Goetz在他的演示文稿中提供的"NQ"模型。

NQ模型:

n x q> 10000

其中,n =数据集中的项目数q =每个项目的工作量

这意味着如果我们有大量数据集和每个项目的工作(例如:和),则并行性可能会更快运行程序,反之亦然也是如此。
因此,如果数据集数量少以及每件商品的更多工作(做一些计算工作),那么也是如此 parallelism可能会更快地实现结果。

让我们在另一个例子的帮助下看看。

在此示例中,我们将在并行流和顺序流的情况下执行长度计算时,我们将如何表现为多个计算。
我们正在进行一些arbit计算以使CPU忙。

package org.igi.theitroad.java8;
import java.util.ArrayList;
import java.util.List;
 
public class PerformanceComparisonMain {
 
	public static void main(String[] args) {
		
		long currentTime=System.currentTimeMillis();
		List<Integer> data=new ArrayList<Integer>();
		for (int i = 0; i < 100000; i++) {
			data.add(i);
		}
		
		long sum=data.stream()
				.map(i ->(int)Math.sqrt(i))
				.map(number->performComputation(number))
				.reduce(0,Integer::sum);
		
		System.out.println(sum);
		long endTime=System.currentTimeMillis();
		System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
		
	}
	
	public static int performComputation(int number)
	{
		int sum=0;
		for (int i = 1; i < 1000000; i++) {
			int div=(number/i);
			sum+=div;
			
		}
		return sum;
	}
}

但我们对此处的输出不感兴趣,但在执行上述操作时CPU如何表现。

正如我们在顺序流的情况下,可以看到CPU未充分利用。

让我们在16行没有改变。
并使流并行并再次运行程序。

long sum=data.stream()
				.parallel()
				.map(i ->(int)Math.sqrt(i))
				.map(number->performComputation(number))
				.reduce(0,Integer::sum);

我们使用并行流运行程序时检查CPU历史记录。

如我们所见,并行流使用所有4个CPU核心以执行计算。

并行流中的自定义线程池

默认使用并行流 ForkJoinPool.commonPool它具有比处理器数量更少的线程。
这意味着并行流使用所有可用的处理器,因为它也使用主线程。

如果我们使用多个并行流,然后它们将共享相同 ForkJoinPool.commonPool这意味着我们可能无法使用分配给每个并行流的所有处理器。

要解决此问题,我们可以在处理流时创建自己的线程池。

ForkJoinPool fjp = new ForkJoinPool(parallelism);

这将是创造的 ForkJoinPool与目标 parallelism等级。
如果我们不通过并行性,则默认情况下它将等于处理器的数量。

现在,我们可以将并行流提交到此自定义forkjoinpool。

ForkJoinPool fjp1 = new ForkJoinPool(5);
Callable<Integer> callable1 = () -> data.parallelStream()
				   .map(i -> (int) Math.sqrt(i))
				   .map(number -> performComputation(number))
				   .peek( (i) -> {
				      System.out.println("Processing with "+Thread.currentThread()+" "+ i);
				                            	  
				    })
				    .reduce(0, Integer::sum);
		
		try {
			sumFJ1 = fjp1.submit(callable1).get();
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}

例子如下:

package org.igi.theitroad;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
 
public class PerformanceComparisonMain {
 
	public static void main(String[] args) {
 
		List<Integer> data = new ArrayList<Integer>();
		for (int i = 0; i < 10; i++) {
			data.add(i);
		}
		
		System.out.println("================");
		System.out.println("Parallel stream 1");
		System.out.println("================");
		long sum1 =data.parallelStream()
        .map(i -> (int) Math.sqrt(i))
        .map(number -> performComputation(number))
        .peek( (i) -> {
     	   System.out.println("Processing with "+Thread.currentThread()+" "+ i);
     	  
     	   })
        .reduce(0, Integer::sum);
        
		System.out.println("Sum: "+sum1);
		
		System.out.println("================");
		System.out.println("Parallel stream 2");
		System.out.println("================");
		
		long sum2 = data.parallelStream()
		        .map(i -> ((int) Math.sqrt(i)*10))
		        .map(number -> performComputation(number))
		        .peek( (i) -> {
		     	   System.out.println("Processing with "+Thread.currentThread()+" "+ i);
		     	  
		     	   })
		        .reduce(0, Integer::sum);
		        
		
		System.out.println("Sum: "+sum2);
		
		System.out.println("================");
		System.out.println("Parallel stream with custom thread pool 1");
		System.out.println("================");
		
		ForkJoinPool fjp1 = new ForkJoinPool(5);
		long sumFJ1 = 0;
 
		Callable<Integer> callable1 = () -> data.parallelStream()
				   .map(i -> (int) Math.sqrt(i))
				   .map(number -> performComputation(number))
				   .peek( (i) -> {
				      System.out.println("Processing with "+Thread.currentThread()+" "+ i);
				                            	  
				    })
				    .reduce(0, Integer::sum);
		
		try {
			sumFJ1 = fjp1.submit(callable1).get();
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
		
		System.out.println("Sum: "+sumFJ1);
		
		System.out.println("================");
		System.out.println("Parallel stream with custom thread pool 2");
		System.out.println("================");
		
		Callable<Integer> callable2 = () -> data.parallelStream()
                .map(i -> (int) Math.sqrt(i)*10)
                .map(number -> performComputation(number))
                .peek( (i) -> {
             	   System.out.println("Processing with "+Thread.currentThread()+" "+ i);
             	  
             	   })
                .reduce(0, Integer::sum);
 
		
		
		long sumFJ2 = 0;
		
		ForkJoinPool fjp2 = new ForkJoinPool(4);
 
		try {
			sumFJ2 = fjp2.submit(callable2).get();
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();
		}
		
		System.out.println("Sum: "+sumFJ2);
	}
 
	public static int performComputation(int number) {
		int sum = 0;
		for (int i = 1; i < 100000; i++) {
			int div = (number/i);
			sum += div;
 
		}
		return sum;
	}
}

如我们所见,前两个并行流正在使用 ForkJoinPool.commonPool和接下来的2是使用自定义线程池,例如: ForkJoinPool-1ForkJoinPool-2

使用并行流时,我们应该记住的事情

有状态的lambda表达

我们应该避免使用Stream操作中的状态Lambda表达式。
一个状态lambda表达式是一个输出取决于在执行流操作期间可能会改变的任何状态。

package org.igi.theitroad;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
 
public class ListOfIntegersStatefulLambda {
 
	public static void main(String[] args) {
		
		List<Integer> listOfIntegers = Arrays.asList(new Integer[] {40,34,21,37,20});
		List<Integer> syncList = Collections.synchronizedList(new ArrayList<>());
		listOfIntegers.parallelStream()
 
				//You shou! It uses a stateful lambda expression.
				.map(e -> {
					syncList.add(e);
					return e;
				})
		.forEachOrdered(e -> System.out.print(e + " "));
		
		System.out.println("");
 
		syncList.stream().forEachOrdered(e -> System.out.print(e + " "));
		System.out.println("");
	}
}

.map(e -> {syncList.add(e); return e;})是有状态的lambda和订单 .map(e -> {syncList.add(e); return e;})添加元素到 syncList可以不同,因此我们不应该在使用并行流时使用状态lambda操作。

干涉

在流操作lambda表达式不应修改流的源。
以下代码尝试将元素添加到整数列表并抛出并发映射异常。

package org.igi.theitroad;
 
import java.util.ArrayList;
import java.util.List;
 
public class ListOfIntegersStatefulLambda {
 
	public static void main(String[] args) {
		
		List<Integer> listOfIntegers = new ArrayList<>();
		Integer[] intArray =new Integer[] {40,34,21,37,20};
		for(Integer in:intArray)
		{
			listOfIntegers.add(in);
		}
		listOfIntegers.parallelStream()
 
		.peek( i -> listOfIntegers.add(7))		
		.forEach(e -> System.out.print(e + " "));
		
		System.out.println("");
 
	
	}
}