Java BlockingQueue示例

时间:2020-02-23 14:36:23  来源:igfitidea点击:

今天,我们将研究Java BlockingQueue。
java.util.concurrent.BlockingQueue是一个Java队列,它支持以下操作:在检索和删除元素时等待队列变为非空,并在添加元素时等待队列中的空间变为可用。

Java BlockingQueue

如果您尝试将null值存储在队列中,则Java BlockingQueue不接受null值并抛出NullPointerException。

Java BlockingQueue实现是线程安全的。
所有排队方法本质上都是原子的,并使用内部锁或者其他形式的并发控制。

Java BlockingQueue接口是Java集合框架的一部分,主要用于解决生产者消费者问题。
我们不必担心在BlockingQueue中等待生产者或者对象对消费者可用的空间,因为它是由BlockingQueue的实现类处理的。

Java提供了几种BlockingQueue实现,例如ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue等。

在BlockingQueue中实现生产者使用者问题时,我们将使用ArrayBlockingQueue实现。
以下是一些您应该知道的重要方法。

  • put(E e):此方法用于将元素插入队列。
    如果队列已满,它将等待空间可用。

  • E take():此方法从队列的开头检索并删除该元素。
    如果队列为空,则等待元素可用。

现在使用Java BlockingQueue实现生产者消费者问题。

Java BlockingQueue示例–消息

只是一个普通的Java对象,它将由Producer生成并添加到队列中。
您也可以将其称为有效负载或者队列消息。

package com.theitroad.concurrency;

public class Message {
  private String msg;
  
  public Message(String str){
      this.msg=str;
  }

  public String getMsg() {
      return msg;
  }

}

Java BlockingQueue示例–生产者

生产者类,它将创建消息并将其放入队列。

package com.theitroad.concurrency;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

  private BlockingQueue<Message> queue;
  
  public Producer(BlockingQueue<Message> q){
      this.queue=q;
  }
  @Override
  public void run() {
      //produce messages
      for(int i=0; i<100; i++){
          Message msg = new Message(""+i);
          try {
              Thread.sleep(i);
              queue.put(msg);
              System.out.println("Produced "+msg.getMsg());
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      //adding exit message
      Message msg = new Message("exit");
      try {
          queue.put(msg);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }

}

Java BlockingQueue示例–使用者

消费者类,它将处理来自队列的消息,并在收到退出消息时终止。

package com.theitroad.concurrency;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

private BlockingQueue<Message> queue;
  
  public Consumer(BlockingQueue<Message> q){
      this.queue=q;
  }

  @Override
  public void run() {
      try{
          Message msg;
          //consuming messages until exit message is received
          while((msg = queue.take()).getMsg() !="exit"){
          Thread.sleep(10);
          System.out.println("Consumed "+msg.getMsg());
          }
      }catch(InterruptedException e) {
          e.printStackTrace();
      }
  }
}

Java BlockingQueue示例–服务

最后,我们必须为生产者和消费者创建BlockingQueue服务。
该生产者消费者服务将创建具有固定大小的BlockingQueue,并与生产者和消费者共享。
该服务将启动生产者和使用者线程并退出。

package com.theitroad.concurrency;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerService {

  public static void main(String[] args) {
      //Creating BlockingQueue of size 10
      BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);
      //starting producer to produce messages in queue
      new Thread(producer).start();
      //starting consumer to consume messages from queue
      new Thread(consumer).start();
      System.out.println("Producer and Consumer has been started");
  }

}

上面的Java BlockingQueue示例程序的输出如下所示。

Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...

生产者和使用者使用Java Thread sleep来延迟产生和使用消息。