Java CompletableFuture
在本文中,我们将学习Java中的CompletableFuture类,以及一些示例,以了解该类提供的功能。
Java中的CompletableFuture
CompletableFuture用于任务的异步计算,其中该任务由单独的线程执行,并在就绪时返回结果。
CompletableFuture与Future有何不同
我们必须想知道已经有一个Future接口完成异步计算的相同工作并返回一个值,然后Java CompletableFuture必须提供什么。
Future接口没有提供很多功能,实际上,要获得异步计算的结果,只有future.get()方法处于阻塞状态,因此没有以非阻塞方式运行多个相关任务的范围。
那就是具有丰富API的CompletableFuture的亮点。它提供了可链接多个可异步运行的相关任务的功能。因此,我们可以创建一个任务链,当当前任务的结果可用时,将触发下一个任务。
例如
CompletableFuture.supplyAsync(()->{return 4;}) .thenApplyAsync(num-> Math.pow(num, 2)) .thenAccept(num-> System.out.println("Value- " + num)) .thenRun(()->System.out.println("Done"));
在这里,第一个任务返回一个值,一旦该值可用,则下一个任务进行计算,然后执行链中的下一个任务。
Java CompletableFuture的另一个优点是,它提供了处理在任何依赖阶段中引发的异常的方法。
Java中的CompletableFuture类实现Future和CompletionStage接口。 CompletableFuture类通过实现CompletionStage接口来获得其作为依赖阶段运行任务的行为。
有关Java CompletableFuture的要点
CompletableFuture可以用作已明确完成的Future,也可以用作CompletionStage,其中一个阶段的完成会触发另一个从属阶段。
CompletableFuture提供方法的异步和非异步变体。
如果使用异步方法,则可以提供一个Executor作为参数,在这种情况下,使用Executor创建的线程池中的线程将用于执行任务。当使用不带Executor参数的异步方法时,ForkJoinPool.commonPool()中的线程将用于执行任务。例如,考虑以下thenApply()方法的三个变体-thenApply(Function <?super T,?extended U> fn)–这是一个非异步方法。
thenApplyAsync(Function <?super T,?extended U> fn)–异步版本,因为执行程序未作为参数传递,所以使用默认的异步执行工具(ForkJoinPool.commonPool())。
thenApplyAsync(Function <?super T,?extended U> fn,Executor执行程序)– thenApply()方法的另一个异步变体,使用提供的Executor执行。
CompletableFuture Java示例
1个简单示例,其中使用其构造函数创建CompletableFuture实例,并使用complete()方法显式完成该实例。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<String> cf = new CompletableFuture<>(); cf.complete("CompletableFuture completed"); System.out.println("Value- " + cf.get()); }
输出:
Value- CompletableFuture completed
2使用runAsync()方法执行异步任务,该任务返回CompletableFuture <Void>。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{ System.out.println("Running a runnable task"); }); System.out.println("Returned Value- " + cf.get()); }
输出:
Running a runnable task Returned Value- null
3从前面的示例可以看到,runAsync()方法不会返回结果。如果要返回值,则可以使用supplyAsync()方法。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{ System.out.println("Running a task"); return "Task Completed"; }); System.out.println("Returned Value- " + cf.get()); }
输出:
Running a task Returned Value- Task Completed
4到目前为止,我们只看到了一种方法的示例,现在让我们看一些执行任务链的示例。
static void cfExample() throws InterruptedException, ExecutionException { StringBuilder sb = new StringBuilder(); CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{ return "Completable"; }).thenApply(s->sb.append(s).append("Future").toString()); System.out.println("Returned Value- " + cf.get()); }
输出:
Returned Value- CompletableFuture
在示例中,有两个阶段
在第一阶段,将执行supplyAsync()方法,该方法返回结果。当此阶段正常完成时,它将触发下一阶段。
当第一阶段完成时,其结果将应用于适当命名的方法thenApply()。
由于使用了非异步的thenApply()方法,因此它将由执行supplyAsync()方法的同一线程执行,因此它也可以由调用supplyAsync()方法的线程(主线程)执行。 。
5使用该方法的异步变体。
static void cfExample() throws InterruptedException, ExecutionException { StringBuilder sb = new StringBuilder(); CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{ return "Completable"; }).thenApplyAsync(s->sb.append(s).append("Future").toString()); System.out.println("Returned Value- " + cf.get()); }
与前面的示例相同,唯一的区别是它使用了thenApply()方法的async变体,即thenApplyAsync。现在,链接的任务将使用从ForkJoinPool.commonPool()获得的单独线程异步执行。
6我们可以为执行器提供该方法的异步变体。
static void cfExample() throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(2); StringBuilder sb = new StringBuilder(); CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{ return "Completable"; }).thenApplyAsync(s->sb.append(s).append("Future").toString(), executor); System.out.println("Returned Value- " + cf.get()); executor.shutdown(); }
现在,链接的任务将使用传递的执行程序异步执行,并使用从固定线程池中获得的单独线程。
7如果只想使用上一阶段的结果而又不返回任何结果,则可以使用CompletableFuture类的thenAccept()或者thenRun()方法。
在thenAccept方法中,将使用者(功能接口)作为参数传递,并返回CompletionStage <Void>。
在thenRun()方法中,Runnable作为参数传递,并返回CompletionStage <Void>。
尽管thenAccept()方法可以访问之前完成的任务的结果,但是thenRun()方法无法访问之前完成的任务的结果。
static void cfExample() throws InterruptedException, ExecutionException { StringBuilder sb = new StringBuilder(); CompletableFuture.supplyAsync(()->{return "Completable";}) .thenApplyAsync(s->sb.append(s).append("Future").toString()) .thenAccept(s->System.out.println("Current value is- " + s)); }
输出:
Current value is- CompletableFuture
使用thenRun()
static void cfExample() throws InterruptedException, ExecutionException { StringBuilder sb = new StringBuilder(); CompletableFuture.supplyAsync(()->{return "Completable";}) .thenApplyAsync(s->sb.append(s).append("Future").toString()) .thenRun(()->System.out.println("Process completed")); }
使用Java CompletableFuture的thenCompose()方法
在CompletableFuture类中,存在另一个方法thenCompose(),其中可以将一个阶段执行的计算表示为一个Function,然后另一个相同的方法就是Apply()。 thenCompose()和thenApply()这两个方法的区别在于返回值的方式。
thenApply()方法返回一个新的CompletionStage,其类型由计算确定。
thenCompose()方法返回一个新的CompletionStage,其类型类似于上一阶段。
让我们尝试举例说明一下。这里我们有两个方法getValue()和getAnotherValue()都返回CompletableFuture <String>。首先,我们将使用thenApply()方法。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<CompletableFuture<String>> cf = getValue().thenApply(s->getAnotherValue(s)); System.out.println("Value- " + cf.get().get()); } static CompletableFuture<String> getValue(){ return CompletableFuture.supplyAsync(()->{return "Completable";}); } static CompletableFuture<String> getAnotherValue(String str){ return CompletableFuture.supplyAsync(()->{return str+"Future";}); }
如果在此处看到链,则有一个getValue()方法返回CompletableFuture <String>,然后在thenApply()方法中使用该方法,该方法再次返回CompletableFuture <String>类型的结果,使其成为CompletableFuture <CompletableFuture的嵌套结构<String >>。
当我们使用thenCompose()方法时,返回的结果的类型类似于上一阶段。这有助于展平嵌套结构。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<String> cf = getValue().thenCompose(s->getAnotherValue(s)); System.out.println("Value- " + cf.get()); } static CompletableFuture<String> getValue(){ return CompletableFuture.supplyAsync(()->{return "Completable";}); } static CompletableFuture<String> getAnotherValue(String str){ return CompletableFuture.supplyAsync(()->{return str+"Future";}); }
Java CompletableFuture –具有多个完成阶段的操作
1结合两个完成阶段的结果–我们可以使用thenCombine()方法结合两个独立的完成阶段,该方法与两个完成阶段的结果一起执行,作为提供函数的参数。
这里我们有两个方法getValue()和getAnotherValue()都返回CompletableFuture <String>。这两个完成阶段都完成后,将同时调用这两个结果的Combine()方法。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<String> cf = getValue().thenCombine(getAnotherValue(), (s1, s2)->s1+ " " +s2); System.out.println("Value- " + cf.get()); } static CompletableFuture<String> getValue(){ return CompletableFuture.supplyAsync(()->{return "Hello";}); } static CompletableFuture<String> getAnotherValue(){ return CompletableFuture.supplyAsync(()->{return "World";}); }
输出:
Value- Hello World
2包含两个完成阶段的结果–就像Java CompletableFuture中的thenAccept()方法使用一个完成阶段的结果一样,还有一个thenAcceptBoth()方法使用两个完成阶段的结果。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<Void> cf = getValue().thenAcceptBoth(getAnotherValue(), (s1, s2)->System.out.println("Process completed with results- " +s1+ " " +s2)); //System.out.println("Value- " + cf.get()); } static CompletableFuture<String> getValue(){ return CompletableFuture.supplyAsync(()->{return "Hello";}); } static CompletableFuture<String> getAnotherValue(){ return CompletableFuture.supplyAsync(()->{return "World";}); }
输出:
Process completed with results- Hello World
3应用这两者中的任何一个-如果有两个CompletableFutures,并且只有一个阶段正常完成,并且我们想将该函数应用于正常完成的那个完成阶段的结果,则可以使用applyToEither()方法。
在示例中,有两种方法getValue()和getAnotherValue()。在getValue()方法中,引发了异常,该方法异常完成。另一方面,getAnotherValue()方法正常完成。
static void cfExample() throws InterruptedException, ExecutionException { CompletableFuture<String> cf = getValue().applyToEitherAsync(getAnotherValue(), (s)->s.toUpperCase()); System.out.println("Value- " + cf.get()); } static CompletableFuture<String> getValue(){ String str = null; return CompletableFuture.supplyAsync(() -> { if (str == null) { throw new IllegalArgumentException("Invalid String passed " + str); } return str; }).exceptionally(exp -> { System.out.println("Exception message- " + exp.getMessage()); return ""; }); } static CompletableFuture<String> getAnotherValue(){ return CompletableFuture.supplyAsync(()->{return "World";}); }
输出:
Exception message- java.lang.IllegalArgumentException: Invalid String passed null Value- WORLD
如我们所见,applyToEitherAsync()方法使用正常完成的完成阶段的结果。
Java CompletableFuture中的异常处理
对于Java CompletableFuture中的异常处理,有三种方法
处理
whenComplete
异常地
句柄和whenComplete方法始终执行,无论是在触发阶段引发异常还是阶段正常完成。
仅当触发阶段异常完成时,才会执行异常方法。
Java CompletableFuture –异常处理使用异常
在示例中,将String作为null传递,从而导致异常,从而导致异常调用。
static void cfExample() throws InterruptedException, ExecutionException { String str = null; CompletableFuture.supplyAsync(() -> { if (str == null) { throw new IllegalArgumentException("Invalid String passed " + str); } return str; }).exceptionally(exp -> { System.out.println("Exception message- " + exp.getMessage()); return ""; }); }
输出:
Exception message- java.lang.IllegalArgumentException: Invalid String passed null
如果在触发阶段没有例外,则不会异常呼叫。
static void cfExample() throws InterruptedException, ExecutionException { String str = "Hello"; CompletableFuture<String>cf = CompletableFuture.supplyAsync(() -> { if (str == null) { throw new IllegalArgumentException("Invalid String passed " + str); } return str; }).exceptionally(exp -> { System.out.println("Exception message- " + exp.getMessage()); return ""; }); System.out.println("Value- " + cf.get()); }
输出:
Value- Hello
Java CompletableFuture –使用句柄的异常处理
使用此阶段的结果和异常作为提供函数的参数来执行handle方法。如果没有抛出异常,则异常参数将为null。请注意,无论是否引发异常,都会始终执行handle方法,通过检查exception参数为null可以确定是否执行异常处理代码。
static void cfExample() throws InterruptedException, ExecutionException { String str = null; CompletableFuture<String>cf = CompletableFuture.supplyAsync(() -> { if (str == null) { throw new IllegalArgumentException("Invalid String passed " + str); } return str; }).handle((s, e) -> { if(e != null) { System.out.println("Exception message- " + e.getMessage()); s = ""; } return s; }); System.out.println("Value- " + cf.get()); }
输出:
Exception message- java.lang.IllegalArgumentException: Invalid String passed null Value
Java CompletableFuture –使用whenComplete的异常处理
返回一个新的CompletionStage,其结果或者异常与此阶段相同,因此在whenComplete方法中不能更改结果。请注意,无论是否引发异常,始终执行Complete方法时,通过检查异常参数是否为null,可以确定是否要执行异常处理代码。
static void cfExample() throws InterruptedException, ExecutionException { String str = "Hello"; CompletableFuture<String>cf = CompletableFuture.supplyAsync(() -> { if (str == null) { throw new IllegalArgumentException("Invalid String passed " + str); } return str; }).whenComplete((s, e) -> { System.out.println("In when complete method"); if(e != null) { System.out.println("Exception message- " + e.getMessage()); } }); System.out.println("Value- " + cf.get()); }
输出:
In when complete method Value- Hello
在示例中可以看到,当调用Complete方法时,仍不会在阶段中引发异常,但是在这种情况下,异常参数将为null。