Java中的LinkedTransferQueue

时间:2020-01-09 10:35:11  来源:igfitidea点击:

Java中的LinkedTransferQueue是TransferQueue接口的实现,并且是java.util.concurrent包的一部分。它是在Java 7中添加的。

Java中的TransferQueue接口

扩展BlockingQueue接口的TransferQueue接口增加了生产者可以等待使用者接收元素的功能。
在诸如ArrayBlockingQueue,PriorityBlockingQueue之类的BlockingQueue实现中,有一些操作会在检索元素时等待队列为空,并在存储元素时等待队列中的空间可用。在TransferQueue中,也有一些操作在元素级别阻塞。

Java TransferQueue方法

除了从BlockingQueue继承的方法外,TransferQueue还添加了以下方法来添加功能,使线程等待直到元素被另一个线程消耗为止。

  • transfer(E e)–将元素转移给使用者,并在必要时等待。
  • tryTransfer(E e)–如果存在已经在等待接收它的使用者,则立即传输指定的元素,否则返回false
  • tryTransfer(E e,long timeout,TimeUnit unit)–如果存在已经在等待接收它的使用者,则立即传输指定的元素。等待直到使用者接收到元素,如果在元素可以传输之前经过了指定的等待时间,则返回false。

TransferQueue还具有以下查询方法:

  • hasWaitingConsumer()–如果至少有一个消费者正在等待接收元素,则返回true。
  • getWaitingConsumerCount()–返回等待接收元素的使用者数量的估计值。

Java中的LinkedTransferQueue

LinkedTransferQueue是一个无界的TransferQueue,它将其元素存储为链接节点,其中每个节点都存储对下一个节点的引用。此队列中的元素以FIFO(先进先出)的方式排序。队列的开头是某个生产者在队列中停留时间最长的元素。队列的尾部是某个生产者最短时间进入队列的元素。

Java LinkedTransferQueue构造函数

  • LinkedTransferQueue()–创建一个最初为空的LinkedTransferQueue。
  • LinkedTransferQueue(Collection <?extends E> c)–创建一个LinkedTransferQueue,最初包含给定集合的元素,并以集合迭代器的遍历顺序添加。

LinkedTransferQueue Java示例

这是使用LinkedTransferQueue的Java生产者-消费者示例。在使用者线程中,有一个睡眠方法,该方法的时间为2秒,以使使用者线程暂停2秒,即使生产者线程等待该元素被使用者检索。

public class LinkedTQ {
  public static void main(String[] args) {
    TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(new LinkedProducer(tQueue));
    executor.execute(new LinkedConsumer(tQueue));
    executor.shutdown();
  }
}

//Producer
class LinkedProducer implements Runnable{
  TransferQueue<Integer> tQueue;
  LinkedProducer(TransferQueue<Integer> tQueue){
    this.tQueue = tQueue;
  }
  @Override
  public void run() {
    for(int i = 0; i < 5; i++){
      try {
        System.out.println("Adding to queue-" + i);
        tQueue.transfer(i);    
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
}
//Consumer
class LinkedConsumer implements Runnable{
  TransferQueue<Integer> tQueue;
  LinkedConsumer(TransferQueue<Integer> tQueue){
    this.tQueue = tQueue;
  }
  @Override
  public void run() {
    for(int i = 0; i < 5; i++){
      try {
        // Delay of 2 seconds
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Consumer retrieved- " + tQueue.take());				
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
     }
  }
}

输出:

Adding to queue-0
Consumer retrieved- 0
Adding to queue-1
Consumer retrieved- 1
Adding to queue-2
Consumer retrieved- 2
Adding to queue-3
Consumer retrieved- 3
Adding to queue-4
Consumer retrieved- 4