Hàng đợi không chặn

Triển khai Hàng đợi an toàn theo luồng và quan trọng nhất là không chặn trên các nút được liên kết.

ConcurrentLinkedQueue<E> - thuật toán này sử dụng thuật toán không chờ đợi được điều chỉnh để hoạt động với trình thu gom rác. Thuật toán này khá hiệu quả và rất nhanh vì nó được xây dựng trên CAS. Phương thức size() có thể chạy trong thời gian dài, vì vậy tốt nhất bạn không nên kéo nó liên tục.

ConcurrentLinkedDeque<E> - Deque là viết tắt của Hàng đợi kết thúc kép. Điều này có nghĩa là dữ liệu có thể được thêm và lấy từ cả hai phía. Theo đó, lớp hỗ trợ cả 2 phương thức hoạt động là FIFO (First In First Out) và LIFO (Last In First Out).

Trong thực tế, ConcurrentLinkedDeque nên được sử dụng nếu LIFO là thực sự cần thiết, vì tính hai chiều của các nút, lớp này mất một nửa hiệu suất so với ConcurrentLinkedQueue .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

Chặn hàng đợi

Giao diện BlockingQueue<E> - nếu có nhiều dữ liệu, ConcurrentLinkedQueue là không đủ.

Khi các luồng không thực hiện được công việc của chúng, bạn có thể dễ dàng nhận được OutOfMemmoryException . Và để những trường hợp như vậy không phát sinh, chúng tôi có một BlockingQueue để làm việc với sự hiện diện của các phương thức khác nhau để điền và làm việc với hàng đợi và khóa có điều kiện.

BlockingQueue không nhận ra các phần tử rỗng và đưa ra một NullPulumException khi cố gắng thêm hoặc lấy một phần tử như vậy. Phương thức thăm dò trả về một phần tử null nếu không có phần tử nào được đặt trong hàng đợi trong thời gian chờ.

Triển khai BlockingQueue<E>

Hãy xem xét kỹ hơn từng triển khai BlockingQueue của chúng tôi :

ArrayBlockingQueue<E> là lớp hàng đợi chặn được xây dựng trên bộ đệm vòng cổ điển. Tại đây, chúng tôi có cơ hội quản lý tính “trung thực” của ổ khóa. Nếu fair=false (mặc định), thì thứ tự chuỗi không được đảm bảo.

DelayQueue<E extends Delayed> là một lớp cho phép bạn lấy các phần tử từ hàng đợi chỉ sau một độ trễ nhất định, được xác định trong từng phần tử thông qua phương thức getDelay của giao diện Delayed .

LinkedBlockingQueue<E> là hàng đợi chặn trên các nút được liên kết, được triển khai trên thuật toán “hàng đợi hai khóa”: khóa đầu tiên dùng để thêm, khóa thứ hai dùng để lấy phần tử ra khỏi hàng đợi. Do bị khóa, so với ArrayBlockingQueue , lớp này có hiệu năng cao nhưng cần nhiều bộ nhớ hơn. Kích thước hàng đợi được đặt thông qua hàm tạo và bằng Integer.MAX_VALUE theo mặc định.

PriorityBlockingQueue<E> là trình bao bọc đa luồng trên PriorityQueue . Bộ so sánh chịu trách nhiệm về logic mà phần tử sẽ được thêm vào. Phần tử nhỏ nhất xuất hiện trước.

SynchronousQueue<E> - hàng đợi hoạt động theo nguyên tắc FIFO (vào trước ra trước). Mỗi hoạt động chèn sẽ chặn luồng “Nhà sản xuất” cho đến khi luồng “Người tiêu dùng” kéo phần tử từ hàng đợi và ngược lại, “Người tiêu dùng” sẽ đợi cho đến khi “Nhà sản xuất” chèn phần tử.

BlockingDeque<E> là giao diện mô tả các phương thức bổ sung cho hàng đợi chặn hai chiều. Dữ liệu có thể được chèn và lấy ra từ cả hai phía của hàng đợi.

LinkedBlockingDeque<E> là hàng đợi chặn hai chiều trên các nút được liên kết, được triển khai dưới dạng danh sách hai chiều đơn giản với một khóa. Kích thước hàng đợi được đặt thông qua hàm tạo và bằng Integer.MAX_VALUE theo mặc định.

TransferQueue<E> - giao diện thú vị ở chỗ khi một phần tử được thêm vào hàng đợi, có thể chặn luồng Producer đang chèn cho đến khi một luồng Người tiêu dùng khác kéo phần tử đó khỏi hàng đợi. Bạn cũng có thể thêm kiểm tra thời gian chờ cụ thể hoặc đặt kiểm tra cho Consumer s đang chờ xử lý . Kết quả là, chúng tôi nhận được một cơ chế truyền dữ liệu với sự hỗ trợ cho các thông báo không đồng bộ và đồng bộ.

LinkedTransferQueue<E> là một triển khai của TransferQueue dựa trên Hàng đợi kép với thuật toán Slack. Sử dụng nhiều CAS (xem ở trên) và đỗ luồng khi không hoạt động.