Java相位器Phaser

时间:2020-01-09 10:35:10  来源:igfitidea点击:

作为Java 5中java.util.concurrent包的一部分,添加了许多同步帮助,例如Semaphore,Exchanger。在Java 7并发中添加的另一种同步辅助工具是Phaser,它是在Java 7中添加的。

Java并发中的相位器

Java中的相位器也是CountDownLatch和CyclicBarrier之类的同步屏障,其中线程需要在屏障处等待,直到所有线程到达屏障(即屏障被触发)为止。通过在多个阶段同步线程,Phaser提供了更大的灵活性。在每个阶段中,可以动态注册和注销线程。

关于Java中的Phaser的一些要点如下:

  • 使用Phaser,我们可以重新使用同一Phaser实例在多个阶段进行同步。

  • 一旦所有线程注册一个阶段后到达该屏障,该阶段即被视为完成,下一阶段开始。

  • 我们也可以使用Phaser同步单个相位,但是当用于同步多个相位时,它会更加有用。

Java Phaser构造函数

Phaser类中有四个构造函数。

  • Phaser()–创建一个没有初始注册方,没有父级且初始阶段号为0的新相位器。

  • Phaser(int party)–使用给定数目的未到达注册方,没有父级,初始阶段号为0,创建一个新的Phaser。

  • 移相器(Phaser父级,int各方)–使用给定的父级和注册的未到达方数创建一个新的移相器。

  • Phaser(Phaser父级)–与给定的父级一起创建新的相位器,并且没有初始注册方。

Phaser如何在Java中工作

  • 首先是创建一个Phaser实例。

  • 通过调用register()方法向相位器注册参与方,我们也可以使用构造函数,其中将参与方数目作为参数传递。

  • 要发出信号,表明一方已到达阶段(必须到达)之一。一旦所有注册方都到达一个阶段,则认为该阶段已完成。

  • 移相器的每一代都有一个关联的相数。阶段号从零开始,并在各方到达阶段器时提前。

Java中的相位器示例

这是一个展示Phaser运作的示例。有两个可运行任务,将作为两个单独的阶段执行。第一个可运行任务(FirstTask)由三个线程执行,因此我们可以看到使用bulkRegister()方法注册了3个参与者。

对于第二个可运行任务(SecondTask),在可运行类中使用注册方法。

public class PhaserDemo {
  public static void main(String[] args) {
    Phaser ph = new Phaser(1);
    // registering 3 parties in bulk
    ph.bulkRegister(3);
    System.out.println("Phase in Main " + ph.getPhase() + " started");
    // starting 3 threads
    for(int i = 0; i < 3; i++) {      	
      new Thread(new FirstTask("Thread-"+i, ph)).start();
    }
    int curPhase = ph.getPhase();
    // This is to make main thread wait
    ph.arriveAndAwaitAdvance();
    System.out.println("Phase in Main " + curPhase + " completed");

    for(int i = 0; i < 2; i++) {     	
      new Thread(new SecondTask("Thread-"+i, ph)).start();
    }
    ph.arriveAndAwaitAdvance();
    System.out.println("Phase in Main-2 " + ph.getPhase() + " completed");
    // deregistering the main thread
    ph.arriveAndDeregister();
  }
}

class FirstTask implements Runnable {
  private String threadName;
  private Phaser ph;

  FirstTask(String threadName, Phaser ph){
    this.threadName = threadName;
    this.ph = ph;       
  }
  @Override
  public void run() {
    System.out.println("In First Task.. " + threadName);
    // parties will wait here
    ph.arriveAndAwaitAdvance();
    
    System.out.println("Deregistering, Phase- "+ ph.getPhase() + " Completed");
    ph.arriveAndDeregister();
  }
}

class SecondTask implements Runnable {
  private String threadName;
  private Phaser ph;
    
  SecondTask(String threadName, Phaser ph){
    this.threadName = threadName;
    this.ph = ph;
    ph.register();
  }
    
  @Override
  public void run() {
    System.out.println("In SecondTask.. " + threadName);
    ph.arriveAndAwaitAdvance();
    System.out.println("In SecondTask.. Phase-" + ph.getPhase() + " completed" + threadName);
    ph.arriveAndDeregister();
  }
}

输出:

Phase in Main 0 started
In First Task.. Thread-0
In First Task.. Thread-1
In First Task.. Thread-2
Deregistering, Phase- 1 Completed
Phase in Main 0 completed
Deregistering, Phase- 1 Completed
Deregistering, Phase- 1 Completed
In SecondTask.. Thread-0
In SecondTask.. Thread-1
Phase in Main-2 2 completed
In SecondTask.. Phase-2 completedThread-0
In SecondTask.. Phase-2 completedThread-1

Phaser类中的方法

下面列出了Java中Phaser类的一些重要方法

  • 到达()–到达此移相器,而无需等待其他人到达。

  • ArgumentAndAwaitAdvance()–到达此移相器并等待其他人。

  • ArgumentAndDeregister()–到达此相位器并从中注销,而无需等待其他相位器到达。

  • awaitAdvance(int phase)–等待此相位器的相位从给定的相位值开始前进,如果当前相位不等于给定的相位值或者此相位器终止,则立即返回。

  • bulkRegister(int party)–在此移相器中添加给定数量的未到达的新参与者。

  • getArrivedParties()-返回已到达此相位器当前阶段的注册方的数量。

  • getParent()–返回此相位器的父级;如果没有,则返回null。

  • getPhase()–返回当前阶段编号。

  • isTerminated()–如果此相位器已终止,则返回true。

  • onAdvance(int phase,intregisteredParties)–一种可重写的方法,用于在即将到来的阶段前进时执行操作并控制终止。

  • register()–向此移相器添加一个新的未到达方。