Java中的ConcurrentLinkedDeque

时间:2020-01-09 10:35:12  来源:igfitidea点击:

Java中的ConcurrentLinkedDeque是线程安全的无限制并发双端队列。它将其元素存储为链接节点,其中每个节点都存储对上一个和下一个节点的引用。 Java中的ConcurrentLinkedDeque实现Deque接口,并且是java.util.concurrent包的一部分。

ConcurrentLinkedDeque与诸如LinkedBlockingDeque这样的阻塞式Deque实现的不同之处在于ConcurrentLinkedDeque是非阻塞的,因此该队列中的操作不会阻塞。由于ConcurrentLinkedDeque是非阻塞的,因此无法使用如putFirst(),takeFirst()或者putLast(),takeLast()之类的阻塞方法(如果需要的话)。

ConcurrentLinkedDeque与其对应的ConcurrentLinkedQueue相似,不同之处在于它是一个双端队列。这意味着ConcurrentLinkedDeque允许从两端插入和移除。 ConcurrentLinkedDeque具有诸如addFirst(),addLast(),removeFirst(),removeLast()之类的方法,以便于从两端插入和删除。

ConcurrentLinkedDeque不允许使用空元素

与大多数其他并发集合实现一样,此类不允许使用null元素。

public class ConcurrentLDQ {
  public static void main(String[] args) {
    Deque<Integer> conDeque = new ConcurrentLinkedDeque<>();
    conDeque.add(10);
    conDeque.addLast(null);
  }
}

输出:

Exception in thread "main" java.lang.NullPointerException
	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
	at java.base/java.util.concurrent.ConcurrentLinkedDeque.linkLast(ConcurrentLinkedDeque.java:347)
	at java.base/java.util.concurrent.ConcurrentLinkedDeque.addLast(ConcurrentLinkedDeque.java:840)
	at com.theitroad.programs.ConcurrentLDQ.main(ConcurrentLDQ.java:11)

如我们所见,尝试将null添加到双端队列会导致NullPointerException。

Java ConcurrentLinkedDeque构造函数

  • ConcurrentLinkedDeque()–构造一个空的双端队列。
  • ConcurrentLinkedDeque(Collection <?extends E> c)–构造一个双端队列,此双端队列最初包含给定集合的元素,并以该集合的迭代器的遍历顺序添加。

ConcurrentLinkedDeque Java示例

这是使用ConcurrentLinkedDeque的Java生产者-消费者示例。有一个生产者线程和两个消费者线程。

public class ConcurrentLDQ {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Deque<Integer> conDeque = new ConcurrentLinkedDeque<>();
    // One Producer thread
    executor.execute(new ConProducer(conDeque));
    // Two Consumer thread
    executor.execute(new ConConsumer(conDeque));
    executor.execute(new ConConsumer(conDeque));    
    executor.shutdown();
  }
}

//Producer
class ConProducer implements Runnable{
  Deque<Integer> conDeque;
  ConProducer(Deque<Integer> conDeque){
    this.conDeque = conDeque;
  }
  @Override
  public void run() {
    for(int i = 0; i < 6; i++){
      System.out.println("Adding to queue-" + i);
      conDeque.addFirst(i);    
    }
  }
}

//Consumer
class ConConsumer implements Runnable{
  Deque<Integer> conDeque;
  ConConsumer(Deque<Integer> conDeque){
    this.conDeque = conDeque;
  }
  @Override
  public void run() {        
    for(int i = 0; i < 3; i++){
      try {
        TimeUnit.MILLISECONDS.sleep(10);                
        System.out.println("Thread Name -" + Thread.currentThread().getName() + 
            " Consumer retrieved- " + conDeque.pollLast());
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
}

输出:

Adding to queue-0
Adding to queue-1
Adding to queue-2
Adding to queue-3
Adding to queue-4
Adding to queue-5
Thread Name -pool-1-thread-3 Consumer retrieved- 0
Thread Name -pool-1-thread-2 Consumer retrieved- 1
Thread Name -pool-1-thread-3 Consumer retrieved- 2
Thread Name -pool-1-thread-2 Consumer retrieved- 3
Thread Name -pool-1-thread-3 Consumer retrieved- 4
Thread Name -pool-1-thread-2 Consumer retrieved- 5