Non-Blocking Queues

Thread-safe and most importantly non-blocking Queue implementations on linked nodes.

ConcurrentLinkedQueue<E> - it uses a wait-free algorithm adapted to work with the garbage collector. This algorithm is quite efficient and very fast, as it is built on CAS. The size() methodcan run for a long time, so it's best not to pull it all the time.

ConcurrentLinkedDeque<E> - Deque stands for Double ended queue. This means that data can be added and pulled from both sides. Accordingly, the class supports both modes of operation: FIFO (First In First Out) and LIFO (Last In First Out).

In practice, ConcurrentLinkedDeque should be used if LIFO is absolutely necessary, since due to the bidirectionality of the nodes, this class loses half in performance compared to ConcurrentLinkedQueue .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Blocking Queues

BlockingQueue<E> interface - if there is a lot of data, ConcurrentLinkedQueue is not enough.

When threads fail to do their job, you can easily get an OutOfMemmoryException . And so that such cases do not arise, we have a BlockingQueue for work with the presence of different methods for filling and working with the queue and conditional locks.

BlockingQueue does not recognize null elements and throws a NullPointerException when trying to add or get such an element. The poll method returns a null element if no element has been placed in the queue within the timeout.

BlockingQueue<E> Implementations

Let's take a closer look at each of our BlockingQueue implementations :

ArrayBlockingQueue<E> is a blocking queue class built on the classic ring buffer. Here we have the opportunity to manage the “honesty” of locks. If fair=false (the default), then thread ordering is not guaranteed.

DelayQueue<E extends Delayed> is a class that allows you to pull elements from the queue only after a certain delay, defined in each element through the getDelay method of the Delayed interface.

LinkedBlockingQueue<E> is a blocking queue on linked nodes, implemented on the “two lock queue” algorithm: the first lock is for adding, the second is for pulling an element from the queue. Due to locks, compared to ArrayBlockingQueue , this class has high performance, but it requires more memory. The queue size is set via the constructor and is equal to Integer.MAX_VALUE by default.

PriorityBlockingQueue<E> is a multi-threaded wrapper over PriorityQueue . The Comparator is responsible for the logic by which the element will be added. The smallest element comes out first.

SynchronousQueue<E> - the queue works according to the FIFO (first-in-first-out) principle. Each insert operation blocks the “Producer” thread until the “Consumer” thread pulls the element from the queue and vice versa, the “Consumer” will wait until the “Producer” inserts the element.

BlockingDeque<E> is an interface that describes additional methods for a bidirectional blocking queue. Data can be inserted and pulled out from both sides of the queue.

LinkedBlockingDeque<E> is a bidirectional blocking queue on linked nodes, implemented as a simple bidirectional list with one lock. The queue size is set via the constructor and is equal to Integer.MAX_VALUE by default.

TransferQueue<E> - the interface is interesting in that when an element is added to the queue, it is possible to block the inserting Producer thread until another Consumer thread pulls the element from the queue. You can also add a check for a specific timeout or set a check for pending Consumer s . As a result, we get a data transfer mechanism with support for asynchronous and synchronous messages.

LinkedTransferQueue<E> is an implementation of TransferQueue based on the Dual Queues with Slack algorithm. Makes heavy use of CAS (see above) and thread parking when idle.

undefined
1
Task
Module 3. Java Professional, level 19, lesson 3
Locked
Message Queue
task4205
undefined
1
Task
Module 3. Java Professional, level 19, lesson 3
Locked
Queue Again
task4206