Java BlockingQueue示例
今天,我们将研究Java BlockingQueue。
java.util.concurrent.BlockingQueue是一个Java队列,它支持以下操作:在检索和删除元素时等待队列变为非空,并在添加元素时等待队列中的空间变为可用。
Java BlockingQueue
如果您尝试将null值存储在队列中,则Java BlockingQueue不接受null值并抛出NullPointerException。
Java BlockingQueue实现是线程安全的。
所有排队方法本质上都是原子的,并使用内部锁或者其他形式的并发控制。
Java BlockingQueue接口是Java集合框架的一部分,主要用于解决生产者消费者问题。
我们不必担心在BlockingQueue中等待生产者或者对象对消费者可用的空间,因为它是由BlockingQueue的实现类处理的。
Java提供了几种BlockingQueue实现,例如ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue等。
在BlockingQueue中实现生产者使用者问题时,我们将使用ArrayBlockingQueue实现。
以下是一些您应该知道的重要方法。
put(E e)
:此方法用于将元素插入队列。
如果队列已满,它将等待空间可用。E take()
:此方法从队列的开头检索并删除该元素。
如果队列为空,则等待元素可用。
现在使用Java BlockingQueue实现生产者消费者问题。
Java BlockingQueue示例–消息
只是一个普通的Java对象,它将由Producer生成并添加到队列中。
您也可以将其称为有效负载或者队列消息。
package com.theitroad.concurrency; public class Message { private String msg; public Message(String str){ this.msg=str; } public String getMsg() { return msg; } }
Java BlockingQueue示例–生产者
生产者类,它将创建消息并将其放入队列。
package com.theitroad.concurrency; import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue<Message> queue; public Producer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { //produce messages for(int i=0; i<100; i++){ Message msg = new Message(""+i); try { Thread.sleep(i); queue.put(msg); System.out.println("Produced "+msg.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } //adding exit message Message msg = new Message("exit"); try { queue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } }
Java BlockingQueue示例–使用者
消费者类,它将处理来自队列的消息,并在收到退出消息时终止。
package com.theitroad.concurrency; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable{ private BlockingQueue<Message> queue; public Consumer(BlockingQueue<Message> q){ this.queue=q; } @Override public void run() { try{ Message msg; //consuming messages until exit message is received while((msg = queue.take()).getMsg() !="exit"){ Thread.sleep(10); System.out.println("Consumed "+msg.getMsg()); } }catch(InterruptedException e) { e.printStackTrace(); } } }
Java BlockingQueue示例–服务
最后,我们必须为生产者和消费者创建BlockingQueue服务。
该生产者消费者服务将创建具有固定大小的BlockingQueue,并与生产者和消费者共享。
该服务将启动生产者和使用者线程并退出。
package com.theitroad.concurrency; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerService { public static void main(String[] args) { //Creating BlockingQueue of size 10 BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); //starting producer to produce messages in queue new Thread(producer).start(); //starting consumer to consume messages from queue new Thread(consumer).start(); System.out.println("Producer and Consumer has been started"); } }
上面的Java BlockingQueue示例程序的输出如下所示。
Producer and Consumer has been started Produced 0 Produced 1 Produced 2 Produced 3 Produced 4 Consumed 0 Produced 5 Consumed 1 Produced 6 Produced 7 Consumed 2 Produced 8 ...
生产者和使用者使用Java Thread sleep来延迟产生和使用消息。