Java中的ArrayBlockingQueue

时间:2020-02-23 14:34:43  来源:igfitidea点击:

在本文中,我们将理解Java并发队列,BlockingQueue。
然后我们将深入了解它的实现之一,ArrayBlockingQueue。

什么是blockingqueue.

BlockingQueue接口在并发API下在Java 5中引入,它代表了一个线程安全队列,其中可以将元素添加到中并从中删除。

我们可以使用多个线程插入和删除元素 BlockingQueue同时。

它被称为的原因 blocking因为当此队列已满时,它有能力阻止一个线程插入元素,并且当此队列为空时,它会阻止一个线程删除元素。
当然,有不同的方法,决定是否阻止线程或者返回异常。

ArrayBlockingQueue.

ArrayBlockingQueue实现了BlockingQueue Java接口。
它是一个并发和有界的阻塞队列实现,它使用数组来存储元素。
该类提供了从队列中插入和删除元素的阻塞和非阻塞功能。
通过阻止它意味着uke()等的函数,并将()将阻止消费者或者生产者线程无限期,除非删除或者插入元素。

特征

  • 它是一个阻塞队列的线程安全实现。
  • 它是一个有界队列,其大小在创建对象时给出,并且一旦实例化就无法改变。队列在内部使用阵列实现。
  • ArrayBlockingQueue上的元素可以在插入顺序或者FIFO中消耗。
  • 不允许使用空对象,并且如果在拦截队列上的情况下,它将抛出异常。
  • 它具有阻塞和非阻塞功能,可以从队列中放置或者删除元素。
  • 它允许生产者或者消费者线程的公平政策。下面详细解释公平政策

构造函数

ArrayBlockingQueue有三个构造函数。
fairness标志设置为True意味着,如果多个生产者或者使用者线程在队列中等待用于添加或者删除队列中的元素,那么它将确保等待线程的FIFO顺序如果它是假的,则它不保证订单线程。

ArrayBlockingQueue在Java中,内部使用重圈 lock对象实例。
它还使用锁来创建两个条件令人备忘录和不满。

The notEmpty condition will allow consumers to remove elements from the queue in a threadsafe manner and will block the consumer threads in case the queue gets full. 
The notFull condition will in a similar way allow producer threads to add elements in the queue in a thread-safe manner until it's full after which it will block all producer threads.

在第三构造函数中,如果我们通过集合,初始数组块将具有遍历顺序中该集合的元素。

public ArrayBlockingQueue(int capacity, boolean fair)
 
        public ArrayBlockingQueue(int capacity) 
 
        ArrayBlockingQueue(int capacity, boolean fair,  Collection<? extends E> c)

方法

ArrayBlockingQueue有许多支持的方法,我们将列出所有这些方法,另外,我们添加了某些功能的实际实现的源代码片段以便更好地理解。
添加的函数的源片段有助于帮助了解如何利用构造函数中初始化的锁定和条件对象。

  • offer(E):此功能如下所示,将在尾部添加一个元素到队列中,如果达到队列容量,则成功返回REATE,则会返回FALSE。这是一个线程安全方法,并且是非块:如果队列已满,则不会阻止生产者线程但返回false。
public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
  • offer(E e, long timeout, TimeUnit unit):此功能的行为类似于报价(e),除了阻止队列完整的情况下,它不会快速返回false,但等待到超时值,以查看阻止队列中的空间是否可用以插入元素在返回false之前。
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
 
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
  • put(E e):此函数会将元素插入尾端的阻塞队列中,除非拦截队列已满,否则将无限期等待。
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
  • add(E):此功能内部使用优惠(e)并以相同的方式行事,除非拦截队列已满时,它会抛出IllegalStateException。
  • poll():此函数删除并返回阻塞队列顶部的元素,如果队列为空,则返回null。这是一个非阻塞功能。
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
  • poll(long timeout, TimeUnit unit):此功能的行为与轮询函数一样,除了在拦截队列尝试在阻塞队列顶部的元素之前为空如果拦截队列为空而等待超时参数值。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • take():如果队列不为空,则此函数将返回阻塞队列顶部的元素。如果阻塞队列为空,则调用此函数的线程将等待将元素插入阻塞队列中。
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
  • peek():此函数将返回阻塞队列顶部的元素,而不会删除元素,如果阻塞队列为空,则为null。
  • size():此函数返回阻塞队列的容量
  • remainingCapacity():此函数返回最大元素之间的差异,阻塞队列可以保持当前在阻塞队列中的元素。
  • remove(Object o):如果它等于传递给此函数的对象,则此函数从阻塞队列中删除一个元素的单个实例。如果它找到匹配元素,则删除后返回true eys false。
  • contains(Object o):如果在匹配作为输入参数evers的对象的阻塞队列上存在对象,则此函数返回true。
  • toArray():此函数返回一个对象[],它是内部阵列的InOrder副本,其返回阻塞队列。 System.ArrayCopy()用于复制数组,以确保返回的数组上的外部修改不会影响阻塞队列。
  • clear():此函数将以原子方式删除阻塞队列中的所有元素。在清空队列后,这还会发出任何在阻塞队列中等待的生产者线程。
  • drainTo():此函数将以原子方式拓扑队列中的所有元素。如果集合参数实例和此函数所调用的实例相同,则将抛出IllegalArgumentException。任何等待的生产者线程都将发出信号,即队列为空并准备好接受新元素。

使用场景

当不需要全面的消息传递基础架构时,ArrayBlockingQueue实例可用于解决生产者和消费者类型问题。
它可以用作资源池,以便如果资源有限,则可以提高消费者。

实施代码

在下面的单元测试中,我们创建了一个ArrayBlockingQueue实例并启动生产者,如果生产者线程等待某些消费线程创建的空间。
同样,我们也在测试如果消费者线程等待在队列为空中将要添加到队列中的元素。

package main.com.kv.mt.concurrent.blockingqueue;
 
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
/**
 * The below code tests where the ArrayBlockingQueue makes the producer threads wait when the queue is full
 * and similarly it makes the consumer threads full if the queue is empty
 */
public class theitroadArrayBlockingQueue {
    public static void main(String args[]){
        final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(5);
        final Random random = new Random();
 
        //producer thread keeps running until its interrupted
        Runnable producer = () -> {
            boolean isInterrupted = false;
            while(!isInterrupted) {
                try {
                    System.out.println(Thread.currentThread().getName() + " adding to queue");
                    blockingQueue.put(random.nextInt());
                    System.out.println(Thread.currentThread().getName() + " finished adding to queue");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " interrupted");
                    isInterrupted = true;
                }
            };
        };
 
        //consumer thread keeps running until its interrupted
        Runnable consumer = () -> {
            boolean isInterrupted = false;
            while(!isInterrupted) {
                try {
                    System.out.println(Thread.currentThread().getName() + " retrieving from queue");
                    System.out.println(Thread.currentThread().getName() + " retrieved " + blockingQueue.take() + " from queue");
                } catch (InterruptedException e) {
                    System.out.println(Thread.currentThread().getName() + " interrupted");
                    isInterrupted = true;
                }
            }
        };
 
        Thread producerThread = new Thread(producer);
        producerThread.setName("MyProducer");
 
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("MyConsumer");
 
        producerThread.start();
 
        //this code is to wait for the main thread to wait till the producer completely fills the blocking queue
        while(blockingQueue.remainingCapacity()!=0){
            try{
                Thread.sleep(5000);
            }catch (InterruptedException ie){
                System.out.println(" interrupted main thread");
            }
        }
        //This log checks the MyProducer thread state as it should be now in waiting state as the  blocking queue is full
        System.out.println("Queue is full and the MyProducer thread state : "+producerThread.getState());
        assert (Thread.State.WAITING==producerThread.getState());
        assert(producerThread.isAlive());
        //The producer thread is stopped to ensure the blocking queue becomes empty once all integers are consumed
        producerThread.interrupt();
 
        //now start the consumer threads
        consumerThread.start();
 
        //wait for the consumer to drain the blocking queue
        while(((ArrayBlockingQueue) blockingQueue).remainingCapacity()!=5){
            try{
                Thread.sleep(5000);
            }catch (InterruptedException ie){
                System.out.println(" interrupted main thread");
            }
        }
 
        //check the status of the consumer thread once the blocking queue is empty. it should we in waiting state
        System.out.println("Queue is empty and the MyConsumer thread state : "+consumerThread.getState());
        assert(Thread.State.WAITING==consumerThread.getState());
        assert(consumerThread.isAlive());
 
        //stop the consumer
        consumerThread.interrupt();
    }
}
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
MyProducer finished adding to queue
MyProducer adding to queue
Queue is full and the MyProducer thread state : WAITING
MyProducer interrupted
MyConsumer retrieving from queue
MyConsumer retrieved -65648598 from queue
MyConsumer retrieving from queue
MyConsumer retrieved -1141421021 from queue
MyConsumer retrieving from queue
MyConsumer retrieved 1476346866 from queue
MyConsumer retrieving from queue
MyConsumer retrieved 1937023750 from queue
MyConsumer retrieving from queue
MyConsumer retrieved -1723127356 from queue
MyConsumer retrieving from queue
Queue is empty and the MyConsumer thread state : WAITING
MyConsumer interrupted