信号量
信号量是一种线程同步构造,可用于在线程之间发送信号以避免丢失信号,或者像使用锁一样保护关键部分。 Java 5带有java.util.concurrent包中的信号量实现,因此我们不必实现自己的信号量。尽管如此,了解其实现和使用背后的理论还是很有用的。
Java 5带有内置的"信号量",因此我们不必自己实现。
简单信号量
这是一个简单的Semaphore
实现:
public class Semaphore { private boolean signal = false; public synchronized void take() { this.signal = true; this.notify(); } public synchronized void release() throws InterruptedException{ while(!this.signal) wait(); this.signal = false; } }
take()方法发送一个信号,该信号内部存储在Semaphore中。 release()
方法等待一个信号。收到信号标志后,将再次清除该标志,并退出release()
方法。
使用这样的信号量可以避免信号丢失。我们将调用take()
而不是notify()
和release()
而不是wait()
。如果对take()的调用发生在对release()的调用之前,则调用release()的线程仍将知道调用了take(),因为信号内部存储在signal中。多变的。 " wait()"和" notify()"不是这种情况。
使用信号量进行信号传递时,名称" take()"和" release()"可能看起来有些奇怪。名称源于使用信号量作为锁,如本文后面所述。在这种情况下,名称更有意义。
使用信号量发送信号
这是两个使用信号量互相发信号的线程的简化示例:
Semaphore semaphore = new Semaphore(); SendingThread sender = new SendingThread(semaphore); ReceivingThread receiver = new ReceivingThread(semaphore); receiver.start(); sender.start();
public class SendingThread { Semaphore semaphore = null; public SendingThread(Semaphore semaphore){ this.semaphore = semaphore; } public void run(){ while(true){ //do something, then signal this.semaphore.take(); } } }
public class RecevingThread { Semaphore semaphore = null; public ReceivingThread(Semaphore semaphore){ this.semaphore = semaphore; } public void run(){ while(true){ this.semaphore.release(); //receive signal, then do something... } } }
计数信号量
上一节中的"信号量"实现不计算" take()"方法调用发送给它的信号的数量。我们可以更改Semaphore
来做到这一点。这称为计数信号量。这是计数信号量的简单实现:
public class CountingSemaphore { private int signals = 0; public synchronized void take() { this.signals++; this.notify(); } public synchronized void release() throws InterruptedException{ while(this.signals == 0) wait(); this.signals--; } }
有界信号量
" CoutingSemaphore"在可以存储多少个信号方面没有上限。我们可以将信号量实现更改为上限,如下所示:
public class BoundedSemaphore { private int signals = 0; private int bound = 0; public BoundedSemaphore(int upperBound){ this.bound = upperBound; } public synchronized void take() throws InterruptedException{ while(this.signals == bound) wait(); this.signals++; this.notify(); } public synchronized void release() throws InterruptedException{ while(this.signals == 0) wait(); this.signals--; this.notify(); } }
请注意,如果信号数量等于上限,那么take()方法现在将如何阻塞。如果" BoundedSemaphore"已达到其信号上限,则直到线程调用" release()"后,才允许调用" take()"的线程传递其信号。
将信号量用作锁
可以将有界信号量用作锁。为此,将上限设置为1,并调用take()
和release()
来保护关键部分。这是一个例子:
BoundedSemaphore semaphore = new BoundedSemaphore(1); ... semaphore.take(); try{ //critical section } finally { semaphore.release(); }
与信令用例相比,方法" take()"和" release()"现在由同一线程调用。由于只允许一个线程获取信号量,因此所有其他调用take()
的线程都将被阻塞,直到调用release()
为止。永远不会阻塞对release()的调用,因为始终首先有对take()的调用。
我们还可以使用有界信号量来限制代码段中允许的线程数。例如,在上面的示例中,如果将" BoundedSemaphore"的限制设置为5,会发生什么情况?一次允许5个线程进入关键部分。但是,我们必须确保这5个线程的线程操作不会冲突,否则应用程序将失败。
从finally块内部调用relase()方法,以确保即使从关键部分抛出异常,该方法也被调用。