非阻塞隊列

鏈接節點上的線程安全和最重要的非阻塞隊列實現。

ConcurrentLinkedQueue<E> - 它使用適用於垃圾收集器的無等待算法。該算法非常高效且快速,因為它是基於 CAS 構建的。size()方法可以運行很長時間,所以最好不要一直拉。

ConcurrentLinkedDeque<E> - Deque 代表雙端隊列。這意味著可以從兩側添加和提取數據。因此,該類支持兩種操作模式:FIFO(先進先出)和 LIFO(後進先出)。

實際上,如果絕對需要 LIFO,則應使用ConcurrentLinkedDeque ,因為由於節點的雙向性,與ConcurrentLinkedQueue相比,此類在性能上損失了一半。

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

阻塞隊列

BlockingQueue<E>接口- 如果有很多數據,ConcurrentLinkedQueue是不夠的。

當線程無法完成其工作時,您可以輕鬆獲得OutOfMemmoryException。為了避免出現這種情況,我們有一個 BlockingQueue用於使用不同的方法來填充和使用隊列和條件鎖。

BlockingQueue無法識別空元素,並在嘗試添加或獲取此類元素時拋出NullPointerException 。如果在超時時間內沒有元素被放入隊列,則 poll 方法返回一個空元素。

BlockingQueue<E> 實現

讓我們仔細看看我們的每個BlockingQueue實現:

ArrayBlockingQueue<E>是一個建立在經典環形緩衝區之上的阻塞隊列類。在這裡我們有機會管理鎖的“誠實”。如果 fair=false(默認值),則不保證線程順序。

DelayQueue<E extends Delayed>是一個允許您僅在一定延遲後才從隊列中拉取元素的類,通過Delayed接口的getDelay方法在每個元素中定義。

LinkedBlockingQueue<E>是鏈接節點上的阻塞隊列,在“雙鎖隊列”算法上實現:第一個鎖用於添加,第二個鎖用於從隊列中提取元素。由於有鎖,相對於ArrayBlockingQueue,該類具有較高的性能,但需要更多的內存。隊列大小通過構造函數設置,默認情況下等於 Integer.MAX_VALUE。

PriorityBlockingQueue<E>是PriorityQueue 的多線程包裝器。Comparator負責添加元素的邏輯。最小的元素首先出現。

SynchronousQueue<E> - 隊列根據 FIFO(先進先出)原則工作。每個插入操作都會阻塞“生產者”線程,直到“消費者”線程從隊列中拉出元素,反之亦然,“消費者”將等待“生產者”插入元素。

BlockingDeque<E>是描述雙向阻塞隊列的附加方法的接口。數據可以從隊列的兩邊插入和取出。

LinkedBlockingDeque<E>是鏈接節點上的雙向阻塞隊列,實現為帶有一個鎖的簡單雙向列表。隊列大小通過構造函數設置,默認情況下等於 Integer.MAX_VALUE。

TransferQueue<E> - 該接口很有趣,因為當一個元素被添加到隊列中時,可以阻止插入的生產者線程,直到另一個消費者線程從隊列中拉出該元素。您還可以添加對特定超時的檢查或設置對未決Consumer 的檢查。結果,我們得到了一個支持異步和同步消息的數據傳輸機制。

LinkedTransferQueue<E>是基於雙隊列和 Slack 算法的 TransferQueue 的實現在閒置時大量使用 CAS(見上文)和線程停放。