Kolejki nieblokujące

Bezpieczne wątkowo i co najważniejsze nieblokujące implementacje kolejek na połączonych węzłach.

ConcurrentLinkedQueue<E> — używa algorytmu bez oczekiwania, przystosowanego do pracy z Garbage Collectorem. Ten algorytm jest dość wydajny i bardzo szybki, ponieważ jest zbudowany na CAS. Metoda size() może działać przez długi czas, więc najlepiej nie ciągnąć jej cały czas.

ConcurrentLinkedDeque<E> — Deque oznacza kolejkę podwójnie zakończoną. Oznacza to, że dane mogą być dodawane i pobierane z obu stron. W związku z tym klasa obsługuje oba tryby działania: FIFO (First In First Out) oraz LIFO (Last In First Out).

W praktyce ConcurrentLinkedDeque powinno być używane, jeśli LIFO jest absolutnie konieczne, ponieważ ze względu na dwukierunkowość węzłów ta klasa traci połowę wydajności w porównaniu z ConcurrentLinkedQueue .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Blokowanie kolejek

Interfejs BlockingQueue<E> - jeśli danych jest dużo, ConcurrentLinkedQueue nie wystarczy.

Gdy wątki nie wykonują swojej pracy, możesz łatwo uzyskać wyjątek OutOfMemmoryException . Aby takie przypadki się nie pojawiały, mamy BlockingQueue do pracy z obecnością różnych metod wypełniania i pracy z kolejką oraz blokadami warunkowymi.

BlockingQueue nie rozpoznaje elementów null i zgłasza wyjątek NullPointerException podczas próby dodania lub pobrania takiego elementu. Metoda poll zwraca element null, jeśli żaden element nie został umieszczony w kolejce w określonym czasie.

BlockingQueue<E> Implementacje

Przyjrzyjmy się bliżej każdej z naszych implementacji BlockingQueue :

ArrayBlockingQueue<E> to blokująca klasa kolejki zbudowana na klasycznym buforze pierścieniowym. Tutaj mamy możliwość zarządzania „uczciwością” zamków. Jeśli fair=false (wartość domyślna), kolejność wątków nie jest gwarantowana.

DelayQueue<E extends Delayed> to klasa pozwalająca na wyciąganie elementów z kolejki dopiero po pewnym opóźnieniu, zdefiniowanym w każdym elemencie poprzez metodę getDelay interfejsu Delayed .

LinkedBlockingQueue<E> to kolejka blokująca na połączonych węzłach, zaimplementowana na algorytmie „kolejki dwóch blokad”: pierwsza blokada służy do dodawania, druga do wyciągania elementu z kolejki. Ze względu na blokady, w porównaniu do ArrayBlockingQueue , ta klasa ma wysoką wydajność, ale wymaga więcej pamięci. Rozmiar kolejki jest ustawiany przez konstruktora i domyślnie jest równy Integer.MAX_VALUE.

PriorityBlockingQueue<E> to wielowątkowe opakowanie na PriorityQueue . Komparator odpowiada za logikę, według której element zostanie dodany. Najmniejszy element wychodzi pierwszy.

SynchronousQueue<E> - kolejka działa zgodnie z zasadą FIFO (first-in-first-out). Każda operacja wstawiania blokuje wątek „Producent”, dopóki wątek „Konsument” nie wyciągnie elementu z kolejki i odwrotnie „Konsument” będzie czekał, aż „Producent” wstawi element.

BlockingDeque<E> to interfejs opisujący dodatkowe metody dwukierunkowej kolejki blokowania. Dane mogą być wstawiane i wyciągane z obu stron kolejki.

LinkedBlockingDeque<E> to dwukierunkowa kolejka blokowania na połączonych węzłach, zaimplementowana jako prosta dwukierunkowa lista z jedną blokadą. Rozmiar kolejki jest ustawiany przez konstruktora i domyślnie jest równy Integer.MAX_VALUE.

TransferQueue<E> - interfejs jest o tyle ciekawy, że w momencie dodania elementu do kolejki istnieje możliwość zablokowania wstawiania wątku Producer do momentu, aż inny wątek Consumer wyciągnie element z kolejki. Możesz także dodać czek dla określonego limitu czasu lub ustawić czek dla oczekujących s . W efekcie otrzymujemy mechanizm przesyłania danych z obsługą komunikatów asynchronicznych i synchronicznych.

LinkedTransferQueue<E> to implementacja TransferQueue oparta na algorytmie Dual Queues with Slack. Intensywnie wykorzystuje CAS (patrz wyżej) i parkowanie wątków, gdy jest bezczynny.