Files d'attente non bloquantes

Implémentations de file d'attente sécurisées pour les threads et surtout non bloquantes sur les nœuds liés.

ConcurrentLinkedQueue<E> - il utilise un algorithme sans attente adapté pour fonctionner avec le ramasse-miettes. Cet algorithme est assez efficace et très rapide, car il est construit sur CAS. La méthode size() peut s'exécuter pendant une longue période, il est donc préférable de ne pas la tirer tout le temps.

ConcurrentLinkedDeque<E> - Deque signifie file d'attente à double extrémité. Cela signifie que les données peuvent être ajoutées et extraites des deux côtés. En conséquence, la classe prend en charge les deux modes de fonctionnement : FIFO (First In First Out) et LIFO (Last In First Out).

En pratique, ConcurrentLinkedDeque doit être utilisé si LIFO est absolument nécessaire, car du fait de la bidirectionnalité des nœuds, cette classe perd la moitié de ses performances par rapport à ConcurrentLinkedQueue .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Blocage des files d'attente

Interface BlockingQueue<E> - s'il y a beaucoup de données, ConcurrentLinkedQueue ne suffit pas.

Lorsque les threads ne parviennent pas à faire leur travail, vous pouvez facilement obtenir une OutOfMemmoryException . Et pour que de tels cas ne se produisent pas, nous avons un BlockingQueue pour travailler avec la présence de différentes méthodes pour remplir et travailler avec la file d'attente et les verrous conditionnels.

BlockingQueue ne reconnaît pas les éléments nuls et lève une exception NullPointerException lors de la tentative d'ajout ou d'obtention d'un tel élément. La méthode poll renvoie un élément nul si aucun élément n'a été placé dans la file d'attente dans le délai imparti.

Implémentations de BlockingQueue<E>

Examinons de plus près chacune de nos implémentations BlockingQueue :

ArrayBlockingQueue<E> est une classe de file d'attente de blocage construite sur le tampon en anneau classique. Ici, nous avons la possibilité de gérer «l'honnêteté» des serrures. Si fair=false (valeur par défaut), l'ordre des threads n'est pas garanti.

DelayQueue<E extend Delayed> est une classe qui vous permet d'extraire des éléments de la file d'attente uniquement après un certain délai, défini dans chaque élément via la méthode getDelay de l' interface Delayed .

LinkedBlockingQueue<E> est une file d'attente bloquante sur les nœuds liés, implémentée sur l'algorithme "file d'attente à deux verrous" : le premier verrou sert à ajouter, le second à extraire un élément de la file d'attente. En raison des verrous, par rapport à ArrayBlockingQueue , cette classe a des performances élevées, mais elle nécessite plus de mémoire. La taille de la file d'attente est définie via le constructeur et est égale à Integer.MAX_VALUE par défaut.

PriorityBlockingQueue<E> est un wrapper multithread sur PriorityQueue . Le comparateur est responsable de la logique selon laquelle l'élément sera ajouté. Le plus petit élément sort en premier.

SynchronousQueue<E> - la file d'attente fonctionne selon le principe FIFO (premier entré, premier sorti). Chaque opération d'insertion bloque le thread "Producer" jusqu'à ce que le thread "Consumer" tire l'élément de la file d'attente et vice versa, le "Consumer" attendra que le "Producteur" insère l'élément.

BlockingDeque<E> est une interface qui décrit des méthodes supplémentaires pour une file d'attente de blocage bidirectionnelle. Les données peuvent être insérées et extraites des deux côtés de la file d'attente.

LinkedBlockingDeque<E> est une file d'attente de blocage bidirectionnelle sur les nœuds liés, implémentée comme une simple liste bidirectionnelle avec un verrou. La taille de la file d'attente est définie via le constructeur et est égale à Integer.MAX_VALUE par défaut.

TransferQueue<E> - l'interface est intéressante dans la mesure où lorsqu'un élément est ajouté à la file d'attente, il est possible de bloquer le thread producteur d'insertion jusqu'à ce qu'un autre thread consommateur extraie l'élément de la file d'attente. Vous pouvez également ajouter une vérification pour un délai d'attente spécifique ou définir une vérification pour les Consumer s en attente . En conséquence, nous obtenons un mécanisme de transfert de données prenant en charge les messages asynchrones et synchrones.

LinkedTransferQueue<E> est une implémentation de TransferQueue basée sur l'algorithme Dual Queues with Slack. Fait un usage intensif du CAS (voir ci-dessus) et du stationnement des threads lorsqu'il est inactif.