Java中的SynchronousQueue

时间:2020-01-09 10:35:11  来源:igfitidea点击:

Java中的SynchronousQueue是BlockingQueue接口的实现,并且是java.util.concurrent包的一部分。 Java中的SynchronousQueue与ArrayBlockingQueue和PriorityBlockingQueue之类的其他BlockingQueue实现不同,因为同步队列没有任何内部容量,甚至没有一个内部容量。因此,SynchronousQueue中的每个插入操作必须等待另一个线程进行相应的删除操作,反之亦然。

这就是为什么将其命名为SynchronousQueue的原因,因为元素的传递是同步发生的,而不是插入可以异步检索的数据。

Java中SynchronousQueue的功能

  • SynchronousQueue没有任何内部容量,甚至没有。
  • 由于没有容量,每个插入操作必须等待另一个线程进行相应的删除操作。例如,如果使用put()方法将元素插入同步队列,则该方法将被阻塞,直到另一个线程接收到该元素。如果尝试从同步队列中检索元素并且队列方法中没有元素,则以相同的方式等待另一个线程将其插入。
  • 我们无法窥视同步队列,因为仅当我们尝试删除它时,该元素才存在。因此,peek()方法始终返回null。
  • 由于没有要迭代的对象,因此无法迭代SynchronousQueue。因此,iterator()和spliterator()方法分别返回一个空的迭代器或者spliterator。
  • 与其他BlockingQueue实现一样,Java中的SynchronousQueue不允许使用null元素。尝试添加,放置或者提供null时,它将引发NullPointerException。

Java SynchronousQueue构造函数

  • SynchronousQueue()–使用不公平的访问策略创建一个SynchronousQueue。
  • SynchronousQueue(boolean fair)–使用指定的公平性策略创建一个SynchronousQueue。将Fairness设置为true构造的SynchronousQueue授予线程按FIFO顺序的访问权限。

SynchronousQueue Java示例

BlockingQueue实现被设计为主要用于生产者-消费者队列,因此让我们来看一个使用SynchronousQueue的生产者-消费者示例。在示例中,创建了两个线程,一个是生产者线程,另一个是消费者线程。

在使用者线程中,在将元素从同步队列中移出之前,使用sleep()方法引入了3秒的延迟,但是put()方法将等待直到检索到元素,而不是尝试添加另一个元素。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class SychroQ {
  public static void main(String[] args) {
    BlockingQueue<Integer> bQueue = new SynchronousQueue<>();
    // Producer 
    new Thread(()->{
      for(int i = 0; i < 5; i++){
        try {
          System.out.println("Added to queue-" + i);
          bQueue.put(i);
          Thread.sleep(200);                                 
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
        }
    }).start();
        
    // Consumer
    new Thread(()->{
      for(int i = 0; i < 5; i++){
        try {
          Thread.sleep(3000);
          System.out.println("Consumer retrieved- " + bQueue.take());                    
          
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
      }
    }).start();
  }
}

输出:

Added to queue-0
Consumer retrieved- 0
Added to queue-1
Consumer retrieved- 1
Added to queue-2
Consumer retrieved- 2
Added to queue-3
Consumer retrieved- 3
Added to queue-4
Consumer retrieved- 4