线程池
当我们需要限制同时在应用程序中运行的线程数时,线程池很有用。启动新线程会带来性能开销,并且每个线程还为其堆栈分配了一些内存等。
可以将任务传递给线程池,而不是为要同时执行的每个任务启动新线程。一旦池中有任何空闲线程,便将任务分配给其中一个并执行。在内部将任务插入到阻塞队列中,池中的线程将从中退出。当将新任务插入队列时,空闲线程之一将成功使它出队并执行它。池中的其余空闲线程将被阻塞,以等待出队任务。
线程池通常在多线程服务器中使用。通过网络到达服务器的每个连接都包装为一个任务,并传递给线程池。线程池中的线程将同时处理连接上的请求。后面的线索将详细介绍有关在Java中实现多线程服务器的信息。
Java 5在java.util.concurrent
包中带有内置的线程池,因此我们不必实现自己的线程池。我们可以在java.util.concurrent.ExecutorService的文本中阅读有关此内容的更多信息。无论如何,了解线程池的实现还是很有用的。
这是一个简单的线程池实现。请注意,此实现使用我自己的BlockingQueue
类,如我的"阻塞队列"教程中所述。在实际的实现中,我们可能会改用Java的内置阻塞队列之一。
public class ThreadPool { private BlockingQueue taskQueue = null; private List<PoolThread> threads = new ArrayList<PoolThread>(); private boolean isStopped = false; public ThreadPool(int noOfThreads, int maxNoOfTasks){ taskQueue = new BlockingQueue(maxNoOfTasks); for(int i=0; i<noOfThreads; i++){ threads.add(new PoolThread(taskQueue)); } for(PoolThread thread : threads){ thread.start(); } } public synchronized void execute(Runnable task) throws Exception{ if(this.isStopped) throw new IllegalStateException("ThreadPool is stopped"); this.taskQueue.enqueue(task); } public synchronized void stop(){ this.isStopped = true; for(PoolThread thread : threads){ thread.doStop(); } } }
public class PoolThread extends Thread { private BlockingQueue taskQueue = null; private boolean isStopped = false; public PoolThread(BlockingQueue queue){ taskQueue = queue; } public void run(){ while(!isStopped()){ try{ Runnable runnable = (Runnable) taskQueue.dequeue(); runnable.run(); } catch(Exception e){ //log or otherwise report exception, //but keep pool thread alive. } } } public synchronized void doStop(){ isStopped = true; this.interrupt(); //break pool thread out of dequeue() call. } public synchronized boolean isStopped(){ return isStopped; } }
线程池实现由两部分组成。一个ThreadPool
类是线程池的公共接口,一个PoolThread
类实现了执行任务的线程。
为了执行任务,使用Runnable
实现作为参数调用ThreadPool.execute(Runnable r)
方法。 " Runnable"在内部进入阻塞队列,等待被出队。
Runnable
将由一个空闲的PoolThread
出队并执行。我们可以在PoolThread.run()
方法中看到这一点。执行后,PoolThread
循环并尝试再次使任务出队,直到停止。
要停止ThreadPool,需要调用ThreadPool.stop()方法。在isStopped成员内部记录了所调用的停止。然后,通过在每个线程上调用doStop()
来停止池中的每个线程。注意,如果在调用stop()之后调用了execute(),则execute()方法将如何抛出IllegalStateException。
线程在完成当前正在执行的任何任务后将停止。注意PoolThread.doStop()
中的this.interrupt()
调用。这确保了在taskQueue.dequeue()调用内的wait()调用中阻塞的线程脱离了wait()调用,并使dequeue()方法调用带有InterruptedException。 抛出。报告此异常后,将其报告到" PoolThread.run()"方法中,然后检查" isStopped"变量。由于" isStopped"现在为真,因此" PoolThread.run()"将退出并且线程死亡。