Java并行流示例
使用Java Stream API创建流时,默认情况下始终为串行流。我们还可以在Java中创建并行流以并行执行流。在这种情况下,Java运行时会将流划分为多个子流。聚合操作迭代并并行处理这些子流,然后合并结果。
如何在Java中创建并行流
有两种创建并行流的方法
1使用Collection的parallelStream()方法,该方法返回以该collection为源的并行Stream。
List<Integer> myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); long count = myList.parallelStream().count();
2使用BaseStream的parallel()方法。
int value = Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, (a, b) -> a+b);
关于并行流的要点
使用并行流时。多个子流由单独的线程并行处理,随后将部分结果合并。
默认情况下,并行流中的处理使用公共的fork-join线程池获取线程。
应用于并行流的操作必须是无状态且无干扰的。
并行流使我们可以使用非线程安全的集合实现并行性,前提是我们在操作集合时不修改集合。并行流中的任何操作也不应更新任何共享变量。
请注意,并行化并不会自动地比串行执行操作更快,尽管如果我们有足够的数据和处理器内核,并行化可能会更快。
Java并行流示例
假设我们有一个包含名称,部门,薪水字段的Employee类,并且想要计算财务部门的平均薪水。
public class Employee { private String name; private String dept; private int salary; Employee(String name, String dept, int salary){ this.name = name; this.dept = dept; this.salary = salary; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getSalary() { return salary; } public void setSalary(int salary) { this.salary = salary; } public String getDept() { return dept; } public void setDept(String dept) { this.dept = dept; } }
要并行计算平均工资,
List<Employee> employeeList = new ArrayList<>(); employeeList.add(new Employee("Hyman", "Finance", 5500)); employeeList.add(new Employee("Lisa", "Finance", 5600)); employeeList.add(new Employee("Scott", "Finance", 7000)); employeeList.add(new Employee("Nikita", "IT", 4500)); employeeList.add(new Employee("Tony", "IT", 8000)); double avgSalary = employeeList.parallelStream() .filter(e -> e.getDept() == "Finance") .mapToInt(e -> e.getSalary()) .average() .getAsDouble(); System.out.println("Average salary in Finance dept- " + avgSalary);
输出:
Average salary in Finance dept- 6033.333333333333
使用Collectors.groupingByConcurrent进行并发减少
对于并行流,应该使用groupingByConcurrent方法,而不要使用groupingBy,因为groupingBy操作对并行流的执行效果很差。 (这是因为它通过按键合并两个地图来进行操作,这在计算上非常昂贵。)
groupingByConcurrent方法返回ConcurrentMap的实例而不是Map。
Collectors.groupingByConcurrent示例
这是按部门对员工进行分组的示例。此示例调用collect操作,并且分组同时完成,这将收集减少为ConcurrentMap。
List<Employee> employeeList = new ArrayList<>(); employeeList.add(new Employee("Hyman", "Finance", 5500)); employeeList.add(new Employee("Lisa", "Finance", 5600)); employeeList.add(new Employee("Scott", "Finance", 7000)); employeeList.add(new Employee("Nikita", "IT", 4500)); employeeList.add(new Employee("Tony", "IT", 8000)); ConcurrentMap<String, List<Employee>> Departments = employeeList.parallelStream() .collect(Collectors.groupingByConcurrent(e -> e.getDept())); Departments.forEach((K, V)->{ System.out.println("Key- " + K ); System.out.println("Values"); V.forEach(e->System.out.println(e.getName())); });
输出:
Key- Finance Values Scott Lisa Hyman Key- IT Values Tony Nikita
通过合并器使用reduce方法
当我们使用并行流时,流将被划分为多个子流。这些子流被并行处理,并且将这些子流的部分结果合并以获得最终结果。在这种情况下,还使用组合器作为参数传递的reduce()方法的版本非常有用。
- reduce(U identity,BiFunction <U ,? super T,U>累加器,BinaryOperator <U>组合器)
在此版本的reduce()方法中,合并器函数用于合并累加器函数的结果。
考虑要获得列表中所有元素的平方乘积的方案。
List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5); int value = myList.parallelStream().reduce(1, (a, b) -> a*(b*b), (a, b) -> a*b); System.out.println("Product of square of elements in the list- " + value);
输出:
Product of square of elements in the list- 14400
在示例中,我们可以看到合并器功能与累加器功能不同。累加器函数执行求平方的逻辑,其中合并器函数合并部分结果,这可以通过乘以部分结果来完成,这就是合并器函数为(a,b)-> a * b
如果我们在这种情况下未指定组合器函数,而是使用reduce方法的版本(传递了累加器函数且该方法可作为组合器),那么我们可能会得到错误的结果。
List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5); int value = myList.parallelStream().reduce(1, (a, b) -> a*(b*b)); System.out.println("Product of square of elements in the list: " + value);
输出:
Product of square of elements in the list: -1055916032
如我们现在所见,由于相同的累加器函数(a,b)-> a *(b * b)用作组合器函数,结果也不对,在这种情况下不正确。
并行流中元素的顺序
在串行流的情况下,如果订购了source,那么还将订购该流。
例如,如果source是一个有序集合的List,则流也将有序。
List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); myList.stream().forEach(System.out::println);
输出:
1 2 3 4 5 6 7 8 9 10
并行执行流时,Java编译器和运行时将确定处理流元素的顺序,以最大程度地发挥并行计算的优势,因此即使对于有序集合,其顺序也可能会改变。
List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); myList.parallelStream().forEach(System.out::println);
输出:
7 2 1 5 4 9 10 8 3 6
如果要按其源指定的顺序处理流的元素,则无论是以串行还是并行方式执行该流,都可以使用forEachOrdered()方法。请注意,如果对并行流使用诸如forEachOrdered之类的操作,则可能会失去并行性的好处。
List<Integer>myList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); myList.parallelStream().forEachOrdered(System.out::println);
输出:
1 2 3 4 5 6 7 8 9 10