Java中的自定义BlockingQueue实现
时间:2020-02-23 14:34:47 来源:igfitidea点击:
在本教程中,我们将看到如何创建自己的自定义BlockingQueue。
这是BlockingQueue的简单实现。
- 我们将使用数组在内部存储BlockingQueue中的元素。
此阵列的大小定义一次可以驻留在BlockingQueue中的最大元素数。
- 我们将使用
lock
和conditions
对象创建自定义BlockingQueue。 - 在将元素放在队列中,如果队列已满,则生产者将会
wait
队列要为空。 - 虽然从队列中消耗元素,但如果队列为空,则
consumer
会等待queue
充满填满。
创建一个名为的类 CustomBlockingQueue.java
package org.arpit.theitroad; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class CustomBlockingQueue { final Lock lock = new ReentrantLock(); //Conditions final Condition produceCond = lock.newCondition(); final Condition consumeCond = lock.newCondition(); //Array to store element for CustomBlockingQueue final Object[] array = new Object[6]; int putIndex, takeIndex; int count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == array.length){ //Queue is full, producers need to wait produceCond.await(); } array[putIndex] = x; System.out.println("Producing - " + x); putIndex++; if (putIndex == array.length){ putIndex = 0; } //Increment the count for the array ++count; consumeCond.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0){ //Queue is empty, consumers need to wait consumeCond.await(); } Object x = array[takeIndex]; System.out.println("Consuming - " + x); takeIndex++; if (takeIndex == array.length){ takeIndex = 0; } //reduce the count for the array --count; //send signal producer produceCond.signal(); return x; } finally { lock.unlock(); } } }
创建另一个将在CustomBlockingQueue上方使用的主类。
package org.arpit.theitroad; public class CustomBlockingQueueMain { public static void main(String[] args) { CustomBlockingQueue customBlockingQueue = new CustomBlockingQueue(); //Creating producer and consumer threads Thread producer = new Thread(new Producer(customBlockingQueue)); Thread consumer = new Thread(new Consumer(customBlockingQueue)); producer.start(); consumer.start(); } } class Producer implements Runnable { private CustomBlockingQueue customBlockingQueue; public Producer(CustomBlockingQueue customBlockingQueue){ this.customBlockingQueue = customBlockingQueue; } @Override public void run() { for (int i = 1; i <= 10; i++) { try { customBlockingQueue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Consumer implements Runnable { private CustomBlockingQueue customBlockingQueue; public Consumer(CustomBlockingQueue customBlockingQueue){ this.customBlockingQueue = customBlockingQueue; } @Override public void run() { for (int i = 1; i <= 10; i++) { try { customBlockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
我们创建了两个可追加的类,一个用于制作人,另一个用于消费者,并使用这些Runnables创建了两个线程。
Producing - 1 Producing - 2 Producing - 3 Producing - 4 Producing - 5 Producing - 6 Consuming - 1 Consuming - 2 Consuming - 3 Consuming - 4 Consuming - 5 Consuming - 6 Producing - 7 Producing - 8 Producing - 9 Producing - 10 Consuming - 7 Consuming - 8 Consuming - 9 Consuming - 10
输出可能对我们有所不同,但CustomBlockingQueue在某一个时间只能有6个元素。