Java执行器–Executor,ExecutorService,ScheduledExecutorService
在本Java执行器教程中,我们将学习如何使用Executor,ExecutorService,ScheduledExecutorService及其线程池实现来有效管理大型应用程序中的线程。
Java执行器API
在使用Java多线程创建线程时,由新线程(由其Runnable对象定义)执行的任务与线程本身之间存在紧密的联系。这种管理线程的方法可能不适用于大型应用程序。在大规模应用程序中,最好将线程创建和线程管理与业务逻辑分开。 Java Executor框架通过将线程创建和管理功能封装在称为执行程序的对象中来帮助实现这一目标。 Java Executor框架分为以下三个部分:
执行程序接口–定义三个执行程序对象类型的三个接口Executor,ExecutorService和ScheduledExecutorService。
线程池–这些是执行器实现类,例如ThreadPoolExecutor和ScheduledThreadPoolExecutor,它们使用线程池中的一个线程执行每个提交的任务。
Fork / Join框架–它是ExecutorService接口的实现,可利用多个处理器。
Java执行器界面
Executor类型的对象执行提交的可运行任务。通过使用Executor,我们无需显式创建线程。
例如,如果有一个Runnable对象,则可以替换
(new Thread(runnable)).start();
与
executor.execute(runnable);
executor是Executor对象。
Java Executor接口具有单个方法execute,其定义如下
void execute(Runnable命令)–在将来的某个时间执行给定的runnable。传递的可运行对象可以根据执行程序的判断,在新线程,池线程或者调用线程中执行。
Java ExecutorService接口
ExecutorService接口扩展了Executor,并添加了用于关闭执行程序的功能以及执行任务后返回Future的功能。
除了基本方法execute(从Executor接口继承)之外,ExecutorService还具有更通用的Submit方法,该方法被重载以接受Runnable对象以及Callable对象,这些对象允许任务返回值。
在ExecutorService中提交方法
<T> Future <T> Submit(Callable <T> task)–提交一个返回值的任务以执行,并返回一个Future,代表该任务的未决结果。
Future <?> Submit(Runnable task)–提交要执行的Runnable任务,并返回代表该任务的Future。成功完成后,Future的get方法将返回null。
<T> Future <T> Submit(可运行任务,T结果)–提交要执行的可运行任务,并返回表示该任务的Future。 Future的get方法将在成功完成后返回给定的结果。
ExecutorService中的关机方法
我们可以关闭ExecutorService,这将导致它拒绝新任务。
void shutdown()–启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。
List <Runnable> shutdownNow()–尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。
Java ScheduledExecutorService接口
ScheduledExecutorService接口扩展了ExecutorService接口,并添加了一些功能以计划在给定延迟后运行或者定期执行的命令。
ScheduledExecutorService接口中的计划方法
schedule(Callable <V>可调用,长延迟,TimeUnit单位)–创建并执行ScheduledFuture,该延迟在给定延迟后变为启用状态。
schedule(Runnable命令,长延迟,TimeUnit单位)–创建并执行一次操作,该操作在给定延迟后变为启用状态。
scheduleAtFixedRate(Runnable命令,long initialDelay,长周期,TimeUnit单位)–创建并执行一个周期性操作,该操作在给定的初始延迟后首先启用,然后在给定的周期后启用。
scheduleWithFixedDelay(Runnable命令,长initialDelay,长延迟,TimeUnit单位)–创建并执行一个周期性操作,该操作将在给定的初始延迟后首先启用,然后以给定的延迟在一次执行终止与下一次执行之间延迟。
Java Executor实现类
现在我们知道了执行程序接口和这些接口中定义的方法。 Java Executor框架还具有实现这些接口的预定义执行程序类。
ThreadPoolExecutor –此类实现Executor和ExecutorService接口。 ThreadPoolExecutor使用可能的多个池线程之一执行每个提交的任务。
ScheduledThreadPoolExecutor –此类扩展ThreadPoolExecutor并实现ScheduledExecutorService。 ScheduledThreadPoolExecutor类计划在给定的延迟后运行或者定期执行的命令。
ForkJoinPool –此类是Executor和ExecutorService接口的实现。 Fork / Join框架中使用ForkJoinPool类来运行ForkJoinTasks。
要阅读有关Java中ThreadPoolExecutor类的更多信息,请参考这篇文章Java ThreadPoolExecutor –带有ExecutorService的线程池
要阅读有关Java中ScheduledThreadPoolExecutor类的更多信息,请参阅这篇文章。Java ScheduledThreadPoolExecutor –使用ExecutorService进行计划
java.util.concurrent中的大多数执行程序实现都使用线程池,该线程池由工作线程组成。通过使用线程池获得的优势是
池线程与它执行的Runnable和Callable任务分开存在,通常用于执行多个任务。
线程对象占用大量内存。在大型应用程序中,如果每个任务使用其自己的线程,则分配和取消分配许多线程对象会产生大量的内存管理开销。使用池化线程可最大程度地减少线程创建带来的开销。
使用Executors类创建执行程序
在进入Executor和ExecutorService的示例之前,我们必须了解另一类。 Java并发API中的Executors类。
可以直接使用Executors类提供的静态工厂方法来获取执行程序,而不必直接创建和使用ThreadPoolExecutor和ScheduledThreadPoolExecutor的实例。这些工厂方法可以创建并返回带有常用配置设置的ExecutorService,ScheduledExecutorService。
以下是最常用的工厂方法列表
静态ExecutorService newCachedThreadPool()–创建一个线程池,该线程池可根据需要创建新线程,但在可用时将重用以前构造的线程。
静态ExecutorService newFixedThreadPool(int nThreads)–创建一个线程池,该线程池重用在共享的无界队列上操作的固定数量的线程。在任何时候,最多nThreads个线程都是活动的处理任务。
静态ExecutorService newSingleThreadExecutor()–创建一个执行程序,该执行程序使用在不受限制的队列上操作的单个工作线程
静态ScheduledExecutorService newSingleThreadScheduledExecutor()–创建一个单线程执行器,该执行器可以计划命令在给定延迟后运行或者定期执行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)–创建一个线程池,该线程池可以计划在给定延迟后运行或者定期执行的命令。
Java ExecutorService示例
1在此示例中,使用Executors类的newFixedThreadPool()方法创建了ExecutorService。线程池由2个线程创建,因此这2个线程将用于执行提交的任务。
public class ExecutorExp { public static void main(String[] args) { // creating executor with pool of 2 threads ExecutorService executor = Executors.newFixedThreadPool(2); // running 4 tasks using pool of 2 threads executor.execute(new Task()); executor.execute(new Task()); executor.execute(new Task()); executor.execute(new Task()); executor.shutdown(); } } class Task implements Runnable{ @Override public void run() { System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
输出:
Executing task (Thread name)- pool-1-thread-2 Executing task (Thread name)- pool-1-thread-1 Executing task (Thread name)- pool-1-thread-2 Executing task (Thread name)- pool-1-thread-1
如我们所见,使用池中的2个线程执行了4个任务。
2在此Java ExecutorService示例中,ExecutorService的Submit方法用于运行可运行任务。
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class ExecutorExp { public static void main(String[] args) { // creating executor with pool of 2 threads ExecutorService executor = Executors.newFixedThreadPool(2); // running 4 tasks using pool of 2 threads Future<?> f1 = executor.submit(new Task()); Future<?> f2 = executor.submit(new Task()); Future<?> f3 = executor.submit(new Task()); Future<?> f4 = executor.submit(new Task()); try { System.out.println("f1- " + f1.get()); System.out.println("f2- " + f2.get()); if(f3.get() == null) { System.out.println("submitted task executed successfully"); } } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } executor.shutdown(); } } class Task implements Runnable{ @Override public void run() { System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
输出:
Executing task (Thread name)- pool-1-thread-2 Executing task (Thread name)- pool-1-thread-1 Executing task (Thread name)- pool-1-thread-2 Executing task (Thread name)- pool-1-thread-1 f1- null f2- null submitted task executed successfully
如我们所见,对于可运行任务,Future的get()方法在成功完成任务后返回null。
3在此示例中,ExecutorService的Submit方法用于运行可调用任务。有两个实现Callable的类,submit方法用于运行那些可调用的任务。显示从Callable返回的更高值。
public class ExecutorExp { public static void main(String[] args) { // creating executor with pool of 2 threads ExecutorService executor = Executors.newFixedThreadPool(2); // running 4 tasks using pool of 2 threads Future<String> f1 = executor.submit(new Task1()); Future<String> f2 = executor.submit(new Task1()); Future<String> f3 = executor.submit(new Task2()); Future<String> f4 = executor.submit(new Task2()); try { System.out.println("f1- " + f1.get()); System.out.println("f2- " + f2.get()); System.out.println("f3- " + f3.get()); System.out.println("f4- " + f4.get()); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } executor.shutdown(); } } class Task1 implements Callable<String>{ @Override public String call() throws Exception { System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return "In Task1"; } } class Task2 implements Callable<String>{ @Override public String call() throws Exception { System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return "In Task2"; } }
输出:
Executing task (Thread name)- pool-1-thread-1 Executing task (Thread name)- pool-1-thread-2 f1- In Task1 Executing task (Thread name)- pool-1-thread-1 f2- In Task1 Executing task (Thread name)- pool-1-thread-2 f3- In Task2 f4- In Task2
Java ScheduledExecutorService示例
在此示例中,使用Executors类的newScheduledThreadPool()方法创建了ScheduledExecutorService。计划在3秒的延迟后执行可调用任务。
public class ExecutorExp { public static void main(String[] args) { ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); // Callable implementation Callable<String> c = ()->{ System.out.println("Executed at- " + new Date()); return "Executing task"; }; System.out.println("Time before execution started- " + new Date()); // scheduling tasks with callable as param to be // executed after a delay of 3 Secs ScheduledFuture<String> sf = scheduledExecutor.schedule(c, 3, TimeUnit.SECONDS); try { System.out.println("Value- " + sf.get()); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } scheduledExecutor.shutdown(); } }
输出:
Time before execution started- Fri Jan 04 10:25:14 IST 2019 Executed at- Fri Jan 04 10:25:17 IST 2019 Value- Executing task
ExecutorService关闭示例
在前面的示例中,使用shutdown()方法终止执行程序。因为shutdown()方法可确保在关机之前执行先前提交的任务,所以没有问题。但是还有一个shutdownNow()方法,它不等待主动执行的任务终止。让我们看一个例子。
public class ExecutorExp { public static void main(String[] args) { // creating executor with pool of 2 threads ExecutorService executor = Executors.newFixedThreadPool(2); // running 4 tasks using pool of 2 threads Future<?> f1 = executor.submit(new Task()); Future<?> f2 = executor.submit(new Task()); Future<?> f3 = executor.submit(new Task()); Future<?> f4 = executor.submit(new Task()); System.out.println("shutting down instantly"); executor.shutdownNow(); } } class Task implements Runnable{ @Override public void run() { System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
输出:
java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:340) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:403) at com.theitroad.Task.run(ExecutorExp.java:46) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:844) java.lang.InterruptedException: sleep interrupted at java.base/java.lang.Thread.sleep(Native Method) at java.base/java.lang.Thread.sleep(Thread.java:340) at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:403) at com.theitroad.Task.run(ExecutorExp.java:46) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:844) shutting down instantly Executing task (Thread name)- pool-1-thread-1 Executing task (Thread name)- pool-1-thread-2
如我们所见,关机是即时的。由于在线程上调用了sleep方法,因此为了关机而被中断,这就是为什么抛出InterruptedException的原因。
根据Java文档的建议是分两个阶段关闭ExecutorService。
首先,通过调用shutdown拒绝传入的任务,然后在必要时调用shutdownNow()来取消所有延迟的任务。应该将shutdownNow()与awaitTermination()方法一起调用,以留出时间来完成正在执行的任务。下一个示例显示此用法。
public class ExecutorExp { public static void main(String[] args) { // creating executor with pool of 2 threads ExecutorService executor = Executors.newFixedThreadPool(2); // running 4 tasks using pool of 2 threads Future<?> f1 = executor.submit(new Task()); Future<?> f2 = executor.submit(new Task()); Future<?> f3 = executor.submit(new Task()); Future<?> f4 = executor.submit(new Task()); System.out.println("shutting down instantly"); //executor.shutdownNow(); shutdownAndAwaitTermination(executor); } // For shutdown static void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(1000, TimeUnit.MILLISECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(500, TimeUnit.MILLISECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } } class Task implements Runnable{ @Override public void run() { System.out.println("Executing task (Thread name)- " + Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }