Java 使用ForkJoinPool实现Fork和Join

时间:2020-01-09 10:36:26  来源:igfitidea点击:

" ForkJoinPool"已添加到Java 7中的Java中。" ForkJoinPool"类似于Java ExecutorService,但有一个区别。 ForkJoinPool使任务很容易将其工作分解为较小的任务,然后再将这些任务提交给ForkJoinPool。只要可以拆分任务,任务就可以继续将其工作拆分为较小的子任务。听起来可能有点抽象,所以在此fork and join教程中,我将解释ForkJoinPool的工作方式以及拆分任务的工作方式。

Fork和Join解释

在查看ForkJoinPool之前,我想先解释一下fork和join原理是如何工作的。

分叉和联接原理包括两个步骤,这些步骤以递归方式执行。这两个步骤是fork步骤和join步骤。

Fork

使用派生和联接原理的任务可以将自己派生(拆分)为较小的子任务,这些子任务可以同时执行。如下图所示:

通过将自身拆分为多个子任务,每个子任务可以由不同的CPU或者同一CPU上的不同线程并行执行。

如果给定任务的工作足够大,则任务只能将自身拆分为子任务。将任务拆分为子任务会产生开销,因此对于少量工作,此开销可能会大于同时执行子任务所实现的加速。

将任务分叉到子任务中的合理时限也称为阈值。由每个任务决定一个合理的阈值。这在很大程度上取决于所完成的工作。

Join

当任务将自身拆分为子任务时,任务将等待直到子任务完成执行。

一旦子任务完成执行,该任务可以将所有结果合并(合并)为一个结果。如下图所示:

当然,并非所有类型的任务都可能返回结果。如果任务没有返回结果,则任务仅等待其子任务完成。这样就不会合并结果。

ForkJoinPool

ForkJoinPool是一个特殊的线程池,旨在与fork-and-join任务拆分一起很好地工作。 ForkJoinPool位于java.util.concurrent包中,因此完整的类名是java.util.concurrent.ForkJoinPool

创建一个ForkJoinPool

我们使用其构造函数创建一个" ForkJoinPool"。作为ForkJoinPool构造函数的参数,我们可以传递所需的指示级别的并行性。并行级别指示要在传递给ForkJoinPool的任务上同时处理多少个线程或者CPU。这是一个" ForkJoinPool"创建示例:

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

本示例创建一个并行级别为4的ForkJoinPool

提交任务到ForkJoinPool

我们将任务提交到ForkJoinPool类似于将任务提交到ExecutorService。我们可以提交两种类型的任务。一个不返回任何结果的任务(一个"动作"),一个不返回结果的任务(一个"任务")。这两种任务由RecursiveActionRecursiveTask类表示。以下各节将介绍如何使用这两项任务以及如何提交它们。

递归动作

RecursiveAction是不返回任何值的任务。它只是做一些工作,例如将数据写入磁盘,然后退出。

RecursiveAction可能仍需要将其工作分解为较小的块,这些块可以由独立的线程或者CPU执行。

我们可以通过将其子类化来实现" RecursiveAction"。这是一个" RecursiveAction"示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;

public class MyRecursiveAction extends RecursiveAction {

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            List<MyRecursiveAction> subtasks =
                new ArrayList<MyRecursiveAction>();

            subtasks.addAll(createSubtasks());

            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }

        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }

    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
            new ArrayList<MyRecursiveAction>();

        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

}

这个例子非常简化。 MyRecursiveAction只是将虚构的workLoad作为其构造函数的参数。如果workLoad高于某个阈值,则将工作拆分为多个子任务,这些子任务也计划执行(通过子任务的.fork()方法。如果workLoad低于某个阈值,则该工作由MyRecursiveAction本身执行。

我们可以像这样安排MyRecursiveAction来执行:

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);

forkJoinPool.invoke(myRecursiveAction);

递归任务

RecursiveTask是一个返回结果的任务。它可以将工作分解为较小的任务,然后将这些较小的任务的结果合并为一个集合结果。拆分和合并可以在多个级别上进行。这是一个" RecursiveTask"示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
    
    
public class MyRecursiveTask extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }

    protected Long compute() {

        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);

            List<MyRecursiveTask> subtasks =
                new ArrayList<MyRecursiveTask>();
            subtasks.addAll(createSubtasks());

            for(MyRecursiveTask subtask : subtasks){
                subtask.fork();
            }

            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;

        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
        new ArrayList<MyRecursiveTask>();

        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }
}

此示例与RecursiveAction示例相似,不同之处在于它返回结果。 MyRecursiveTask类扩展了RecursiveTask <Long>,这意味着任务返回的结果是Long。

MyRecursiveTask示例还将工作分解为子任务,并使用它们的fork()方法安排这些子任务的执行。

另外,此示例然后通过调用每个子任务的join()方法接收每个子任务返回的结果。子任务结果将合并为更大的结果,然后将其返回。子任务结果的这种合并/合并可能在递归的多个级别上递归发生。

我们可以像这样安排RecursiveTask

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);

long mergedResult = forkJoinPool.invoke(myRecursiveTask);

System.out.println("mergedResult = " + mergedResult);

注意如何从ForkJoinPool.invoke()方法调用中获得最终结果。