Nicht blockierende Warteschlangen

Thread-sichere und vor allem nicht blockierende Queue- Implementierungen auf verknüpften Knoten.

ConcurrentLinkedQueue<E> – es verwendet einen wartefreien Algorithmus, der für die Arbeit mit dem Garbage Collector angepasst ist. Dieser Algorithmus ist sehr effizient und sehr schnell, da er auf CAS basiert. Die size()- Methodekann lange ausgeführt werden, daher ist es am besten, sie nicht ständig abzurufen.

ConcurrentLinkedDeque<E> – Deque steht für Double-Ended Queue. Das bedeutet, dass Daten von beiden Seiten hinzugefügt und abgerufen werden können. Dementsprechend unterstützt die Klasse beide Betriebsarten: FIFO (First In First Out) und LIFO (Last In First Out).

In der Praxis sollte ConcurrentLinkedDeque verwendet werden, wenn LIFO unbedingt erforderlich ist, da diese Klasse aufgrund der Bidirektionalität der Knoten im Vergleich zu ConcurrentLinkedQueue die Hälfte an Leistung einbüßt .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Blockierende Warteschlangen

BlockingQueue<E> -Schnittstelle – wenn viele Daten vorhanden sind, reicht ConcurrentLinkedQueue nicht aus.

Wenn Threads ihre Aufgabe nicht erfüllen, kann es leicht zu einer OutOfMemmoryException kommen . Und damit solche Fälle nicht auftreten, haben wir eine BlockingQueue für die Arbeit mit verschiedenen Methoden zum Füllen und Arbeiten mit der Warteschlange und bedingten Sperren.

BlockingQueue erkennt keine Nullelemente und löst eine NullPointerException aus , wenn versucht wird, ein solches Element hinzuzufügen oder abzurufen. Die Poll-Methode gibt ein Nullelement zurück, wenn innerhalb des Timeouts kein Element in die Warteschlange gestellt wurde.

BlockingQueue<E>-Implementierungen

Schauen wir uns jede unserer BlockingQueue- Implementierungen genauer an :

ArrayBlockingQueue<E> ist eine blockierende Warteschlangenklasse, die auf dem klassischen Ringpuffer basiert. Hier haben wir die Möglichkeit, die „Ehrlichkeit“ von Schlössern zu verwalten. Wenn fair=false (Standardeinstellung), ist die Reihenfolge der Threads nicht garantiert.

DelayQueue<E erweitert Delayed> ist eine Klasse, die es Ihnen ermöglicht, Elemente erst nach einer bestimmten Verzögerung aus der Warteschlange zu ziehen, die in jedem Element über die getDelay- Methode der Delayed- Schnittstelle.

LinkedBlockingQueue<E> ist eine Blockierungswarteschlange auf verbundenen Knoten, die mit dem „Two Lock Queue“-Algorithmus implementiert ist: Die erste Sperre dient zum Hinzufügen, die zweite zum Ziehen eines Elements aus der Warteschlange. Aufgrund von Sperren weist diese Klasse im Vergleich zu ArrayBlockingQueue eine hohe Leistung auf, benötigt jedoch mehr Speicher. Die Warteschlangengröße wird über den Konstruktor festgelegt und ist standardmäßig gleich Integer.MAX_VALUE.

PriorityBlockingQueue<E> ist ein Multithread-Wrapper für PriorityQueue . Der Komparator ist für die Logik verantwortlich, mit der das Element hinzugefügt wird. Das kleinste Element kommt zuerst heraus.

SynchronousQueue<E> – die Warteschlange arbeitet nach dem FIFO-Prinzip (First-In-First-Out). Jeder Einfügungsvorgang blockiert den „Produzenten“-Thread, bis der „Verbraucher“-Thread das Element aus der Warteschlange zieht, und umgekehrt wartet der „Verbraucher“, bis der „Produzent“ das Element einfügt.

BlockingDeque<E> ist eine Schnittstelle, die zusätzliche Methoden für eine bidirektionale Blockierungswarteschlange beschreibt. Daten können von beiden Seiten der Warteschlange eingefügt und entnommen werden.

LinkedBlockingDeque<E> ist eine bidirektionale Blockierungswarteschlange auf verknüpften Knoten, implementiert als einfache bidirektionale Liste mit einer Sperre. Die Warteschlangengröße wird über den Konstruktor festgelegt und ist standardmäßig gleich Integer.MAX_VALUE.

TransferQueue<E> – Die Schnittstelle ist insofern interessant, als sie beim Hinzufügen eines Elements zur Warteschlange den einfügenden Producer- Thread blockieren kann,bis ein anderer Consumer- Thread das Element aus der Warteschlange zieht. Sie können auch eine Prüfung für eine bestimmte Zeitüberschreitung hinzufügen oder eine Prüfung für ausstehende Consumer s festlegen. Als Ergebnis erhalten wir einen Datenübertragungsmechanismus mit Unterstützung für asynchrone und synchrone Nachrichten.

LinkedTransferQueue<E> ist eine Implementierung von TransferQueue basierend auf dem Dual Queues with Slack-Algorithmus. Beansprucht intensiv CAS (siehe oben) und Thread-Parken im Leerlauf.