Java并行流示例

时间:2020-01-09 10:35:15  来源:igfitidea点击:

使用Java Stream API创建流时,默认情况下始终为串行流。我们还可以在Java中创建并行流以并行执行流。在这种情况下,Java运行时会将流划分为多个子流。聚合操作迭代并并行处理这些子流,然后合并结果。

如何在Java中创建并行流

有两种创建并行流的方法

1使用Collection的parallelStream()方法,该方法返回以该collection为源的并行Stream。

List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);  
long count = myList.parallelStream().count();

2使用BaseStream的parallel()方法。

int value = Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, (a, b) -> a+b);

关于并行流的要点

  • 使用并行流时。多个子流由单独的线程并行处理,随后将部分结果合并。

  • 默认情况下,并行流中的处理使用公共的fork-join线程池获取线程。

  • 应用于并行流的操作必须是无状态且无干扰的。

  • 并行流使我们可以使用非线程安全的集合实现并行性,前提是我们在操作集合时不修改集合。并行流中的任何操作也不应更新任何共享变量。

  • 请注意,并行化并不会自动地比串行执行操作更快,尽管如果我们有足够的数据和处理器内核,并行化可能会更快。

Java并行流示例

假设我们有一个包含名称,部门,薪水字段的Employee类,并且想要计算财务部门的平均薪水。

public class Employee {
  private String name;
  private String dept;
  private int salary;

  Employee(String name, String dept, int salary){
    this.name = name;
    this.dept = dept;
    this.salary = salary;
  }
  public String getName() {
    return name;
  }
  public void setName(String name) {
    this.name = name;
  }
  public int getSalary() {
    return salary;
  }
  public void setSalary(int salary) {
    this.salary = salary;
  }
  public String getDept() {
    return dept;
  }
  public void setDept(String dept) {
    this.dept = dept;
  }
}

要并行计算平均工资,

List<Employee> employeeList = new ArrayList<>(); 
  
employeeList.add(new Employee("Hyman", "Finance", 5500)); 
employeeList.add(new Employee("Lisa", "Finance", 5600)); 
employeeList.add(new Employee("Scott", "Finance", 7000));
employeeList.add(new Employee("Nikita", "IT", 4500));
employeeList.add(new Employee("Tony", "IT", 8000)); 
		  
double avgSalary = employeeList.parallelStream()
		                        .filter(e -> e.getDept() == "Finance")
		                        .mapToInt(e -> e.getSalary())
		                        .average()
		                        .getAsDouble();
  
System.out.println("Average salary in Finance dept- " + avgSalary);

输出:

Average salary in Finance dept- 6033.333333333333

使用Collectors.groupingByConcurrent进行并发减少

对于并行流,应该使用groupingByConcurrent方法,而不要使用groupingBy,因为groupingBy操作对并行流的执行效果很差。 (这是因为它通过按键合并两个地图来进行操作,这在计算上非常昂贵。)

groupingByConcurrent方法返回ConcurrentMap的实例而不是Map。

Collectors.groupingByConcurrent示例

这是按部门对员工进行分组的示例。此示例调用collect操作,并且分组同时完成,这将收集减少为ConcurrentMap。

List<Employee> employeeList = new ArrayList<>(); 
  
employeeList.add(new Employee("Hyman", "Finance", 5500)); 
employeeList.add(new Employee("Lisa", "Finance", 5600)); 
employeeList.add(new Employee("Scott", "Finance", 7000));
employeeList.add(new Employee("Nikita", "IT", 4500));
employeeList.add(new Employee("Tony", "IT", 8000)); 

ConcurrentMap<String, List<Employee>> Departments = employeeList.parallelStream()
																.collect(Collectors.groupingByConcurrent(e -> e.getDept()));
Departments.forEach((K, V)->{
    System.out.println("Key- " + K );
    System.out.println("Values");
    V.forEach(e->System.out.println(e.getName()));
});

输出:

Key- Finance
Values
Scott
Lisa
Hyman
Key- IT
Values
Tony
Nikita

通过合并器使用reduce方法

当我们使用并行流时,流将被划分为多个子流。这些子流被并行处理,并且将这些子流的部分结果合并以获得最终结果。在这种情况下,还使用组合器作为参数传递的reduce()方法的版本非常有用。

  • reduce(U identity,BiFunction <U ,? super T,U>累加器,BinaryOperator <U>组合器)

在此版本的reduce()方法中,合并器函数用于合并累加器函数的结果。

考虑要获得列表中所有元素的平方乘积的方案。

List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5); 
int value = myList.parallelStream().reduce(1, (a, b) -> a*(b*b), (a, b) -> a*b);
System.out.println("Product of square of elements in the list- " + value);

输出:

Product of square of elements in the list- 14400

在示例中,我们可以看到合并器功能与累加器功能不同。累加器函数执行求平方的逻辑,其中合并器函数合并部分结果,这可以通过乘以部分结果来完成,这就是合并器函数为(a,b)-> a * b

如果我们在这种情况下未指定组合器函数,而是使用reduce方法的版本(传递了累加器函数且该方法可作为组合器),那么我们可能会得到错误的结果。

List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5); 
int value = myList.parallelStream().reduce(1, (a, b) -> a*(b*b));
System.out.println("Product of square of elements in the list: " + value);

输出:

Product of square of elements in the list: -1055916032

如我们现在所见,由于相同的累加器函数(a,b)-> a *(b * b)用作组合器函数,结果也不对,在这种情况下不正确。

并行流中元素的顺序

在串行流的情况下,如果订购了source,那么还将订购该流。

例如,如果source是一个有序集合的List,则流也将有序。

List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
myList.stream().forEach(System.out::println);

输出:

1
2
3
4
5
6
7
8
9
10

并行执行流时,Java编译器和运行时将确定处理流元素的顺序,以最大程度地发挥并行计算的优势,因此即使对于有序集合,其顺序也可能会改变。

List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
myList.parallelStream().forEach(System.out::println);

输出:

7
2
1
5
4
9
10
8
3
6

如果要按其源指定的顺序处理流的元素,则无论是以串行还是并行方式执行该流,都可以使用forEachOrdered()方法。请注意,如果对并行流使用诸如forEachOrdered之类的操作,则可能会失去并行性的好处。

List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
myList.parallelStream().forEachOrdered(System.out::println);

输出:

1
2
3
4
5
6
7
8
9
10