Java中的ConcurrentLinkedQueue

时间:2020-01-09 10:35:12  来源:igfitidea点击:

Java中的ConcurrentLinkedQueue是线程安全的无界队列。它将其元素存储为链接节点,其中每个节点都存储对下一个节点的引用。 ConcurrentLinkedQueue类实现Queue接口,并且是java.util.concurrent包的一部分。

ConcurrentLinkedQueue与ArrayBlockingQueue之类的BlockingQueue实现有何不同,PriorityBlockingQueue是ConcurrentLinkedQueue是非阻塞的,因此该队列中的操作不会阻塞。由于ConcurrentLinkedQueue是非阻塞的,因此没有put()或者take()方法会在需要时阻塞。

该队列对元素FIFO(先进先出)进行排序。队列的开头是已在队列中最长时间的元素。队列的尾部是最短时间位于队列中的元素。新元素插入到队列的尾部,并且队列检索操作在队列的开头获取元素。

ConcurrentLinkedQueue不允许使用空元素

像大多数其他并发集合实现一样,此类也不允许使用null元素。

public class ConcurrentLQ {
  public static void main(String[] args) {
    Queue<Integer> conQueue = new ConcurrentLinkedQueue<>();
    conQueue.add(5);
    conQueue.add(null);
  }
}

输出:

Exception in thread "main" java.lang.NullPointerException
	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
	at java.base/java.util.concurrent.ConcurrentLinkedQueue.offer(ConcurrentLinkedQueue.java:355)
	at java.base/java.util.concurrent.ConcurrentLinkedQueue.add(ConcurrentLinkedQueue.java:283)
	at com.theitroad.programs.ConcurrentLQ.main(ConcurrentLQ.java:11)

如我们所见,尝试将null添加到队列会导致NullPointerException。

Java ConcurrentLinkedQueue构造函数

  • ConcurrentLinkedQueue()–创建一个最初为空的ConcurrentLinkedQueue。
  • ConcurrentLinkedQueue(Collection <?extends E> c)-创建一个ConcurrentLinkedQueue,最初包含给定集合的元素,并以集合迭代器的遍历顺序添加。

ConcurrentLinkedQueue Java示例

这是使用ConcurrentLinkedQueue的Java生产者-消费者示例。有一个生产者线程和两个消费者线程。

public class ConcurrentLQ {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Queue<Integer> conQueue = new ConcurrentLinkedQueue<>();
    // One Producer thread
    executor.execute(new ConProducer(conQueue));
    // Two Consumer thread
    executor.execute(new ConConsumer(conQueue));
    executor.execute(new ConConsumer(conQueue));	
    executor.shutdown();
  }
}

//Producer
class ConProducer implements Runnable{
  Queue<Integer> conQueue;
  ConProducer(Queue<Integer> conQueue){
    this.conQueue = conQueue;
  }
  @Override
  public void run() {
    for(int i = 0; i < 6; i++){
      System.out.println("Adding to queue-" + i);
      conQueue.add(i);	
    }
  }
}
//Consumer
class ConConsumer implements Runnable{
  Queue<Integer> conQueue;
  ConConsumer(Queue<Integer> conQueue){
    this.conQueue = conQueue;
  }
  @Override
  public void run() {		
    for(int i = 0; i < 3; i++){
      try {
        TimeUnit.MILLISECONDS.sleep(50);			
        System.out.println("Thread Name -" + Thread.currentThread().getName() + " Consumer retrieved- " + conQueue.poll());
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
}

输出:

Adding to queue-0
Adding to queue-1
Adding to queue-2
Adding to queue-3
Adding to queue-4
Adding to queue-5
Thread Name -pool-1-thread-2 Consumer retrieved- 0
Thread Name -pool-1-thread-3 Consumer retrieved- 1
Thread Name -pool-1-thread-3 Consumer retrieved- 3
Thread Name -pool-1-thread-2 Consumer retrieved- 2
Thread Name -pool-1-thread-3 Consumer retrieved- 4
Thread Name -pool-1-thread-2 Consumer retrieved- 5