Java中的CyclicBarrier

时间:2020-01-09 10:35:10  来源:igfitidea点击:

java.util.concurrent包提供了许多同步器实用程序,这些实用程序涵盖了多个线程相互通信且需要某种同步器来调节线程之间的交互的用例。 Java中的CyclicBarrier就是这样一种同步辅助工具,当我们希望线程在公共执行点等待,直到集合中的所有线程都到达该公共障碍点时,该控件非常有用。

Java中的CyclicBarrier类

CyclicBarrier类是在Java 1.5中添加的,它是Java中java.util.concurrent包的一部分。该类名为CyclicBarrier,因为它可以在释放等待线程之后重用。

CyclicBarrier如何在Java中工作

使用给定的值初始化CyclicBarrier,并且当在屏障处等待的线程数等于该值时,屏障会跳闸。
为了使线程在此屏障处等待,在线程上调用await()方法。
调用await()方法的线程表示该线程已到达公共屏障点,并且该线程被禁用,直到所需数量的线程已调用await()方法为止,此时屏障被触发并且线程被解除阻塞。这样线程可以同步其执行。

Java CyclicBarrier构造函数

  • CyclicBarrier(int party)–创建一个新的CyclicBarrier,当给定数量的参与者(线程)在等待它时,它将跳闸。

  • CyclicBarrier(int party,Runnable barrierAction)–创建一个新的CyclicBarrier,它将在给定数目的参与者(线程)等待时跳闸,并在跳闸时执行给定的屏障动作,由最后一个进入线程执行障碍。

Java CyclicBarrier方法

  • await()–等待直到所有各方都在此障碍上调用了await。

  • await(长超时,TimeUnit单位)–等待,直到所有各方都在此屏障上调用了await,或者经过了指定的等待时间。

  • getNumberWaiting()-返回当前在障碍处等待的参与者数量。

  • getParties()–返回突破此障碍所需的参与方数量。

  • isBroken()–查询此屏障是否处于断开状态。

  • reset()–将屏障重置为其初始状态。

CyclicBarrier Java示例

在示例中,创建了三个线程,并且在这些线程之间共享ArrayList。所有这些线程都处理一些数据,并将结果放入ArrayList中。仅在所有三个线程均已完成并调用await()之后,才应开始进一步处理。
在这种情况下,我们将使用初始化为值3的CyclicBarrier以及可运行的动作进行进一步处理。

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicDemo {
  public static void main(String[] args) {		
    List<Integer> dataList = Collections.synchronizedList(new ArrayList<Integer>());
    // Initializing cyclicbarrier
    CyclicBarrier cb = new CyclicBarrier(3, new ListBarrierAction(dataList));
    // starting threads
    for(int i = 0; i < 3; i++) {
      new Thread(new ListWorker(dataList, cb)).start();;
    }
  }    
}

class ListWorker implements Runnable{
  private CyclicBarrier cb;
  private List<Integer> dataList;
  ListWorker(List<Integer> dataList, CyclicBarrier cb) {
    this.dataList = dataList;
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("Executing run method for thread - " + Thread.currentThread().getName());
    for(int i = 0; i < 10; i++) {
      dataList.add(i);
    }
    
    try {
      System.out.println("Calling await.. " + Thread.currentThread().getName());
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}
// Barrier action to be executed when barrier is tripped
class ListBarrierAction implements Runnable {
  private List<Integer> dataList;
  ListBarrierAction(List<Integer> dataList){
    this.dataList = dataList;
  }
  @Override
  public void run() {
    System.out.println("In ListBarrierAction, start further processing on list with length " + dataList.size());
  }
}

输出:

Executing run method for thread - Thread-0
Calling await.. Thread-0
Executing run method for thread - Thread-2
Executing run method for thread - Thread-1
Calling await.. Thread-2
Calling await.. Thread-1
In ListBarrierAction, start further processing on list with length 30

调用await方法时会发生什么

当在当前线程上调用CyclicBarrier类的await()方法并且当前线程不是最后一个到达时,则出于线程调度目的将其禁用,并使其处于休眠状态,直到发生以下情况之一:

  • 最后一个线程到达;或者

  • 其他一些线程中断当前线程。或者

  • 其他一些线程中断其他正在等待的线程之一;或者

  • 等待屏障时其他线程超时;或者

  • 其他一些线程在此屏障上调用reset()。

如果当前线程是最后到达的线程,则

如果构造函数中提供了非null屏障操作,则当前线程将在允许其他线程继续运行之前运行该操作。如果在屏障操作期间发生异常,则该异常将在当前线程中传播,并且屏障处于断开状态。

CyclicBarrier中的BrokenBarrierException

如果在障碍点等待时任何线程被中断,则所有其他等待线程将抛出BrokenBarrierException并将障碍置于断开状态。

如果在任何线程正在等待时屏障重置(),则会引发BrokenBarrierException。

CyclicBarrier是可重用的

与无法重用的其他同步辅助工具CountDownLatch不同,Java中的CyclicBarrier可以在释放等待线程之后重用。

让我们重用与上面相同的示例,但是现在CyclicBarrier初始化为值4,因为我们也将等待主线程。一旦从屏障释放了三个线程的初始集合,这些新线程将使用相同的CyclicBarrier对象,则将再启动三个线程。

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicDemo {
  public static void main(String[] args) {     
  List<Integer> dataList = Collections.synchronizedList(new ArrayList<Integer>());
  // Initializing cyclicbarrier
  CyclicBarrier cb = new CyclicBarrier(4, new ListBarrierAction(dataList));
  // starting threads
  for(int i = 0; i < 3; i++) {
    new Thread(new ListWorker(dataList, cb)).start();;
  }
  try {
    // Calling await for main thread
    cb.await();
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  } catch (BrokenBarrierException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
  System.out.println("done with initial set of threads, starting again reusing the same cyclicbarrier object");
  dataList = Collections.synchronizedList(new ArrayList<Integer>());
  // Starting another set of threads
  for(int i = 0; i < 3; i++) {
    new Thread(new ListWorker(dataList, cb)).start();;
  }    
  try {
    // Calling await for main thread
    cb.await();
  } catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  } catch (BrokenBarrierException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
  }
  System.out.println("In main thread...");
  }    
}

class ListWorker implements Runnable{
  private CyclicBarrier cb;
  private List<Integer> dataList;
  ListWorker(List<Integer> dataList, CyclicBarrier cb) {
    this.dataList = dataList;
    this.cb = cb;
  }
  @Override
  public void run() {
    System.out.println("Executing run method for thread - " + Thread.currentThread().getName());
    for(int i = 0; i < 10; i++) {
      dataList.add(i);
    }
    
    try {
      System.out.println("Calling await.. " + Thread.currentThread().getName());
      cb.await();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}
// Barrier action to be executed when barrier is tripped
class ListBarrierAction implements Runnable {
  private List<Integer> dataList;
  ListBarrierAction(List<Integer> dataList){
    this.dataList = dataList;
  }
  @Override
  public void run() {
    System.out.println("In ListBarrierAction, start further processing on list with length " + dataList.size());
  }
}

输出:

Executing run method for thread - Thread-0
Executing run method for thread - Thread-1
Executing run method for thread - Thread-2
Calling await.. Thread-2
Calling await.. Thread-0
Calling await.. Thread-1
In ListBarrierAction, start further processing on list with length 30
done with initial set of threads, starting again reusing the same cyclicbarrier object
Executing run method for thread - Thread-4
Calling await.. Thread-4
Executing run method for thread - Thread-3
Executing run method for thread - Thread-5
Calling await.. Thread-5
Calling await.. Thread-3
In ListBarrierAction, start further processing on list with length 30
In main thread...