Неблокиращи опашки

Безопасни за нишки и най-важното неблокиращи реализации на опашки на свързани възли.

ConcurrentLinkedQueue<E> - използва алгоритъм без изчакване, адаптиран за работа със събирача на отпадъци. Този алгоритъм е доста ефективен и много бърз, тъй като е изграден върху CAS. Методът size() може да работи дълго време, така че е по-добре да не го изтегляте през цялото време.

ConcurrentLinkedDeque<E> - Deque означава Double ended queue. Това означава, че данните могат да се добавят и изтеглят от двете страни. Съответно класът поддържа и двата режима на работа: FIFO (First In First Out) и LIFO (Last In First Out).

На практика ConcurrentLinkedDeque трябва да се използва, ако LIFO е абсолютно необходимо, тъй като поради двупосочността на възлите, този клас губи половината от производителността в сравнение с ConcurrentLinkedQueue .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Блокиране на опашки

Интерфейс BlockingQueue<E> - ако има много данни, ConcurrentLinkedQueue не е достатъчен.

Когато нишките не успеят да свършат работата си, можете лесно да получите OutOfMemmoryException . И за да не възникват такива случаи, имаме BlockingQueue за работа с наличието на различни методи за попълване и работа с опашката и условни ключалки.

BlockingQueue не разпознава нулеви елементи и хвърля NullPointerException , когато се опитва да добави or получи такъв елемент. Методът за анкета връща нулев елемент, ако нито един елемент не е поставен в опашката в рамките на времето за изчакване.

Реализации на BlockingQueue<E>

Нека разгледаме по-подробно всяка от нашите реализации на BlockingQueue :

ArrayBlockingQueue<E> е клас на опашка за блокиране, изграден върху класическия пръстен буфер. Тук имаме възможност да управляваме „честността“ на ключалките. Ако fair=false (по подразбиране), тогава подреждането на нишките не е гарантирано.

DelayQueue<E extends Delayed> е клас, който ви позволява да изтегляте елементи от опашката само след определено забавяне, дефинирано във всеки елемент чрез метода getDelay на интерфейса Delayed .

LinkedBlockingQueue<E> е опашка за блокиране на свързани възли, реализирана на алгоритъма „две опашки за заключване“: първото заключване е за добавяне, второто е за изтегляне на елемент от опашката. Благодарение на заключванията, в сравнение с ArrayBlockingQueue , този клас има висока производителност, но изисква повече памет. Размерът на опашката се задава чрез конструктора и е equals на Integer.MAX_VALUE по подразбиране.

PriorityBlockingQueue<E> е многонишкова обвивка над PriorityQueue . Компараторът отговаря за логиката, по която ще бъде добавен елементът. Първи излиза най-малкият елемент.

SynchronousQueue<E> - опашката работи на принципа FIFO (first-in-first-out). Всяка операция за вмъкване блокира нишката „Производител“, докато нишката „Потребител“ изтегли елемента от опашката и обратно, „Потребителят“ ще изчака, докато „Производителят“ вмъкне елемента.

BlockingDeque<E> е интерфейс, който описва допълнителни методи за двупосочна опашка за блокиране. Данните могат да се вмъкват и изтеглят от двете страни на опашката.

LinkedBlockingDeque<E> е двупосочна опашка за блокиране на свързани възли, реализирана като обикновен двупосочен списък с едно заключване. Размерът на опашката се задава чрез конструктора и е equals на Integer.MAX_VALUE по подразбиране.

TransferQueue<E> - интерфейсът е интересен с това, че когато елемент се добави към опашката, е възможно да се блокира вмъкващата нишка Producer , докато друга нишка Consumer не изтегли елемента от опашката. Можете също така да добавите проверка за конкретно изчакване or да зададете проверка за чакащи потребители . В резултат на това получаваме механизъм за пренос на данни с поддръжка на асинхронни и синхронни съобщения.

LinkedTransferQueue<E> е реализация на TransferQueue, базирана на алгоритъма Dual Queues with Slack. Използва интензивно CAS (вижте по-горе) и паркиране на конци, когато е неактивен.