Nem blokkoló sorok

Szálbiztos és legfőképpen nem blokkoló sormegvalósítások kapcsolt csomópontokon.

ConcurrentLinkedQueue<E> – a szemétgyűjtővel való együttműködéshez igazított várakozásmentes algoritmust használ. Ez az algoritmus meglehetősen hatékony és nagyon gyors, mivel CAS-ra épül. A size() metódushosszú ideig futhat, ezért jobb, ha nem húzza állandóan.

ConcurrentLinkedDeque<E> – A Deque a Double ended queue rövidítése. Ez azt jelenti, hogy mindkét oldalról lehet adatokat hozzáadni és levonni. Ennek megfelelően az osztály mindkét működési módot támogatja: FIFO (First In First Out) és LIFO (Last In First Out).

A gyakorlatban a ConcurrentLinkedDeque-et akkor kell használni, ha a LIFO feltétlenül szükséges, mivel a csomópontok kétirányúsága miatt ez az osztály a felét veszíti teljesítményében a ConcurrentLinkedQueue- hoz képest .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Várólisták blokkolása

BlockingQueue<E> interfész - ha sok az adat, a ConcurrentLinkedQueue nem elég.

Ha a szálak nem látják el feladatukat, könnyen szerezhet egy OutOfMemmoryException kivételt . És hogy ilyen esetek ne forduljanak elő, van egy BlockingQueue a munkavégzéshez , amely különféle módszereket tartalmaz a sor kitöltésére és kezelésére, valamint feltételes zárolásokra.

A BlockingQueue nem ismeri fel a null elemeket, és egy NullPointerException-t dob ​​fel , amikor ilyen elemet próbál hozzáadni vagy beszerezni. A poll metódus null elemet ad vissza, ha az időtúllépésen belül egyetlen elem sem került a sorba.

BlockingQueue<E> megvalósítások

Nézzük meg közelebbről az egyes BlockingQueue megvalósításainkat :

Az ArrayBlockingQueue<E> egy blokkoló sorosztály, amely a klasszikus gyűrűpufferre épül. Itt lehetőségünk nyílik a zárak „őszinteségének” kezelésére. Ha fair=false (az alapértelmezett), akkor a szálak sorrendje nem garantált.

A DelayQueue<E extends Delayed> egy olyan osztály, amely lehetővé teszi, hogy az elemeket csak bizonyos késleltetés után vonja le a sorból, amely az egyes elemekben a Delayed felület getDelay metódusával van definiálva .

A LinkedBlockingQueue<E> egy blokkoló sor kapcsolt csomópontokon, amelyet a „két zárolási sor” algoritmussal valósítanak meg: az első zárolás egy elem hozzáadására, a második egy elem kihúzására szolgál a sorból. A zárolások miatt az ArrayBlockingQueue- hoz képest ez az osztály nagy teljesítményű, de több memóriát igényel. A sor méretét a konstruktor állítja be, és alapértelmezés szerint egyenlő az Integer.MAX_VALUE értékkel.

A PriorityBlockingQueue<E> egy többszálas burkoló a PriorityQueue felett . Az Összehasonlító felelős azért, hogy az elem milyen logikával kerüljön hozzáadásra. A legkisebb elem jön ki először.

SynchronousQueue<E> - a sor a FIFO (first-in-first-out) elv szerint működik. Minden beillesztési művelet blokkolja a „Gyártó” szálat, amíg a „Fogyasztó” szál ki nem húzza az elemet a sorból, és fordítva, a „Fogyasztó” megvárja, amíg a „Gyártó” beilleszti az elemet.

A BlockingDeque<E> egy olyan interfész, amely további módszereket ír le a kétirányú blokkolási sorhoz. Az adatok a sor mindkét oldaláról beszúrhatók és kihúzhatók.

A LinkedBlockingDeque<E> egy kétirányú blokkolósor a csatolt csomópontokon, amelyet egyszerű kétirányú listaként valósítanak meg, egyetlen zárral. A sor méretét a konstruktor állítja be, és alapértelmezés szerint egyenlő az Integer.MAX_VALUE értékkel.

TransferQueue<E> - az interfész érdekessége, hogy amikor egy elemet adunk a sorhoz, akkor blokkolható a beszúró Producer szál , amíg egy másik Consumer szál ki nem húzza az elemet a sorból. Hozzáadhat egy csekket egy adott időtúllépéshez, vagy beállíthat egy csekket a függőben lévő fogyasztókhoz . Ennek eredményeként egy adatátviteli mechanizmust kapunk, amely támogatja az aszinkron és szinkron üzeneteket.

A LinkedTransferQueue<E> a TransferQueue megvalósítása,amely a Dual Queues with Slack algoritmuson alapul. Erősen kihasználja a CAS-t (lásd fent) és a menetparkolót üresjáratban.