Java ThreadPoolExecutor.

时间:2020-02-23 14:35:16  来源:igfitidea点击:

Java 5推出了名为Eccomer Frameworks的新并发API,使程序员生活变得简单。

它简化了多线程应用的设计和开发。
它由主要是执行器,ExecutorService接口和ThreadPoolExecutor类,其实现两个接口:执行程序和ExecutorService。

ThreadPoolExecutor类提供了线程池的实现。
我们将在教程的后期部分了解更多信息。

为什么我们需要执行者框架?

当我们创建一个简单的多线程应用程序时,我们会使用Runnable创建Runnable对象并构造线程对象,我们需要创建,执行和管理线程。

我们可能很难这样做。
执行者框架为我们提供。

它负责创建,执行和管理线程,而不仅仅是这样,它也提高了应用程序的性能。

当你遵循时 task每个线程策略,我们为每个任务创建一个新线程,如果系统高载,则会退出内存错误,系统将失败。
如果你使用 ThreadPoolExecutor,我们不会为新任务创建线程。
一旦线程完成一个任务,我们将为有限数量的线程分配任务到一个任务,它将被给予另一个任务。

执行者框架的核心接口是执行程序。
它具有一个名为"执行"的方法。

public interface Executor {
 void execute(Runnable command);
}

还有另一个名为ExecutorService的接口,可扩展执行器接口。

它可以称为执行器,该执行程序提供可以控制可以生成用于跟踪一个或者多个异步任务进度的未来的终止和方法的方法。
它具有提交,关机,截止等的方法。 ThreadPoolExecutor是线池的实际实现。
它扩展了ApprestThreadPoolExecutor,它实现了ExecutorService接口。

我们可以从Executor类的出厂方法创建ThreadPoolExecutor。
建议使用方法来获得一个实例 ThreadPoolExecutor

4 factory methods在executors类中,可用于获取ThreadPoolExecutor的实例。
我们正在使用executors的newfixedthreadpool来获取ThreadPoolExecutor的实例。

例子:

ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

以下是executors类中存在的四种工厂方法。

newfixedthreadpool:此方法返回其最大大小(假设n个线程)的线程池执行器是修复的。

如果所有N个线程正忙于执行任务并提交其他任务,那么它们必须在队列中,直到线程可用。

newcachedthreadpool:此方法返回一个无限的线程池。

它没有最大尺寸,但如果它具有较少数量的任务,那么它将拆除未使用的线程。
如果线程未使用1分钟(KeepAliveTime),那么它将撕下它。

newsinglethreadeDexecutor:此方法返回一个executor,保证使用单个线程。

NewscheduledThreadPool:此方法返回一个固定大小的线程池,可以在给定延迟之后或者定期执行以后调度命令。

让我们创建一个基本的例子 ThreadPoolExecutor我们将使用newfixedthreadpool创建一个实例

ThreadPoolExecutor
让我们创建一个任务 FetchDataFromFile
这里 Task将是阅读不同的文件并处理它们。

package org.igi.theitroad.bean;
 
public class FetchDataFromFile implements Runnable{
 
 private final String fileName;
 
 public FetchDataFromFile(String fileName) {
  super();
  this.fileName = fileName;
 }
 
 @Override
 public void run() {
  try {
   System.out.println("Fetching data from "+fileName+" by "+Thread.currentThread().getName());
   Thread.sleep(5000); //Reading file
   System.out.println("Read file successfully: "+fileName+" by "+Thread.currentThread().getName());
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
 public String getFileName() {
  return fileName;
 }
}

让我们创造 ThreadPoolExecutor这将消耗上面的任务并处理它。

package org.igi.theitroad;
 
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
 
public class ThreadPoolExecutorMain {
 public static void main(String args[]) {
  //Getting instance of ThreadPoolExecutor using  Executors.newFixedThreadPool factory method
  ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
  for (int i = 0; i < 10; i++) {
   FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
   System.out.println("A new file has been added to read : " + fdff.getFileName());
   //Submitting task to executor
   threadPoolExecutor.execute(fdff);
  }
  threadPoolExecutor.shutdown();
 }
}

我们使用了新的emfixedthreadpool,所以当我们提交10个任务时,将创建5个新线程并将执行

5 tasks
其他5个任务将在等待队列中等待。
一旦任何任务都将由线程完成,此线程将挑选另一个任务,并将执行它。

使用ThreadPoolExecutor的构造函数

如果要自定义ThreadPoolExecutor的创建,我们也可以使用其构造函数。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
TimeUnit unit,BlockingQueue workQueue ,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;

corePoolSize:corepoolsize是要在池中保留的线程数,即使它们是空闲的
MaximumPoolSize:池中允许的最大线程数
keepAliveTime:当我们有更多的线程时可用
corePoolSize然后,KeepAliveTime是在终止之前的那个线程等待任务的时间。
unit:时间单位适用于KeepAliveTime
workQueue:工作用是在执行前持有任务的块板。

threadFactory:用于创建新线程的工厂。
handler:RejectedExecutionHandler在执行中使用的是块或者队列已满。
让我们创建一个 RejectedExecutionHandler用于处理被拒绝的任务。

package org.igi.theitroad.bean;
 
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
 
public class RejectTaskHandler implements RejectedExecutionHandler{
 
 @Override
 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  FetchDataFromFile ffdf=(FetchDataFromFile) r;
  System.out.println("Sorry!! We won't be able to read :"+ffdf.getFileName());  
 }
}

让我们改变 ThreadPoolExecutorMain.java到以下代码以使用ThreadPoolExecutor构造函数。

package org.igi.theitroad.bean;
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class ThreadPoolExecutorMain {
 public static void main(String args[]) {
  
  //Wait queue is used to store waiting task
  BlockingQueue queue=new LinkedBlockingQueue(4);
  
  //Thread factory to create new threads
  ThreadFactory threadFactory=Executors.defaultThreadFactory();
  
  //Rejection handler in case task get rejected
  RejectTaskHandler rth=new RejectTaskHandler();
  //ThreadPoolExecutor constructor to create its instance
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 10L, TimeUnit.MILLISECONDS, queue,
    threadFactory,rth
    );
  for (int i = 1; i <= 10; i++) {
   FetchDataFromFile fdff = new FetchDataFromFile("File " + i);
   System.out.println("A new file has been added to read : " + fdff.getFileName());
   //Submitting task to executor
   threadPoolExecutor.execute(fdff);
  }
  threadPoolExecutor.shutdown();
 }
}

运行上面的程序时,我们将得到以下输出:

A new file has been added to read : File 1
A new file has been added to read : File 2
A new file has been added to read : File 3
A new file has been added to read : File 4
Fetching data from File 1 by pool-1-thread-1
A new file has been added to read : File 5
A new file has been added to read : File 6
A new file has been added to read : File 7
Sorry!! We won’t be able to read :File 7
A new file has been added to read : File 8
Sorry!! We won’t be able to read :File 8
A new file has been added to read : File 9
Sorry!! We won’t be able to read :File 9
A new file has been added to read : File 10
Sorry!! We won’t be able to read :File 10
Fetching data from File 6 by pool-1-thread-2
Read file successfully: File 1 by pool-1-thread-1
Read file successfully: File 6 by pool-1-thread-2
Fetching data from File 2 by pool-1-thread-1
Fetching data from File 3 by pool-1-thread-2
Read file successfully: File 3 by pool-1-thread-2
Read file successfully: File 2 by pool-1-thread-1
Fetching data from File 4 by pool-1-thread-2
Fetching data from File 5 by pool-1-thread-1

如果我们发现此处文件7,文件8,文件9和文件10已被拒绝。
让我们明白他们为什么被拒绝。
ThreadPoolExecutor的构造函数中的最大池大小为2,因此当我们将10个任务提交给Thread Pool时,创建了2个线程并开始处理2任务和4个任务排队

LinkedBlockingQueue如果LinkedBlockingQueue成为完整的,所以休息任务被拒绝了。

如何确定线程池的大小

我们不应该纠正线程池的大小。
它应该由配置提供或者计算 Runtime.availableProcessors()

线程尺寸不应该太大或者太小。
如果我们选择太大的线程池大小,那么它将重载(overloading)系统并无法正常工作。
如果选择线程池大小太小,则会影响吞吐量和性能。