生产者-消费者问题Java程序
在这篇文章中,我们将看到使用线程解决生产者-消费者问题的Java程序。
目录
生产者消费者问题
生产者-消费者Java程序
生产者-消费者的Java程序,使用wait-notify
使用BlockingQueue的生产者-消费者Java程序
生产者消费者问题
生产者使用者是一个经典的并发问题,其中同步和线程间通信要求正确执行。
在生产者-消费者问题中,生产者和消费者有两个进程共享一个称为队列的公共有界缓冲区。
生产者进程生成数据并将其插入共享队列。
使用者进程使用共享队列中的数据。
这里的要求是,生产者不应尝试将数据添加到共享缓冲区(如果已满),而应等待队列中有空间容纳新元素。以同样的方式,使用者不应尝试使用空缓冲区中的数据,而应等待将数据插入队列中。
生产者-消费者Java程序
由于正确执行Producer-Consumer需要进行线程间通信,因此可以使用wait-notify方法编写该程序。
我们还可以使用Java并发包,其中添加了许多队列实现。使用ArrayBlockingQueue,我们可以轻松地用Java实现Producer-Consumer程序。
生产者-消费者的Java程序,使用wait-notify
在Java程序中,需要一个共享缓冲区,供生产者和使用者进程使用,以便可以使用LinkedList实例。
生产者和使用者还有两个Runnable任务,由两个单独的线程执行。将值添加到队列后,生产者应通知使用者任务唤醒,并应进入等待状态。
如果队列为空,则使用者任务应以相同的方式处于等待状态。
import java.util.LinkedList; // Producer task class Producer implements Runnable{ LinkedList<Integer> list; Producer(LinkedList<Integer> list){ this.list = list; } @Override public void run() { for(int i = 1; i <= 5; i++){ synchronized(list) { // If there is already an element in the list wait while(list.size() >= 1){ System.out.println("Waiting as queue is full.."); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Adding to queue- " + Thread.currentThread().getName() + " " + i); list.add(i); list.notify(); } } } } //Consumer task class Consumer implements Runnable{ LinkedList<Integer> list; Consumer(LinkedList<Integer> list){ this.list = list; } @Override public void run() { for(int i = 1; i <= 5; i++){ synchronized(list) { // if there is no element in the list wait while(list.size() < 1){ System.out.println("Waiting as queue is empty.."); try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // if there is element in the list then retrieve it System.out.println("Consuming from queue- " + Thread.currentThread().getName() + " " + list.remove()); list.notify(); } } } } public class ProducerConsumer { public static void main(String[] args) { // shared list LinkedList<Integer> list = new LinkedList<Integer>(); Thread t1 = new Thread(new Producer(list), "Producer"); Thread t2 = new Thread(new Consumer(list), "Consumer"); t1.start(); t2.start(); } }
输出:
Adding to queue- Producer 1 Waiting as queue is full.. Consuming from queue- Consumer 1 Waiting as queue is empty.. Adding to queue- Producer 2 Waiting as queue is full.. Consuming from queue- Consumer 2 Waiting as queue is empty.. Adding to queue- Producer 3 Waiting as queue is full.. Consuming from queue- Consumer 3 Waiting as queue is empty.. Adding to queue- Producer 4 Waiting as queue is full.. Consuming from queue- Consumer 4 Waiting as queue is empty.. Adding to queue- Producer 5 Consuming from queue- Consumer 5
使用BlockingQueue的面向生产者-消费者的Java程序
使用ArrayBlockingQueue之类的BlockingQueue实现,我们可以轻松地用Java实现Producer-Consumer程序。
BlockingQueue具有put()方法用于添加到队列中,如果队列容量已满,它将阻塞。同样,有一个take()方法可从队列的开头检索,如果没有可用的元素,它将阻塞。
在容量为1的代码ArrayBlockingQueue中创建,因此队列将只有一个元素并且插入将被阻塞,直到检索到该元素为止。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; // Producer task class Producer implements Runnable{ BlockingQueue<Integer> queue; Producer(BlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { for(int i = 1; i <= 5; i++){ try { queue.put(i); System.out.println("Adding to queue- " + i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //Consumer task class Consumer implements Runnable{ BlockingQueue<Integer> queue; Consumer(BlockingQueue<Integer> queue){ this.queue = queue; } @Override public void run() { for(int i = 1; i <= 5; i++){ try { // if there is element in the list then retrieve it System.out.println("Consuming from queue- " + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class ProducerConsumer { public static void main(String[] args) { BlockingQueue<Integer> bQueue = new ArrayBlockingQueue<Integer>(1); Thread t1 = new Thread(new Producer(bQueue), "Producer"); Thread t2 = new Thread(new Consumer(bQueue), "Consumer"); t1.start(); t2.start(); } }
输出:
Adding to queue- 1 Consuming from queue- 1 Adding to queue- 2 Consuming from queue- 2 Adding to queue- 3 Consuming from queue- 3 Adding to queue- 4 Consuming from queue- 4 Adding to queue- 5 Consuming from queue- 5
如我们所见,使用ArrayBlockingQueue无需编写用于同步线程的逻辑,也无需调用等待并明确通知,从而使编写生产者-消费者Java程序非常简单。使用Lambda表达式可以使其更紧凑。
public class ArrayBQ { public static void main(String[] args) { // BlockingQueue of capacity 1 BlockingQueue<Integer> bQueue = new ArrayBlockingQueue<Integer>(1); // Producer new Thread(()->{ for(int i = 0; i < 5; i++){ try { bQueue.put(i); System.out.println("Added to queue-" + i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); // Consumer new Thread(()->{ for(int i = 0; i < 5; i++){ try { System.out.println("Consumer retrieved- " + bQueue.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } }