Java中的LinkedTransferQueue
时间:2020-01-09 10:35:11 来源:igfitidea点击:
Java中的LinkedTransferQueue是TransferQueue接口的实现,并且是java.util.concurrent包的一部分。它是在Java 7中添加的。
Java中的TransferQueue接口
扩展BlockingQueue接口的TransferQueue接口增加了生产者可以等待使用者接收元素的功能。
在诸如ArrayBlockingQueue,PriorityBlockingQueue之类的BlockingQueue实现中,有一些操作会在检索元素时等待队列为空,并在存储元素时等待队列中的空间可用。在TransferQueue中,也有一些操作在元素级别阻塞。
Java TransferQueue方法
除了从BlockingQueue继承的方法外,TransferQueue还添加了以下方法来添加功能,使线程等待直到元素被另一个线程消耗为止。
- transfer(E e)–将元素转移给使用者,并在必要时等待。
- tryTransfer(E e)–如果存在已经在等待接收它的使用者,则立即传输指定的元素,否则返回false
- tryTransfer(E e,long timeout,TimeUnit unit)–如果存在已经在等待接收它的使用者,则立即传输指定的元素。等待直到使用者接收到元素,如果在元素可以传输之前经过了指定的等待时间,则返回false。
TransferQueue还具有以下查询方法:
- hasWaitingConsumer()–如果至少有一个消费者正在等待接收元素,则返回true。
- getWaitingConsumerCount()–返回等待接收元素的使用者数量的估计值。
Java中的LinkedTransferQueue
LinkedTransferQueue是一个无界的TransferQueue,它将其元素存储为链接节点,其中每个节点都存储对下一个节点的引用。此队列中的元素以FIFO(先进先出)的方式排序。队列的开头是某个生产者在队列中停留时间最长的元素。队列的尾部是某个生产者最短时间进入队列的元素。
Java LinkedTransferQueue构造函数
- LinkedTransferQueue()–创建一个最初为空的LinkedTransferQueue。
- LinkedTransferQueue(Collection <?extends E> c)–创建一个LinkedTransferQueue,最初包含给定集合的元素,并以集合迭代器的遍历顺序添加。
LinkedTransferQueue Java示例
这是使用LinkedTransferQueue的Java生产者-消费者示例。在使用者线程中,有一个睡眠方法,该方法的时间为2秒,以使使用者线程暂停2秒,即使生产者线程等待该元素被使用者检索。
public class LinkedTQ { public static void main(String[] args) { TransferQueue<Integer> tQueue = new LinkedTransferQueue<>(); ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new LinkedProducer(tQueue)); executor.execute(new LinkedConsumer(tQueue)); executor.shutdown(); } } //Producer class LinkedProducer implements Runnable{ TransferQueue<Integer> tQueue; LinkedProducer(TransferQueue<Integer> tQueue){ this.tQueue = tQueue; } @Override public void run() { for(int i = 0; i < 5; i++){ try { System.out.println("Adding to queue-" + i); tQueue.transfer(i); TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //Consumer class LinkedConsumer implements Runnable{ TransferQueue<Integer> tQueue; LinkedConsumer(TransferQueue<Integer> tQueue){ this.tQueue = tQueue; } @Override public void run() { for(int i = 0; i < 5; i++){ try { // Delay of 2 seconds TimeUnit.SECONDS.sleep(2); System.out.println("Consumer retrieved- " + tQueue.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
输出:
Adding to queue-0 Consumer retrieved- 0 Adding to queue-1 Consumer retrieved- 1 Adding to queue-2 Consumer retrieved- 2 Adding to queue-3 Consumer retrieved- 3 Adding to queue-4 Consumer retrieved- 4