๋น„์ฐจ๋‹จ ๋Œ€๊ธฐ์—ด

์—ฐ๊ฒฐ๋œ ๋…ธ๋“œ์—์„œ ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•˜๊ณ  ๊ฐ€์žฅ ์ค‘์š”ํ•œ ๋น„์ฐจ๋‹จ Queue ๊ตฌํ˜„.

ConcurrentLinkedQueue<E> - ๊ฐ€๋น„์ง€ ์ˆ˜์ง‘๊ธฐ์™€ ํ•จ๊ป˜ ์ž‘๋™ํ•˜๋„๋ก ์กฐ์ •๋œ ๋Œ€๊ธฐ ์—†๋Š” ์•Œ๊ณ ๋ฆฌ์ฆ˜์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์ด ์•Œ๊ณ ๋ฆฌ์ฆ˜์€ CAS๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ตฌ์ถ•๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ๋งค์šฐ ํšจ์œจ์ ์ด๊ณ  ๋งค์šฐ ๋น ๋ฆ…๋‹ˆ๋‹ค. size() ๋ฉ”์„œ๋“œ๋Š”์˜ค๋žซ๋™์•ˆ ์‹คํ–‰๋  ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ํ•ญ์ƒ ๋Œ์–ด์˜ค์ง€ ์•Š๋Š” ๊ฒƒ์ด ๊ฐ€์žฅ ์ข‹์Šต๋‹ˆ๋‹ค.

ConcurrentLinkedDeque<E> - Deque๋Š” ์ด์ค‘ ์ข…๋ฃŒ ๋Œ€๊ธฐ์—ด์„ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค. ์ด๋Š” ์–‘์ชฝ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•˜๊ณ  ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ์Œ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ์ด ํด๋ž˜์Šค๋Š” FIFO(First In First Out) ๋ฐ LIFO(Last In First Out)์˜ ๋‘ ๊ฐ€์ง€ ์ž‘๋™ ๋ชจ๋“œ๋ฅผ ๋ชจ๋‘ ์ง€์›ํ•ฉ๋‹ˆ๋‹ค.

์‹ค์ œ๋กœ LIFO๊ฐ€ ์ ˆ๋Œ€์ ์œผ๋กœ ํ•„์š”ํ•œ ๊ฒฝ์šฐ ConcurrentLinkedDeque๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๋…ธ๋“œ์˜ ์–‘๋ฐฉํ–ฅ์„ฑ์œผ๋กœ ์ธํ•ด ์ด ํด๋ž˜์Šค๋Š” ConcurrentLinkedQueue ์— ๋น„ํ•ด ์„ฑ๋Šฅ์ด ์ ˆ๋ฐ˜์œผ๋กœ ๋–จ์–ด์ง€๊ธฐ ๋•Œ๋ฌธ์ž…๋‹ˆ๋‹ค .

import java.util.concurrent.ConcurrentLinkedQueue;

public class  ConcurrentLinkedQueueExample {
   public static void main(String[] args) {
       ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

       Thread producer = new Thread(new Producer(queue));
       Thread consumer = new Thread(new Consumer(queue));

       producer.start();
       consumer.start();
   }
}

class Producer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Producer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }
   public void run() {
       System.out.println("Class for adding items to the queue");
       try {
           for (int i = 1; i < 5; i++) {
               queue.add("Item #" + i);
               System.out.println("Added: Item #" + i);
               Thread.sleep(300);
           }
       } catch (InterruptedException ex) {
           ex.printStackTrace();
           Thread.currentThread().interrupt();
       }
   }
}

class Consumer implements Runnable {

   ConcurrentLinkedQueue<String> queue;
   Consumer(ConcurrentLinkedQueue<String> queue){
       this.queue = queue;
   }

   public void run() {
       String str;
       System.out.println("Class for getting items from the queue");
       for (int x = 0; x < 5; x++) {
           while ((str = queue.poll()) != null) {
               System.out.println("Pulled out: " + str);
           }
           try {
               Thread.sleep(600);
           } catch (InterruptedException ex) {
               ex.printStackTrace();
               Thread.currentThread().interrupt();
           }
       }
   }
}

์ฐจ๋‹จ ๋Œ€๊ธฐ์—ด

BlockingQueue<E> ์ธํ„ฐํŽ˜์ด์Šค - ๋ฐ์ดํ„ฐ๊ฐ€ ๋งŽ์œผ๋ฉด ConcurrentLinkedQueue ๋กœ๋Š” ๋ถ€์กฑํ•ฉ๋‹ˆ๋‹ค.

์Šค๋ ˆ๋“œ๊ฐ€ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜์ง€ ๋ชปํ•˜๋ฉด OutOfMemmoryException ์„ ์‰ฝ๊ฒŒ ์–ป์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค . ์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋„๋ก ๋Œ€๊ธฐ์—ด ๋ฐ ์กฐ๊ฑด๋ถ€ ์ž ๊ธˆ์„ ์ฑ„์šฐ๊ณ  ์ž‘์—…ํ•˜๋Š” ๋‹ค์–‘ํ•œ ๋ฐฉ๋ฒ•์ด ์žˆ๋Š” ์ž‘์—…์„ ์œ„ํ•œ BlockingQueue๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

BlockingQueue๋Š” null ์š”์†Œ๋ฅผ ์ธ์‹ํ•˜์ง€ ๋ชปํ•˜๊ณ ์ด๋Ÿฌํ•œ ์š”์†Œ๋ฅผ ์ถ”๊ฐ€ํ•˜๊ฑฐ๋‚˜ ๊ฐ€์ ธ์˜ค๋ ค๊ณ  ํ•  ๋•Œ NullPointerException์„ ๋ฐœ์ƒ์‹œํ‚ต๋‹ˆ๋‹ค. poll ๋ฉ”์„œ๋“œ๋Š” ์ œํ•œ ์‹œ๊ฐ„ ๋‚ด์— ๋Œ€๊ธฐ์—ด์— ์š”์†Œ๊ฐ€ ๋ฐฐ์น˜๋˜์ง€ ์•Š์€ ๊ฒฝ์šฐ null ์š”์†Œ๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

BlockingQueue<E> ๊ตฌํ˜„

๊ฐ BlockingQueue ๊ตฌํ˜„์„ ์ž์„ธํžˆ ์‚ดํŽด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค .

ArrayBlockingQueue<E>๋Š” ํด๋ž˜์‹ ๋ง ๋ฒ„ํผ์— ๊ตฌ์ถ•๋œ ์ฐจ๋‹จ ๋Œ€๊ธฐ์—ด ํด๋ž˜์Šค์ž…๋‹ˆ๋‹ค. ์—ฌ๊ธฐ์—์„œ ์ž๋ฌผ์‡ ์˜ "์ •์ง์„ฑ"์„ ๊ด€๋ฆฌํ•  ๊ธฐํšŒ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. fair=false(๊ธฐ๋ณธ๊ฐ’)์ด๋ฉด ์Šค๋ ˆ๋“œ ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

DelayQueue<E extends Delayed>๋Š” Delayed ์ธํ„ฐํŽ˜์ด์Šค ์˜ getDelay ๋ฉ”์„œ๋“œ ๋ฅผ ํ†ตํ•ด ๊ฐ ์š”์†Œ์— ์ •์˜๋œ ํŠน์ • ์ง€์—ฐ ์ดํ›„์—๋งŒ ๋Œ€๊ธฐ์—ด์—์„œ ์š”์†Œ๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋Š” ํด๋ž˜์Šค์ž…๋‹ˆ๋‹ค.

LinkedBlockingQueue<E> ๋Š” "2๊ฐœ์˜ ์ž ๊ธˆ ๋Œ€๊ธฐ์—ด" ์•Œ๊ณ ๋ฆฌ์ฆ˜์œผ๋กœ ๊ตฌํ˜„๋œ ์—ฐ๊ฒฐ๋œ ๋…ธ๋“œ์˜ ์ฐจ๋‹จ ๋Œ€๊ธฐ์—ด์ž…๋‹ˆ๋‹ค. ์ฒซ ๋ฒˆ์งธ ์ž ๊ธˆ์€ ์ถ”๊ฐ€์šฉ์ด๊ณ  ๋‘ ๋ฒˆ์งธ ์ž ๊ธˆ์€ ๋Œ€๊ธฐ์—ด์—์„œ ์š”์†Œ๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•œ ๊ฒƒ์ž…๋‹ˆ๋‹ค. ์ž ๊ธˆ์œผ๋กœ ์ธํ•ด ArrayBlockingQueue ์— ๋น„ํ•ด์ด ํด๋ž˜์Šค๋Š” ์„ฑ๋Šฅ์ด ๋†’์ง€๋งŒ ๋” ๋งŽ์€ ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ๋Œ€๊ธฐ์—ด ํฌ๊ธฐ๋Š” ์ƒ์„ฑ์ž๋ฅผ ํ†ตํ•ด ์„ค์ •๋˜๋ฉฐ ๊ธฐ๋ณธ์ ์œผ๋กœ Integer.MAX_VALUE์™€ ๊ฐ™์Šต๋‹ˆ๋‹ค.

PriorityBlockingQueue<E>๋Š” PriorityQueue ์— ๋Œ€ํ•œ ๋‹ค์ค‘ ์Šค๋ ˆ๋“œ ๋ž˜ํผ์ž…๋‹ˆ๋‹ค. Comparator๋Š” ์š”์†Œ๊ฐ€ ์ถ”๊ฐ€๋˜๋Š” ๋…ผ๋ฆฌ๋ฅผ ๋‹ด๋‹นํ•ฉ๋‹ˆ๋‹ค. ๊ฐ€์žฅ ์ž‘์€ ์š”์†Œ๊ฐ€ ๋จผ์ € ๋‚˜์˜ต๋‹ˆ๋‹ค.

SynchronousQueue<E> - ๋Œ€๊ธฐ์—ด์€ FIFO(์„ ์ž…์„ ์ถœ) ์›์น™์— ๋”ฐ๋ผ ์ž‘๋™ํ•ฉ๋‹ˆ๋‹ค. ๊ฐ ์‚ฝ์ž… ์ž‘์—…์€ "์†Œ๋น„์ž" ์Šค๋ ˆ๋“œ๊ฐ€ ๋Œ€๊ธฐ์—ด์—์„œ ์š”์†Œ๋ฅผ ๊ฐ€์ ธ์˜ฌ ๋•Œ๊นŒ์ง€ "์ƒ์‚ฐ์ž" ์Šค๋ ˆ๋“œ๋ฅผ ์ฐจ๋‹จํ•˜๊ณ  ๊ทธ ๋ฐ˜๋Œ€์˜ ๊ฒฝ์šฐ๋„ ๋งˆ์ฐฌ๊ฐ€์ง€์ž…๋‹ˆ๋‹ค. "์†Œ๋น„์ž"๋Š” "์ƒ์‚ฐ์ž"๊ฐ€ ์š”์†Œ๋ฅผ ์‚ฝ์ž…ํ•  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•ฉ๋‹ˆ๋‹ค.

BlockingDeque<E>๋Š” ์–‘๋ฐฉํ–ฅ ์ฐจ๋‹จ ๋Œ€๊ธฐ์—ด์— ๋Œ€ํ•œ ์ถ”๊ฐ€ ๋ฉ”์„œ๋“œ๋ฅผ ์„ค๋ช…ํ•˜๋Š” ์ธํ„ฐํŽ˜์ด์Šค์ž…๋‹ˆ๋‹ค. ๋Œ€๊ธฐ์—ด์˜ ์–‘์ชฝ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฝ์ž…ํ•˜๊ณ  ๊บผ๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

LinkedBlockingDeque<E>๋Š” ํ•˜๋‚˜์˜ ์ž ๊ธˆ์ด ์žˆ๋Š” ๊ฐ„๋‹จํ•œ ์–‘๋ฐฉํ–ฅ ๋ชฉ๋ก์œผ๋กœ ๊ตฌํ˜„๋œ ์—ฐ๊ฒฐ๋œ ๋…ธ๋“œ์˜ ์–‘๋ฐฉํ–ฅ ์ฐจ๋‹จ ๋Œ€๊ธฐ์—ด์ž…๋‹ˆ๋‹ค. ๋Œ€๊ธฐ์—ด ํฌ๊ธฐ๋Š” ์ƒ์„ฑ์ž๋ฅผ ํ†ตํ•ด ์„ค์ •๋˜๋ฉฐ ๊ธฐ๋ณธ์ ์œผ๋กœ Integer.MAX_VALUE์™€ ๊ฐ™์Šต๋‹ˆ๋‹ค.

TransferQueue<E> - ์š”์†Œ๊ฐ€ ๋Œ€๊ธฐ์—ด์— ์ถ”๊ฐ€๋  ๋•Œ๋‹ค๋ฅธ ์†Œ๋น„์ž ์Šค๋ ˆ๋“œ๊ฐ€ ๋Œ€๊ธฐ์—ด์—์„œ ์š”์†Œ๋ฅผ ๊ฐ€์ ธ์˜ฌ ๋•Œ๊นŒ์ง€ ์‚ฝ์ž…ํ•˜๋Š” ์ƒ์‚ฐ์ž ์Šค๋ ˆ๋“œ๋ฅผ ์ฐจ๋‹จํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์ ์—์„œ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ํฅ๋ฏธ๋กญ์Šต๋‹ˆ๋‹ค. ํŠน์ • ์‹œ๊ฐ„ ์ œํ•œ์— ๋Œ€ํ•œ ํ™•์ธ์„ ์ถ”๊ฐ€ํ•˜๊ฑฐ๋‚˜ ๋Œ€๊ธฐ ์ค‘์ธ Consumer ์— ๋Œ€ํ•œ ํ™•์ธ์„ ์„ค์ •ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฒฐ๊ณผ์ ์œผ๋กœ ๋น„๋™๊ธฐ ๋ฐ ๋™๊ธฐ ๋ฉ”์‹œ์ง€๋ฅผ ์ง€์›ํ•˜๋Š” ๋ฐ์ดํ„ฐ ์ „์†ก ๋ฉ”์ปค๋‹ˆ์ฆ˜์„ ์–ป์Šต๋‹ˆ๋‹ค.

LinkedTransferQueue<E>๋Š” Dual Queues with Slack ์•Œ๊ณ ๋ฆฌ์ฆ˜์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•˜๋Š” TransferQueue์˜ ๊ตฌํ˜„์ž…๋‹ˆ๋‹ค์œ ํœด ์ƒํƒœ์ผ ๋•Œ CAS(์œ„ ์ฐธ์กฐ) ๋ฐ ์Šค๋ ˆ๋“œ ํŒŒํ‚น์„ ๋งŽ์ด ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.