Java中的自定义BlockingQueue实现

时间:2020-02-23 14:34:47  来源:igfitidea点击:

在本教程中,我们将看到如何创建自己的自定义BlockingQueue。

这是BlockingQueue的简单实现。

  • 我们将使用数组在内部存储BlockingQueue中的元素。

此阵列的大小定义一次可以驻留在BlockingQueue中的最大元素数。

  • 我们将使用 lockconditions对象创建自定义BlockingQueue。
  • 在将元素放在队列中,如果队列已满,则生产者将会 wait队列要为空。
  • 虽然从队列中消耗元素,但如果队列为空,则 consumer会等待 queue充满填满。

创建一个名为的类 CustomBlockingQueue.java

package org.arpit.theitroad;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class CustomBlockingQueue {
 
	final Lock lock = new ReentrantLock();
 
	//Conditions
	final Condition produceCond  = lock.newCondition(); 
	final Condition consumeCond = lock.newCondition(); 
 
	//Array to store element for CustomBlockingQueue
	final Object[] array = new Object[6];
	int putIndex, takeIndex;
	int count;
 
	public void put(Object x) throws InterruptedException {
 
		lock.lock();
		try {
			while (count == array.length){
				//Queue is full, producers need to wait
				produceCond.await();
			}
 
			array[putIndex] = x;
			System.out.println("Producing - " + x);
			putIndex++;
			if (putIndex == array.length){
				putIndex = 0;
			}
			//Increment the count for the array
			++count;
			consumeCond.signal();
		} finally {
			lock.unlock();
		}
	}
 
	public Object take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0){
				//Queue is empty, consumers need to wait
				consumeCond.await();
			}
			Object x = array[takeIndex];
			System.out.println("Consuming - " + x);
			takeIndex++;
			if (takeIndex == array.length){
				takeIndex = 0;
			}
			//reduce the count for the array
			--count;
			//send signal producer
			produceCond.signal();
			return x;
		} finally {
			lock.unlock();
		}
	}
}

创建另一个将在CustomBlockingQueue上方使用的主类。

package org.arpit.theitroad;
 
 
public class CustomBlockingQueueMain {
 
	public static void main(String[] args) {
		CustomBlockingQueue customBlockingQueue = new CustomBlockingQueue();
		//Creating producer and consumer threads
		Thread producer = new Thread(new Producer(customBlockingQueue));
		Thread consumer = new Thread(new Consumer(customBlockingQueue)); 
		
		producer.start();
		consumer.start();
	}
}
 
class Producer implements Runnable {
 
	private CustomBlockingQueue customBlockingQueue;
 
	public Producer(CustomBlockingQueue customBlockingQueue){
		this.customBlockingQueue = customBlockingQueue;
	}
	@Override
	public void run() {
		for (int i = 1; i <= 10; i++) {
			try {
				customBlockingQueue.put(i);                            
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
}
 
class Consumer implements Runnable {
	private CustomBlockingQueue customBlockingQueue;
 
	public Consumer(CustomBlockingQueue customBlockingQueue){
		this.customBlockingQueue = customBlockingQueue;
	}
	@Override
	public void run() {
		for (int i = 1; i <= 10; i++) {
			try {
				customBlockingQueue.take();               
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
}

我们创建了两个可追加的类,一个用于制作人,另一个用于消费者,并使用这些Runnables创建了两个线程。

Producing - 1
Producing - 2
Producing - 3
Producing - 4
Producing - 5
Producing - 6
Consuming - 1
Consuming - 2
Consuming - 3
Consuming - 4
Consuming - 5
Consuming - 6
Producing - 7
Producing - 8
Producing - 9
Producing - 10
Consuming - 7
Consuming - 8
Consuming - 9
Consuming - 10

输出可能对我们有所不同,但CustomBlockingQueue在某一个时间只能有6个元素。