Niet-blokkerende wachtrijen

Threadveilige en vooral niet-blokkerende wachtrij- implementaties op gekoppelde knooppunten.

ConcurrentLinkedQueue<E> - het gebruikt een wachtvrij algoritme dat is aangepast om met de vuilnisophaler te werken. Dit algoritme is behoorlijk efficiënt en erg snel, omdat het op CAS is gebouwd. De methode size() kan lange tijd worden uitgevoerd, dus het is het beste om er niet de hele tijd aan te trekken.

ConcurrentLinkedDeque<E> - Deque staat voor Double ended wachtrij. Dit betekent dat gegevens van beide kanten kunnen worden toegevoegd en opgehaald. Dienovereenkomstig ondersteunt de klasse beide werkingsmodi: FIFO (First In First Out) en LIFO (Last In First Out).

In de praktijk moet ConcurrentLinkedDeque worden gebruikt als LIFO absoluut noodzakelijk is, aangezien deze klasse vanwege de bidirectionele aard van de knooppunten de helft aan prestaties verliest in vergelijking met ConcurrentLinkedQueue .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Wachtrijen blokkeren

BlockingQueue<E> -interface - als er veel gegevens zijn, is ConcurrentLinkedQueue niet voldoende.

Wanneer threads hun werk niet doen, kunt u eenvoudig een OutOfMemmoryException krijgen . En zodat dergelijke gevallen zich niet voordoen, hebben we een BlockingQueue voor werk met de aanwezigheid van verschillende methoden voor het vullen en werken met de wachtrij en voorwaardelijke sloten.

BlockingQueue herkent geen null-elementen en genereert een NullPointerException wanneer wordt geprobeerd een dergelijk element toe te voegen of op te halen. De poll-methode retourneert een null-element als er binnen de time-out geen element in de wachtrij is geplaatst.

BlockingQueue<E> Implementaties

Laten we elk van onze BlockingQueue- implementaties nader bekijken :

ArrayBlockingQueue<E> is een blokkerende wachtrijklasse die is gebouwd op de klassieke ringbuffer. Hier hebben we de mogelijkheid om de "eerlijkheid" van sloten te beheren. Als fair=false (de standaardinstelling), dan is de volgorde van threads niet gegarandeerd.

DelayQueue<E extends Delayed> is een klasse waarmee u elementen uit de wachtrij kunt halen na een bepaalde vertraging, gedefinieerd in elk element via de getDelay- methode van de Delayed- interface.

LinkedBlockingQueue<E> is een blokkeerwachtrij op gekoppelde knooppunten, geïmplementeerd op het algoritme "two lock queue": de eerste vergrendeling is om toe te voegen, de tweede is om een ​​element uit de wachtrij te halen. Vanwege sloten heeft deze klasse, vergeleken met ArrayBlockingQueue , hoge prestaties, maar er is meer geheugen voor nodig. De wachtrijgrootte wordt ingesteld via de constructor en is standaard gelijk aan Integer.MAX_VALUE.

PriorityBlockingQueue<E> is een multi-threaded wrapper over PriorityQueue . De Comparator is verantwoordelijk voor de logica waarmee het element wordt toegevoegd. Het kleinste element komt er als eerste uit.

SynchronousQueue<E> - de wachtrij werkt volgens het FIFO-principe (first-in-first-out). Elke invoegbewerking blokkeert de "Producer"-thread totdat de "Consumer"-thread het element uit de wachtrij haalt en vice versa, de "Consumer" wacht tot de "Producer" het element invoegt.

BlockingDeque<E> is een interface die aanvullende methoden beschrijft voor een bidirectionele blokkeerwachtrij. Gegevens kunnen aan beide kanten van de wachtrij worden ingevoerd en verwijderd.

LinkedBlockingDeque<E> is een bidirectionele blokkeerwachtrij op gekoppelde knooppunten, geïmplementeerd als een eenvoudige bidirectionele lijst met één slot. De wachtrijgrootte wordt ingesteld via de constructor en is standaard gelijk aan Integer.MAX_VALUE.

TransferQueue<E> - de interface is interessant omdat wanneer een element aan de wachtrij wordt toegevoegd, het mogelijk is om de invoegende Producer- thread te blokkeren totdat een andere Consumer- thread het element uit de wachtrij haalt. U kunt ook een controle voor een specifieke time-out toevoegen of een controle instellen voor wachtende consumenten . Als gevolg hiervan krijgen we een mechanisme voor gegevensoverdracht met ondersteuning voor asynchrone en synchrone berichten.

LinkedTransferQueue<E> is een implementatie van TransferQueue op basis van het Dual Queues with Slack-algoritme. Maakt veel gebruik van CAS (zie hierboven) en threadparking bij inactiviteit.