Java ThreadPoolExecutor –带ExecutorService服务的线程池

时间:2020-01-09 10:35:13  来源:igfitidea点击:

Java中的ThreadPoolExecutor用于使用可能的多个池线程之一执行每个提交的任务。创建ThreadPoolExecutor的实例时,也会创建一个线程池,并且该线程池中的一个线程用于执行任务。

Java ThreadPoolExecutor

ThreadPoolExecutor类是Java Executor框架的一部分,并且在Java并发API中。此类同时实现Executor和ExecutorService接口。

ThreadPoolExecutor中的线程池

ThreadPoolExecutor使用线程池中的线程执行任务。通过使用线程池获得的优势是

  • 池线程与它执行的Runnable和Callable任务分开存在,通常用于执行多个任务。

  • 线程对象占用大量内存。在大型应用程序中,如果每个任务使用其自己的线程,则分配和取消分配许多线程对象会产生大量的内存管理开销。使用池化线程可最大程度地减少线程创建带来的开销。

  • 使用线程池还提供了一种绑定和管理资源的方法。我们可以有一个有限的线程池。我们还可以为线程配置keepAliveTime,以便在没有太多任务的情况下终止线程,从而减小了整个池的大小。

  • 除了通过线程池提供的这些优势之外,ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量,正在主动执行任务的线程数量。

Java ThreadPoolExecutor构造函数

ThreadPoolExecutor类具有4个构造函数,可用于获取ThreadPoolExecutor的实例。

  • ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit单位,BlockingQueue <Runnable> workQueue)–使用给定的初始参数,默认的线程工厂和默认的拒绝执行处理程序创建一个新的ThreadPoolExecutor。

  • ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,长keepAliveTime,TimeUnit单位,BlockingQueue <Runnable> workQueue,RejectedExecutionHandler处理程序)–使用给定的初始参数和默认线程工厂创建一个新的ThreadPoolExecutor。

  • ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit单位,BlockingQueue <Runnable> workQueue,ThreadFactory threadFactory)–使用给定的初始参数和默认拒绝执行处理程序创建一个新的ThreadPoolExecutor。

  • ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,长keepAliveTime,TimeUnit单元,BlockingQueue <Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler处理程序)–使用给定的初始参数创建一个新的ThreadPoolExecutor。

在ThreadPoolExecutor类的这些构造函数中使用的参数如下:

  • corePoolSize –要保留在池中的线程数。除非设置了allowCoreThreadTimeOut,否则即使线程空闲,也将创建这些数量的线程。

  • maximumPoolSize –允许在池中的最大线程数。

  • keepAliveTime –当线程数大于线程池中的corePoolSize时,keepAliveTime是多余的空闲线程将在终止之前等待新任务的最长时间。

  • unit – keepAliveTime参数的时间单位

  • workQueue –在执行任务之前用于保留任务的队列。此队列将仅保存execute方法提交的Runnable任务。使用的队列可以是有界或者无界阻塞队列。

  • handler –因达到线程边界和队列容量而被阻止执行时使用的处理程序。

  • threadFactory –执行程序创建新线程时要使用的工厂。

使用执行器工厂方法创建ThreadPoolExecutor实例

我们可以使用Executors类提供的静态工厂方法来获取ThreadPoolExecutor,而不是直接使用上述构造方法之一直接创建ThreadPoolExecutor的实例。

  • newCachedThreadPool()–创建一个线程池,该线程池根据需要创建新线程,但是将在可用之前重用以前构造的线程。

  • newCachedThreadPool(ThreadFactory threadFactory)–创建一个线程池,该线程池根据需要创建新线程,但是在可用之前重用以前构造的线程,并在需要时使用提供的ThreadFactory创建新线程。

  • newFixedThreadPool(int nThreads)–创建一个线程池,该线程池重用在共享无界队列上操作的固定数量的线程。

  • newFixedThreadPool(int nThreads,ThreadFactory threadFactory)–创建一个线程池,该线程池重复使用固定数量的线程,这些线程在共享的无边界队列上操作,并在需要时使用提供的ThreadFactory创建新线程。

  • newSingleThreadExecutor()–创建一个执行程序,该执行程序使用在不受限制的队列上操作的单个工作线程。

  • newSingleThreadExecutor(ThreadFactory threadFactory)–创建一个执行程序,该执行程序使用在不受限制的队列上操作的单个工作线程,并在需要时使用提供的ThreadFactory创建新线程。

使用构造函数的Java ThreadPoolExecutor示例

如果我们选择自己创建ThreadPoolExecutor实例,并通过传递参数使用构造函数对其进行初始化。

public class ExecutorExp {
  public static void main(String[] args) {
    // creating executor with core pool of 2 threads. max pool is 4
    //, keep alive time- 5 secs
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 5, 
                  TimeUnit.SECONDS, 
                  new ArrayBlockingQueue<Runnable>(3), 
                  Executors.defaultThreadFactory(), 
                  new ThreadPoolExecutor.DiscardOldestPolicy());
    for(int i = 0; i < 6; i++)
      executor.execute(new Task());
    
    executor.shutdown();
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (thread name)- " + Thread.currentThread().getName());
    // delay to keep the thread busy
    // so that pool is used
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

输出:

Executing task (thread name)- pool-1-thread-1
Executing task (thread name)- pool-1-thread-2
Executing task (thread name)- pool-1-thread-3
Executing task (thread name)- pool-1-thread-2
Executing task (thread name)- pool-1-thread-3
Executing task (thread name)- pool-1-thread-1

如我们所见,使用睡眠方法的线程一直处于繁忙状态,因此除了2个线程的核心池之外,还会创建一个线程(最大池大小为4)来执行任务。

使用Executors工厂方法的Java ThreadPoolExecutor示例

1个使用Executors.newSingleThreadExecutor()的Java示例,该示例使用单个工作线程。

public class ExecutorExp {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    for(int i = 0; i < 4; i++) {
      executor.execute(new Task());	
    }
    executor.shutdown();
  }
}

class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (thread name)- " + Thread.currentThread().getName());
    // delay to keep the thread busy
    // so that pool is used
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

输出:

Executing task (thread name)- pool-1-thread-1
Executing task (thread name)- pool-1-thread-1
Executing task (thread name)- pool-1-thread-1
Executing task (thread name)- pool-1-thread-1

如我们所见,一个线程执行所有4个任务。

2使用Executors.newFixedThreadPool的Java示例。此方法创建一个线程池,该线程池重用在共享的无边界队列上操作的固定数量的线程。使用此方法时,内部Executors类使用以下参数创建ThreadPoolExecutor实例:

new ThreadPoolExecutor(nThreads, nThreads,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue());
public class ExecutorExp {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    for(int i = 0; i < 4; i++) {
      executor.execute(new Task());	
    }
    executor.shutdown();
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (thread name)- " + Thread.currentThread().getName());
    // delay to keep the thread busy
    // so that pool is used
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

输出:

Executing task (thread name)- pool-1-thread-2
Executing task (thread name)- pool-1-thread-1
Executing task (thread name)- pool-1-thread-2
Executing task (thread name)- pool-1-thread-1

如我们所见,有2个线程用于执行提交的任务。

3使用Executors.newCachedThreadPool()的Java示例。此方法创建一个线程池,该线程池可根据需要创建新线程,但是将在先前构造的线程可用时重用它们。使用此方法时,内部Executors类使用以下参数创建ThreadPoolExecutor实例:

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());
public class ExecutorExp {

  public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    for(int i = 0; i < 4; i++) {
      executor.execute(new Task());	
    }
    executor.shutdown();
  }
}
class Task implements Runnable{
  @Override
  public void run() {
    System.out.println("Executing task (thread name)- " + Thread.currentThread().getName());
    // delay to keep the thread busy
    // so that pool is used
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

输出:

Executing task (thread name)- pool-1-thread-3
Executing task (thread name)- pool-1-thread-2
Executing task (thread name)- pool-1-thread-4
Executing task (thread name)- pool-1-thread-1

如我们所见,有4个新线程用于4个提交的任务。