Java相位器Phaser
作为Java 5中java.util.concurrent包的一部分,添加了许多同步帮助,例如Semaphore,Exchanger。在Java 7并发中添加的另一种同步辅助工具是Phaser,它是在Java 7中添加的。
Java并发中的相位器
Java中的相位器也是CountDownLatch和CyclicBarrier之类的同步屏障,其中线程需要在屏障处等待,直到所有线程到达屏障(即屏障被触发)为止。通过在多个阶段同步线程,Phaser提供了更大的灵活性。在每个阶段中,可以动态注册和注销线程。
关于Java中的Phaser的一些要点如下:
使用Phaser,我们可以重新使用同一Phaser实例在多个阶段进行同步。
一旦所有线程注册一个阶段后到达该屏障,该阶段即被视为完成,下一阶段开始。
我们也可以使用Phaser同步单个相位,但是当用于同步多个相位时,它会更加有用。
Java Phaser构造函数
Phaser类中有四个构造函数。
Phaser()–创建一个没有初始注册方,没有父级且初始阶段号为0的新相位器。
Phaser(int party)–使用给定数目的未到达注册方,没有父级,初始阶段号为0,创建一个新的Phaser。
移相器(Phaser父级,int各方)–使用给定的父级和注册的未到达方数创建一个新的移相器。
Phaser(Phaser父级)–与给定的父级一起创建新的相位器,并且没有初始注册方。
Phaser如何在Java中工作
首先是创建一个Phaser实例。
通过调用register()方法向相位器注册参与方,我们也可以使用构造函数,其中将参与方数目作为参数传递。
要发出信号,表明一方已到达阶段(必须到达)之一。一旦所有注册方都到达一个阶段,则认为该阶段已完成。
移相器的每一代都有一个关联的相数。阶段号从零开始,并在各方到达阶段器时提前。
Java中的相位器示例
这是一个展示Phaser运作的示例。有两个可运行任务,将作为两个单独的阶段执行。第一个可运行任务(FirstTask)由三个线程执行,因此我们可以看到使用bulkRegister()方法注册了3个参与者。
对于第二个可运行任务(SecondTask),在可运行类中使用注册方法。
public class PhaserDemo { public static void main(String[] args) { Phaser ph = new Phaser(1); // registering 3 parties in bulk ph.bulkRegister(3); System.out.println("Phase in Main " + ph.getPhase() + " started"); // starting 3 threads for(int i = 0; i < 3; i++) { new Thread(new FirstTask("Thread-"+i, ph)).start(); } int curPhase = ph.getPhase(); // This is to make main thread wait ph.arriveAndAwaitAdvance(); System.out.println("Phase in Main " + curPhase + " completed"); for(int i = 0; i < 2; i++) { new Thread(new SecondTask("Thread-"+i, ph)).start(); } ph.arriveAndAwaitAdvance(); System.out.println("Phase in Main-2 " + ph.getPhase() + " completed"); // deregistering the main thread ph.arriveAndDeregister(); } } class FirstTask implements Runnable { private String threadName; private Phaser ph; FirstTask(String threadName, Phaser ph){ this.threadName = threadName; this.ph = ph; } @Override public void run() { System.out.println("In First Task.. " + threadName); // parties will wait here ph.arriveAndAwaitAdvance(); System.out.println("Deregistering, Phase- "+ ph.getPhase() + " Completed"); ph.arriveAndDeregister(); } } class SecondTask implements Runnable { private String threadName; private Phaser ph; SecondTask(String threadName, Phaser ph){ this.threadName = threadName; this.ph = ph; ph.register(); } @Override public void run() { System.out.println("In SecondTask.. " + threadName); ph.arriveAndAwaitAdvance(); System.out.println("In SecondTask.. Phase-" + ph.getPhase() + " completed" + threadName); ph.arriveAndDeregister(); } }
输出:
Phase in Main 0 started In First Task.. Thread-0 In First Task.. Thread-1 In First Task.. Thread-2 Deregistering, Phase- 1 Completed Phase in Main 0 completed Deregistering, Phase- 1 Completed Deregistering, Phase- 1 Completed In SecondTask.. Thread-0 In SecondTask.. Thread-1 Phase in Main-2 2 completed In SecondTask.. Phase-2 completedThread-0 In SecondTask.. Phase-2 completedThread-1
Phaser类中的方法
下面列出了Java中Phaser类的一些重要方法
到达()–到达此移相器,而无需等待其他人到达。
ArgumentAndAwaitAdvance()–到达此移相器并等待其他人。
ArgumentAndDeregister()–到达此相位器并从中注销,而无需等待其他相位器到达。
awaitAdvance(int phase)–等待此相位器的相位从给定的相位值开始前进,如果当前相位不等于给定的相位值或者此相位器终止,则立即返回。
bulkRegister(int party)–在此移相器中添加给定数量的未到达的新参与者。
getArrivedParties()-返回已到达此相位器当前阶段的注册方的数量。
getParent()–返回此相位器的父级;如果没有,则返回null。
getPhase()–返回当前阶段编号。
isTerminated()–如果此相位器已终止,则返回true。
onAdvance(int phase,intregisteredParties)–一种可重写的方法,用于在即将到来的阶段前进时执行操作并控制终止。
register()–向此移相器添加一个新的未到达方。