Java中的ConcurrentLinkedQueue
时间:2020-01-09 10:35:12 来源:igfitidea点击:
Java中的ConcurrentLinkedQueue是线程安全的无界队列。它将其元素存储为链接节点,其中每个节点都存储对下一个节点的引用。 ConcurrentLinkedQueue类实现Queue接口,并且是java.util.concurrent包的一部分。
ConcurrentLinkedQueue与ArrayBlockingQueue之类的BlockingQueue实现有何不同,PriorityBlockingQueue是ConcurrentLinkedQueue是非阻塞的,因此该队列中的操作不会阻塞。由于ConcurrentLinkedQueue是非阻塞的,因此没有put()或者take()方法会在需要时阻塞。
该队列对元素FIFO(先进先出)进行排序。队列的开头是已在队列中最长时间的元素。队列的尾部是最短时间位于队列中的元素。新元素插入到队列的尾部,并且队列检索操作在队列的开头获取元素。
ConcurrentLinkedQueue不允许使用空元素
像大多数其他并发集合实现一样,此类也不允许使用null元素。
public class ConcurrentLQ { public static void main(String[] args) { Queue<Integer> conQueue = new ConcurrentLinkedQueue<>(); conQueue.add(5); conQueue.add(null); } }
输出:
Exception in thread "main" java.lang.NullPointerException at java.base/java.util.Objects.requireNonNull(Objects.java:221) at java.base/java.util.concurrent.ConcurrentLinkedQueue.offer(ConcurrentLinkedQueue.java:355) at java.base/java.util.concurrent.ConcurrentLinkedQueue.add(ConcurrentLinkedQueue.java:283) at com.theitroad.programs.ConcurrentLQ.main(ConcurrentLQ.java:11)
如我们所见,尝试将null添加到队列会导致NullPointerException。
Java ConcurrentLinkedQueue构造函数
- ConcurrentLinkedQueue()–创建一个最初为空的ConcurrentLinkedQueue。
- ConcurrentLinkedQueue(Collection <?extends E> c)-创建一个ConcurrentLinkedQueue,最初包含给定集合的元素,并以集合迭代器的遍历顺序添加。
ConcurrentLinkedQueue Java示例
这是使用ConcurrentLinkedQueue的Java生产者-消费者示例。有一个生产者线程和两个消费者线程。
public class ConcurrentLQ { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(4); Queue<Integer> conQueue = new ConcurrentLinkedQueue<>(); // One Producer thread executor.execute(new ConProducer(conQueue)); // Two Consumer thread executor.execute(new ConConsumer(conQueue)); executor.execute(new ConConsumer(conQueue)); executor.shutdown(); } } //Producer class ConProducer implements Runnable{ Queue<Integer> conQueue; ConProducer(Queue<Integer> conQueue){ this.conQueue = conQueue; } @Override public void run() { for(int i = 0; i < 6; i++){ System.out.println("Adding to queue-" + i); conQueue.add(i); } } } //Consumer class ConConsumer implements Runnable{ Queue<Integer> conQueue; ConConsumer(Queue<Integer> conQueue){ this.conQueue = conQueue; } @Override public void run() { for(int i = 0; i < 3; i++){ try { TimeUnit.MILLISECONDS.sleep(50); System.out.println("Thread Name -" + Thread.currentThread().getName() + " Consumer retrieved- " + conQueue.poll()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
输出:
Adding to queue-0 Adding to queue-1 Adding to queue-2 Adding to queue-3 Adding to queue-4 Adding to queue-5 Thread Name -pool-1-thread-2 Consumer retrieved- 0 Thread Name -pool-1-thread-3 Consumer retrieved- 1 Thread Name -pool-1-thread-3 Consumer retrieved- 3 Thread Name -pool-1-thread-2 Consumer retrieved- 2 Thread Name -pool-1-thread-3 Consumer retrieved- 4 Thread Name -pool-1-thread-2 Consumer retrieved- 5