Java 使用ForkJoinPool实现Fork和Join
" ForkJoinPool"已添加到Java 7中的Java中。" ForkJoinPool"类似于Java ExecutorService,但有一个区别。 ForkJoinPool
使任务很容易将其工作分解为较小的任务,然后再将这些任务提交给ForkJoinPool
。只要可以拆分任务,任务就可以继续将其工作拆分为较小的子任务。听起来可能有点抽象,所以在此fork and join教程中,我将解释ForkJoinPool
的工作方式以及拆分任务的工作方式。
Fork和Join解释
在查看ForkJoinPool
之前,我想先解释一下fork和join原理是如何工作的。
分叉和联接原理包括两个步骤,这些步骤以递归方式执行。这两个步骤是fork步骤和join步骤。
Fork
使用派生和联接原理的任务可以将自己派生(拆分)为较小的子任务,这些子任务可以同时执行。如下图所示:
通过将自身拆分为多个子任务,每个子任务可以由不同的CPU或者同一CPU上的不同线程并行执行。
如果给定任务的工作足够大,则任务只能将自身拆分为子任务。将任务拆分为子任务会产生开销,因此对于少量工作,此开销可能会大于同时执行子任务所实现的加速。
将任务分叉到子任务中的合理时限也称为阈值。由每个任务决定一个合理的阈值。这在很大程度上取决于所完成的工作。
Join
当任务将自身拆分为子任务时,任务将等待直到子任务完成执行。
一旦子任务完成执行,该任务可以将所有结果合并(合并)为一个结果。如下图所示:
当然,并非所有类型的任务都可能返回结果。如果任务没有返回结果,则任务仅等待其子任务完成。这样就不会合并结果。
ForkJoinPool
ForkJoinPool
是一个特殊的线程池,旨在与fork-and-join任务拆分一起很好地工作。 ForkJoinPool
位于java.util.concurrent
包中,因此完整的类名是java.util.concurrent.ForkJoinPool
。
创建一个ForkJoinPool
我们使用其构造函数创建一个" ForkJoinPool"。作为ForkJoinPool
构造函数的参数,我们可以传递所需的指示级别的并行性。并行级别指示要在传递给ForkJoinPool
的任务上同时处理多少个线程或者CPU。这是一个" ForkJoinPool"创建示例:
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
本示例创建一个并行级别为4的ForkJoinPool
。
提交任务到ForkJoinPool
我们将任务提交到ForkJoinPool类似于将任务提交到ExecutorService。我们可以提交两种类型的任务。一个不返回任何结果的任务(一个"动作"),一个不返回结果的任务(一个"任务")。这两种任务由RecursiveAction
和RecursiveTask
类表示。以下各节将介绍如何使用这两项任务以及如何提交它们。
递归动作
RecursiveAction
是不返回任何值的任务。它只是做一些工作,例如将数据写入磁盘,然后退出。
RecursiveAction可能仍需要将其工作分解为较小的块,这些块可以由独立的线程或者CPU执行。
我们可以通过将其子类化来实现" RecursiveAction"。这是一个" RecursiveAction"示例:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveAction; public class MyRecursiveAction extends RecursiveAction { private long workLoad = 0; public MyRecursiveAction(long workLoad) { this.workLoad = workLoad; } @Override protected void compute() { //if work is above threshold, break tasks up into smaller tasks if(this.workLoad > 16) { System.out.println("Splitting workLoad : " + this.workLoad); List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>(); subtasks.addAll(createSubtasks()); for(RecursiveAction subtask : subtasks){ subtask.fork(); } } else { System.out.println("Doing workLoad myself: " + this.workLoad); } } private List<MyRecursiveAction> createSubtasks() { List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>(); MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2); MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2); subtasks.add(subtask1); subtasks.add(subtask2); return subtasks; } }
这个例子非常简化。 MyRecursiveAction只是将虚构的workLoad作为其构造函数的参数。如果workLoad
高于某个阈值,则将工作拆分为多个子任务,这些子任务也计划执行(通过子任务的.fork()方法。如果workLoad
低于某个阈值,则该工作由MyRecursiveAction本身执行。
我们可以像这样安排MyRecursiveAction
来执行:
MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24); forkJoinPool.invoke(myRecursiveAction);
递归任务
RecursiveTask是一个返回结果的任务。它可以将工作分解为较小的任务,然后将这些较小的任务的结果合并为一个集合结果。拆分和合并可以在多个级别上进行。这是一个" RecursiveTask"示例:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveTask; public class MyRecursiveTask extends RecursiveTask<Long> { private long workLoad = 0; public MyRecursiveTask(long workLoad) { this.workLoad = workLoad; } protected Long compute() { //if work is above threshold, break tasks up into smaller tasks if(this.workLoad > 16) { System.out.println("Splitting workLoad : " + this.workLoad); List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>(); subtasks.addAll(createSubtasks()); for(MyRecursiveTask subtask : subtasks){ subtask.fork(); } long result = 0; for(MyRecursiveTask subtask : subtasks) { result += subtask.join(); } return result; } else { System.out.println("Doing workLoad myself: " + this.workLoad); return workLoad * 3; } } private List<MyRecursiveTask> createSubtasks() { List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>(); MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2); MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2); subtasks.add(subtask1); subtasks.add(subtask2); return subtasks; } }
此示例与RecursiveAction
示例相似,不同之处在于它返回结果。 MyRecursiveTask类扩展了RecursiveTask <Long>,这意味着任务返回的结果是Long。
MyRecursiveTask示例还将工作分解为子任务,并使用它们的fork()方法安排这些子任务的执行。
另外,此示例然后通过调用每个子任务的join()方法接收每个子任务返回的结果。子任务结果将合并为更大的结果,然后将其返回。子任务结果的这种合并/合并可能在递归的多个级别上递归发生。
我们可以像这样安排RecursiveTask
:
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128); long mergedResult = forkJoinPool.invoke(myRecursiveTask); System.out.println("mergedResult = " + mergedResult);
注意如何从ForkJoinPool.invoke()方法调用中获得最终结果。