生产者-消费者问题Java程序

时间:2020-01-09 10:35:35  来源:igfitidea点击:

在这篇文章中,我们将看到使用线程解决生产者-消费者问题的Java程序。

目录

  • 生产者消费者问题

  • 生产者-消费者Java程序

  • 生产者-消费者的Java程序,使用wait-notify

  • 使用BlockingQueue的生产者-消费者Java程序

生产者消费者问题

生产者使用者是一个经典的并发问题,其中同步和线程间通信要求正确执行。

在生产者-消费者问题中,生产者和消费者有两个进程共享一个称为队列的公共有界缓冲区。

  • 生产者进程生成数据并将其插入共享队列。

  • 使用者进程使用共享队列中的数据。

这里的要求是,生产者不应尝试将数据添加到共享缓冲区(如果已满),而应等待队列中有空间容纳新元素。以同样的方式,使用者不应尝试使用空缓冲区中的数据,而应等待将数据插入队列中。

生产者-消费者Java程序

由于正确执行Producer-Consumer需要进行线程间通信,因此可以使用wait-notify方法编写该程序。

我们还可以使用Java并发包,其中添加了许多队列实现。使用ArrayBlockingQueue,我们可以轻松地用Java实现Producer-Consumer程序。

生产者-消费者的Java程序,使用wait-notify

在Java程序中,需要一个共享缓冲区,供生产者和使用者进程使用,以便可以使用LinkedList实例。

生产者和使用者还有两个Runnable任务,由两个单独的线程执行。将值添加到队列后,生产者应通知使用者任务唤醒,并应进入等待状态。

如果队列为空,则使用者任务应以相同的方式处于等待状态。

import java.util.LinkedList;
// Producer task
class Producer implements Runnable{
  LinkedList<Integer> list;
  Producer(LinkedList<Integer> list){
    this.list = list;
  }
  @Override
  public void run() {
    for(int i = 1; i <= 5; i++){
      synchronized(list) {
        // If there is already an element in the list wait
        while(list.size() >= 1){
          System.out.println("Waiting as queue is full..");
          try {
            list.wait();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println("Adding to queue- " + Thread.currentThread().getName() + " " + i);
        list.add(i);
        list.notify();    
      }
    }		
  }
}
//Consumer task
class Consumer implements Runnable{
  LinkedList<Integer> list;
  Consumer(LinkedList<Integer> list){
    this.list = list;
  }
  @Override
  public void run() {
    for(int i = 1; i <= 5; i++){
      synchronized(list) {
        // if there is no element in the list wait
        while(list.size() < 1){
          System.out.println("Waiting as queue is empty..");
          try {
            list.wait();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        // if there is element in the list then retrieve it
        System.out.println("Consuming from queue- " + Thread.currentThread().getName() + " " + list.remove());
        list.notify();  
      }
    }		
  }
}

public class ProducerConsumer {
  public static void main(String[] args) {
    // shared list
    LinkedList<Integer> list = new LinkedList<Integer>();
    Thread t1 = new Thread(new Producer(list), "Producer");
    Thread t2 = new Thread(new Consumer(list), "Consumer");
    t1.start();
    t2.start(); 
  }
}

输出:

Adding to queue- Producer 1
Waiting as queue is full..
Consuming from queue- Consumer 1
Waiting as queue is empty..
Adding to queue- Producer 2
Waiting as queue is full..
Consuming from queue- Consumer 2
Waiting as queue is empty..
Adding to queue- Producer 3
Waiting as queue is full..
Consuming from queue- Consumer 3
Waiting as queue is empty..
Adding to queue- Producer 4
Waiting as queue is full..
Consuming from queue- Consumer 4
Waiting as queue is empty..
Adding to queue- Producer 5
Consuming from queue- Consumer 5

使用BlockingQueue的面向生产者-消费者的Java程序

使用ArrayBlockingQueue之类的BlockingQueue实现,我们可以轻松地用Java实现Producer-Consumer程序。
BlockingQueue具有put()方法用于添加到队列中,如果队列容量已满,它将阻塞。同样,有一个take()方法可从队列的开头检索,如果没有可用的元素,它将阻塞。

在容量为1的代码ArrayBlockingQueue中创建,因此队列将只有一个元素并且插入将被阻塞,直到检索到该元素为止。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Producer task
class Producer implements Runnable{
  BlockingQueue<Integer> queue;
  Producer(BlockingQueue<Integer> queue){
    this.queue = queue;
  }
  @Override
  public void run() {
    for(int i = 1; i <= 5; i++){           
      try {
        queue.put(i);
        System.out.println("Adding to queue- " + i);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }   
    }
  }		
}
//Consumer task
class Consumer implements Runnable{
  BlockingQueue<Integer> queue;
  Consumer(BlockingQueue<Integer> queue){
    this.queue = queue;
  }
  @Override
  public void run() {
    for(int i = 1; i <= 5; i++){
      try {
        // if there is element in the list then retrieve it
        System.out.println("Consuming from queue- "  + queue.take());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }            
  }
}

public class ProducerConsumer {
  public static void main(String[] args) {
    BlockingQueue<Integer> bQueue = new ArrayBlockingQueue<Integer>(1);
    Thread t1 = new Thread(new Producer(bQueue), "Producer");
    Thread t2 = new Thread(new Consumer(bQueue), "Consumer");
    t1.start();
    t2.start(); 
  }
}

输出:

Adding to queue- 1
Consuming from queue- 1
Adding to queue- 2
Consuming from queue- 2
Adding to queue- 3
Consuming from queue- 3
Adding to queue- 4
Consuming from queue- 4
Adding to queue- 5
Consuming from queue- 5

如我们所见,使用ArrayBlockingQueue无需编写用于同步线程的逻辑,也无需调用等待并明确通知,从而使编写生产者-消费者Java程序非常简单。使用Lambda表达式可以使其更紧凑。

public class ArrayBQ {
  public static void main(String[] args) {
    // BlockingQueue of capacity 1
    BlockingQueue<Integer> bQueue = new ArrayBlockingQueue<Integer>(1);
    // Producer 
    new Thread(()->{
      for(int i = 0; i < 5; i++){
        try {
          bQueue.put(i);
          System.out.println("Added to queue-" + i);  
          
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }).start();
        
    // Consumer
    new Thread(()->{
      for(int i = 0; i < 5; i++){
        try {
          System.out.println("Consumer retrieved- " + bQueue.take());
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }
    }).start();
  }
}