非阻塞队列

链接节点上的线程安全和最重要的非阻塞队列实现。

ConcurrentLinkedQueue<E> - 它使用适用于垃圾收集器的无等待算法。该算法非常高效且快速,因为它是基于 CAS 构建的。size()方法可以运行很长时间,所以最好不要一直拉。

ConcurrentLinkedDeque<E> - Deque 代表双端队列。这意味着可以从两侧添加和提取数据。因此,该类支持两种操作模式:FIFO(先进先出)和 LIFO(后进先出)。

实际上,如果绝对需要 LIFO,则应使用ConcurrentLinkedDeque ,因为由于节点的双向性,与ConcurrentLinkedQueue相比,此类在性能上损失了一半。

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

阻塞队列

BlockingQueue<E>接口- 如果有很多数据,ConcurrentLinkedQueue是不够的。

当线程无法完成其工作时,您可以轻松获得OutOfMemmoryException。为了避免出现这种情况,我们有一个 BlockingQueue用于使用不同的方法来填充和使用队列和条件锁。

BlockingQueue无法识别空元素,并在尝试添加或获取此类元素时抛出NullPointerException 。如果在超时时间内没有元素被放入队列,则 poll 方法返回一个空元素。

BlockingQueue<E> 实现

让我们仔细看看我们的每个BlockingQueue实现:

ArrayBlockingQueue<E>是一个建立在经典环形缓冲区之上的阻塞队列类。在这里我们有机会管理锁的“诚实”。如果 fair=false(默认值),则不保证线程顺序。

DelayQueue<E extends Delayed>是一个允许您仅在一定延迟后才从队列中拉取元素的类,通过Delayed接口的getDelay方法在每个元素中定义。

LinkedBlockingQueue<E>是链接节点上的阻塞队列,在“双锁队列”算法上实现:第一个锁用于添加,第二个锁用于从队列中提取元素。由于有锁,相对于ArrayBlockingQueue,该类具有较高的性能,但需要更多的内存。队列大小通过构造函数设置,默认情况下等于 Integer.MAX_VALUE。

PriorityBlockingQueue<E>是PriorityQueue 的多线程包装器。Comparator负责添加元素的逻辑。最小的元素首先出现。

SynchronousQueue<E> - 队列根据 FIFO(先进先出)原则工作。每个插入操作都会阻塞“生产者”线程,直到“消费者”线程从队列中拉出元素,反之亦然,“消费者”将等待“生产者”插入元素。

BlockingDeque<E>是描述双向阻塞队列的附加方法的接口。数据可以从队列的两边插入和取出。

LinkedBlockingDeque<E>是链接节点上的双向阻塞队列,实现为带有一个锁的简单双向列表。队列大小通过构造函数设置,默认情况下等于 Integer.MAX_VALUE。

TransferQueue<E> - 该接口很有趣,因为当一个元素被添加到队列中时,可以阻止插入的生产者线程,直到另一个消费者线程从队列中拉出该元素。您还可以添加对特定超时的检查或设置对未决Consumer 的检查。结果,我们得到了一个支持异步和同步消息的数据传输机制。

LinkedTransferQueue<E>是基于双队列和 Slack 算法的 TransferQueue 的实现在空闲时大量使用 CAS(见上文)和线程停放。