Java CompletableFuture

时间:2020-01-09 10:35:12  来源:igfitidea点击:

在本文中,我们将学习Java中的CompletableFuture类,以及一些示例,以了解该类提供的功能。

Java中的CompletableFuture

CompletableFuture用于任务的异步计算,其中该任务由单独的线程执行,并在就绪时返回结果。

CompletableFuture与Future有何不同

我们必须想知道已经有一个Future接口完成异步计算的相同工作并返回一个值,然后Java CompletableFuture必须提供什么。

Future接口没有提供很多功能,实际上,要获得异步计算的结果,只有future.get()方法处于阻塞状态,因此没有以非阻塞方式运行多个相关任务的范围。

那就是具有丰富API的CompletableFuture的亮点。它提供了可链接多个可异步运行的相关任务的功能。因此,我们可以创建一个任务链,当当前任务的结果可用时,将触发下一个任务。

例如

CompletableFuture.supplyAsync(()->{return 4;})
.thenApplyAsync(num-> Math.pow(num, 2))
.thenAccept(num-> System.out.println("Value- " + num))
.thenRun(()->System.out.println("Done"));

在这里,第一个任务返回一个值,一旦该值可用,则下一个任务进行计算,然后执行链中的下一个任务。

Java CompletableFuture的另一个优点是,它提供了处理在任何依赖阶段中引发的异常的方法。

Java中的CompletableFuture类实现Future和CompletionStage接口。 CompletableFuture类通过实现CompletionStage接口来获得其作为依赖阶段运行任务的行为。

有关Java CompletableFuture的要点

  • CompletableFuture可以用作已明确完成的Future,也可以用作CompletionStage,其中一个阶段的完成会触发另一个从属阶段。

  • CompletableFuture提供方法的异步和非异步变体。

  • 如果使用异步方法,则可以提供一个Executor作为参数,在这种情况下,使用Executor创建的线程池中的线程将用于执行任务。当使用不带Executor参数的异步方法时,ForkJoinPool.commonPool()中的线程将用于执行任务。例如,考虑以下thenApply()方法的三个变体-thenApply(Function <?super T,?extended U> fn)–这是一个非异步方法。

  • thenApplyAsync(Function <?super T,?extended U> fn)–异步版本,因为执行程序未作为参数传递,所以使用默认的异步执行工具(ForkJoinPool.commonPool())。

  • thenApplyAsync(Function <?super T,?extended U> fn,Executor执行程序)– thenApply()方法的另一个异步变体,使用提供的Executor执行。

CompletableFuture Java示例

1个简单示例,其中使用其构造函数创建CompletableFuture实例,并使用complete()方法显式完成该实例。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<String> cf = new CompletableFuture<>();
  cf.complete("CompletableFuture completed");
  System.out.println("Value- " + cf.get());
}

输出:

Value- CompletableFuture completed

2使用runAsync()方法执行异步任务,该任务返回CompletableFuture <Void>。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{
    System.out.println("Running a runnable task");
  });
  System.out.println("Returned Value- " + cf.get());
}

输出:

Running a runnable task
Returned Value- null

3从前面的示例可以看到,runAsync()方法不会返回结果。如果要返回值,则可以使用supplyAsync()方法。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
    System.out.println("Running a task");
    return "Task Completed";
  });

  System.out.println("Returned Value- " + cf.get());
}

输出:

Running a task
Returned Value- Task Completed

4到目前为止,我们只看到了一种方法的示例,现在让我们看一些执行任务链的示例。

static void cfExample() throws InterruptedException, ExecutionException {
  StringBuilder sb = new StringBuilder();
  CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
    return "Completable";
  }).thenApply(s->sb.append(s).append("Future").toString());
  System.out.println("Returned Value- " + cf.get());
}

输出:

Returned Value- CompletableFuture

在示例中,有两个阶段

  • 在第一阶段,将执行supplyAsync()方法,该方法返回结果。当此阶段正常完成时,它将触发下一阶段。

  • 当第一阶段完成时,其结果将应用于适当命名的方法thenApply()。

  • 由于使用了非异步的thenApply()方法,因此它将由执行supplyAsync()方法的同一线程执行,因此它也可以由调用supplyAsync()方法的线程(主线程)执行。 。

5使用该方法的异步变体。

static void cfExample() throws InterruptedException, ExecutionException {
  StringBuilder sb = new StringBuilder();
  CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
    return "Completable";
  }).thenApplyAsync(s->sb.append(s).append("Future").toString());

  System.out.println("Returned Value- " + cf.get());
}

与前面的示例相同,唯一的区别是它使用了thenApply()方法的async变体,即thenApplyAsync。现在,链接的任务将使用从ForkJoinPool.commonPool()获得的单独线程异步执行。

6我们可以为执行器提供该方法的异步变体。

static void cfExample() throws InterruptedException, ExecutionException {
  ExecutorService executor = Executors.newFixedThreadPool(2);
  StringBuilder sb = new StringBuilder();
  CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
    return "Completable";
  }).thenApplyAsync(s->sb.append(s).append("Future").toString(), executor);

  System.out.println("Returned Value- " + cf.get());
  executor.shutdown();
}

现在,链接的任务将使用传递的执行程序异步执行,并使用从固定线程池中获得的单独线程。

7如果只想使用上一阶段的结果而又不返回任何结果,则可以使用CompletableFuture类的thenAccept()或者thenRun()方法。

在thenAccept方法中,将使用者(功能接口)作为参数传递,并返回CompletionStage <Void>。
在thenRun()方法中,Runnable作为参数传递,并返回CompletionStage <Void>。
尽管thenAccept()方法可以访问之前完成的任务的结果,但是thenRun()方法无法访问之前完成的任务的结果。

static void cfExample() throws InterruptedException, ExecutionException {
  StringBuilder sb = new StringBuilder();
  CompletableFuture.supplyAsync(()->{return "Completable";})
    .thenApplyAsync(s->sb.append(s).append("Future").toString())
    .thenAccept(s->System.out.println("Current value is- " + s));
}

输出:

Current value is- CompletableFuture

使用thenRun()

static void cfExample() throws InterruptedException, ExecutionException {
  StringBuilder sb = new StringBuilder();
  CompletableFuture.supplyAsync(()->{return "Completable";})
    .thenApplyAsync(s->sb.append(s).append("Future").toString())
    .thenRun(()->System.out.println("Process completed"));
}

使用Java CompletableFuture的thenCompose()方法

在CompletableFuture类中,存在另一个方法thenCompose(),其中可以将一个阶段执行的计算表示为一个Function,然后另一个相同的方法就是Apply()。 thenCompose()和thenApply()这两个方法的区别在于返回值的方式。

  • thenApply()方法返回一个新的CompletionStage,其类型由计算确定。

  • thenCompose()方法返回一个新的CompletionStage,其类型类似于上一阶段。

让我们尝试举例说明一下。这里我们有两个方法getValue()和getAnotherValue()都返回CompletableFuture <String>。首先,我们将使用thenApply()方法。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<CompletableFuture<String>> cf = getValue().thenApply(s->getAnotherValue(s));
  System.out.println("Value- " + cf.get().get());
}

static CompletableFuture<String> getValue(){
  return CompletableFuture.supplyAsync(()->{return "Completable";});
}

static CompletableFuture<String> getAnotherValue(String str){
  return CompletableFuture.supplyAsync(()->{return str+"Future";});
}

如果在此处看到链,则有一个getValue()方法返回CompletableFuture <String>,然后在thenApply()方法中使用该方法,该方法再次返回CompletableFuture <String>类型的结果,使其成为CompletableFuture <CompletableFuture的嵌套结构<String >>。

当我们使用thenCompose()方法时,返回的结果的类型类似于上一阶段。这有助于展平嵌套结构。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<String> cf = getValue().thenCompose(s->getAnotherValue(s));
  System.out.println("Value- " + cf.get());
}

static CompletableFuture<String> getValue(){
  return CompletableFuture.supplyAsync(()->{return "Completable";});
}

static CompletableFuture<String> getAnotherValue(String str){
  return CompletableFuture.supplyAsync(()->{return str+"Future";});
}

Java CompletableFuture –具有多个完成阶段的操作

1结合两个完成阶段的结果–我们可以使用thenCombine()方法结合两个独立的完成阶段,该方法与两个完成阶段的结果一起执行,作为提供函数的参数。
这里我们有两个方法getValue()和getAnotherValue()都返回CompletableFuture <String>。这两个完成阶段都完成后,将同时调用这两个结果的Combine()方法。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<String> cf = getValue().thenCombine(getAnotherValue(), (s1, s2)->s1+ " " +s2);
  System.out.println("Value- " + cf.get());
}

static CompletableFuture<String> getValue(){
  return CompletableFuture.supplyAsync(()->{return "Hello";});
}

static CompletableFuture<String> getAnotherValue(){
  return CompletableFuture.supplyAsync(()->{return "World";});
}

输出:

Value- Hello World

2包含两个完成阶段的结果–就像Java CompletableFuture中的thenAccept()方法使用一个完成阶段的结果一样,还有一个thenAcceptBoth()方法使用两个完成阶段的结果。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<Void> cf = getValue().thenAcceptBoth(getAnotherValue(), 
       (s1, s2)->System.out.println("Process completed with results- " +s1+ " " +s2));
  //System.out.println("Value- " + cf.get());
}

static CompletableFuture<String> getValue(){
  return CompletableFuture.supplyAsync(()->{return "Hello";});
}

static CompletableFuture<String> getAnotherValue(){
  return CompletableFuture.supplyAsync(()->{return "World";});
}

输出:

Process completed with results- Hello World

3应用这两者中的任何一个-如果有两个CompletableFutures,并且只有一个阶段正常完成,并且我们想将该函数应用于正常完成的那个完成阶段的结果,则可以使用applyToEither()方法。

在示例中,有两种方法getValue()和getAnotherValue()。在getValue()方法中,引发了异常,该方法异常完成。另一方面,getAnotherValue()方法正常完成。

static void cfExample() throws InterruptedException, ExecutionException {
  CompletableFuture<String> cf = getValue().applyToEitherAsync(getAnotherValue(), (s)->s.toUpperCase());
  System.out.println("Value- " + cf.get());
}

static CompletableFuture<String> getValue(){
  String str = null;
  return CompletableFuture.supplyAsync(() -> {
    if (str == null) {
      throw new IllegalArgumentException("Invalid String passed  " + str);
    }
    return str;
  }).exceptionally(exp -> {
    System.out.println("Exception message- " + exp.getMessage());
    return "";
  });
}

static CompletableFuture<String> getAnotherValue(){
  return CompletableFuture.supplyAsync(()->{return "World";});
}

输出:

Exception message-  java.lang.IllegalArgumentException: Invalid String passed null
Value- WORLD

如我们所见,applyToEitherAsync()方法使用正常完成的完成阶段的结果。

Java CompletableFuture中的异常处理

对于Java CompletableFuture中的异常处理,有三种方法

  • 处理

  • whenComplete

  • 异常地

句柄和whenComplete方法始终执行,无论是在触发阶段引发异常还是阶段正常完成。

仅当触发阶段异常完成时,才会执行异常方法。

Java CompletableFuture –异常处理使用异常

在示例中,将String作为null传递,从而导致异常,从而导致异常调用。

static void cfExample() throws InterruptedException, ExecutionException {
  String str = null;
  CompletableFuture.supplyAsync(() -> {
    if (str == null) {
      throw new IllegalArgumentException("Invalid String passed " + str);
    }
    return str;
  }).exceptionally(exp -> {
      System.out.println("Exception message- " + exp.getMessage());
      return "";
  });
}

输出:

Exception message- java.lang.IllegalArgumentException: Invalid String passed null

如果在触发阶段没有例外,则不会异常呼叫。

static void cfExample() throws InterruptedException, ExecutionException {
  String str = "Hello";
  CompletableFuture<String>cf = CompletableFuture.supplyAsync(() -> {
    if (str == null) {
      throw new IllegalArgumentException("Invalid String passed " + str);
    }
    return str;
  }).exceptionally(exp -> {
    System.out.println("Exception message- " + exp.getMessage());
    return "";
  });
  System.out.println("Value- " + cf.get());
}

输出:

Value- Hello

Java CompletableFuture –使用句柄的异常处理

使用此阶段的结果和异常作为提供函数的参数来执行handle方法。如果没有抛出异常,则异常参数将为null。请注意,无论是否引发异常,都会始终执行handle方法,通过检查exception参数为null可以确定是否执行异常处理代码。

static void cfExample() throws InterruptedException, ExecutionException {
  String str = null;
  CompletableFuture<String>cf = CompletableFuture.supplyAsync(() -> {
    if (str == null) {
      throw new IllegalArgumentException("Invalid String passed " + str);
    }
    return str;
  }).handle((s, e) -> {
    if(e != null) {
      System.out.println("Exception message- " + e.getMessage());
      s = "";
    }
    return s;
  });
  System.out.println("Value- " + cf.get());
}

输出:

Exception message- java.lang.IllegalArgumentException: Invalid String passed null
Value

Java CompletableFuture –使用whenComplete的异常处理

返回一个新的CompletionStage,其结果或者异常与此阶段相同,因此在whenComplete方法中不能更改结果。请注意,无论是否引发异常,始终执行Complete方法时,通过检查异常参数是否为null,可以确定是否要执行异常处理代码。

static void cfExample() throws InterruptedException, ExecutionException {
  String str = "Hello";
  CompletableFuture<String>cf = CompletableFuture.supplyAsync(() -> {
    if (str == null) {
      throw new IllegalArgumentException("Invalid String passed " + str);
    }
    return str;
  }).whenComplete((s, e) -> {
    System.out.println("In when complete method");
    if(e != null) {
      System.out.println("Exception message- " + e.getMessage());
    }
  });
  System.out.println("Value- " + cf.get());
}

输出:

In when complete method
Value- Hello

在示例中可以看到,当调用Complete方法时,仍不会在阶段中引发异常,但是在这种情况下,异常参数将为null。