ลองหา เมธอด WorkStealingPool ใหม่ซึ่งเตรียมExecutorServiceให้เรา

พูลเธรดนี้พิเศษ พฤติกรรมของมันขึ้นอยู่กับแนวคิดของการ "ขโมย" งาน

งานอยู่ในคิวและแจกจ่ายระหว่างโปรเซสเซอร์ แต่ถ้าโปรเซสเซอร์ไม่ว่าง โปรเซสเซอร์ฟรีตัวอื่นสามารถขโมยงานจากโปรเซสเซอร์นั้นและดำเนินการได้ รูปแบบนี้ถูกนำมาใช้ใน Java เพื่อลดความขัดแย้งในแอปพลิเคชันแบบมัลติเธรด มันถูกสร้างขึ้นบนเฟรมเวิร์กfork/join

ส้อม / เข้าร่วม

ใน เฟรมเวิร์ก fork/joinงานจะถูกแยกย่อยแบบเรียกซ้ำ นั่นคือ แบ่งงานออกเป็นงานย่อย จากนั้นงานย่อยจะถูกดำเนินการแยกกัน และผลลัพธ์ของงานย่อยจะถูกรวมเข้าด้วยกันเพื่อสร้างผลลัพธ์ของงานดั้งเดิม

วิธี การforkเริ่มงานแบบอะซิงโครนัสในบางเธรด และ วิธีการ เข้าร่วมช่วยให้คุณรอให้งานนี้เสร็จสิ้น

ใหม่WorkStealingPool

วิธี การ newWorkStealingPoolมีการใช้งานสองแบบ:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

ตั้งแต่เริ่มต้น เราทราบว่าภายใต้ประทุน เราไม่ได้เรียกตัวสร้างThreadPoolExecutor ที่นี่เรากำลังทำงานกับเอนทิตีForkJoinPool เช่นเดียวกับThreadPoolExecutorเป็นการใช้งานAbstractExecutorService

เรามี 2 วิธีให้เลือก ในตอนแรกเราเองระบุว่าเราต้องการเห็นความเท่าเทียมในระดับใด หากเราไม่ระบุค่านี้ ความขนานของพูลของเราจะเท่ากับจำนวนแกนประมวลผลที่มีในเครื่องเสมือน Java

มันยังคงต้องเข้าใจว่ามันทำงานอย่างไรในทางปฏิบัติ:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

เราสร้างงาน 10 รายการที่แสดงสถานะการเสร็จสิ้นของตนเอง หลังจากนั้น เราเริ่มงานทั้งหมดโดยใช้เมธอดinvokeAll

ผลลัพธ์เมื่อดำเนินการ 10 งานใน 10 เธรดในพูล:

คำขอของผู้ใช้ที่ประมวลผล #9 บนเธรด ForkJoinPool-1-worker-10
คำขอของผู้ใช้ที่ประมวลผล #4 บนเธรด ForkJoinPool-1-worker-5 คำขอของ
ผู้ใช้ที่ประมวลผล #7 บนเธรด ForkJoinPool-1-worker-
8 เธรด 1-คนงาน-2
ประมวลผลคำขอของผู้ใช้ #2 บนเธรด ForkJoinPool-1-คนงาน-3
ประมวลผลคำขอของผู้ใช้ #3 บนเธรด ForkJoinPool-1-คนงาน-4 ประมวลผล
คำขอของผู้ใช้ #6 บนเธรด ForkJoinPool-1-คนงาน-7
ผู้ใช้ที่ประมวลผล คำขอ #0 บนเธรด ForkJoinPool-1-worker-1
คำขอของผู้ใช้ที่ประมวลผล #5 บนเธรด ForkJoinPool-1-worker-6 คำขอของ
ผู้ใช้ที่ประมวลผล #8 บนเธรด ForkJoinPool-1-worker-9

เราเห็นว่าหลังจากสร้างคิวแล้ว เธรดจะใช้งานเพื่อดำเนินการ คุณยังสามารถตรวจสอบได้ว่าจะกระจายงาน 20 งานในกลุ่ม 10 เธรดอย่างไร

คำขอของผู้ใช้ที่ดำเนินการ #3 บนเธรด ForkJoinPool-1-worker-4
คำขอของผู้ใช้ที่ประมวลผล #7 บนเธรด ForkJoinPool-1-worker-8 คำขอของ
ผู้ใช้ที่ประมวลผล #2 บนเธรด ForkJoinPool-1-worker-
3 เธรด 1-คนงาน-5
ประมวลผลคำขอของผู้ใช้ #1 บนเธรด ForkJoinPool-1-คนงาน-2
ประมวลผลคำขอของผู้ใช้ #5 บนเธรด ForkJoinPool-1-คนงาน-6 ประมวลผล
คำขอของผู้ใช้ #8 บนเธรด ForkJoinPool-1-คนงาน-9 ผู้
ใช้ที่ประมวล ผล คำขอ #9 บน ForkJoinPool-1-worker-10 เธรด
ประมวลผลคำขอของผู้ใช้ #0 บนเธรด ForkJoinPool-1-worker-1
ประมวลผลคำขอของผู้ใช้ #6 บนเธรด ForkJoinPool-1-worker-7 คำขอของ
ผู้ใช้ที่ประมวลผล #10 บน ForkJoinPool-1- เธรดคนงาน-9
คำขอของผู้ใช้ที่ประมวลผล #12 บนเธรด ForkJoinPool-1-worker-1
คำขอของผู้ใช้ที่
ประมวลผล #13 บนเธรด ForkJoinPool-1-worker-8 คำขอของผู้ใช้
ที่ประมวลผล #11 บนเธรด ForkJoinPool-1-worker-6 เธรด 1-คนงาน-8
ประมวลผลคำขอของผู้ใช้ #14 บนเธรด ForkJoinPool-1-คนงาน-1
ประมวลผลคำขอของผู้ใช้ #17 บนเธรด ForkJoinPool-1-คนงาน-6 ประมวลผล
คำขอของผู้ใช้ #16 บนเธรด ForkJoinPool-1-คนงาน-7 ผู้ใช้
ที่ประมวลผล คำขอ #19 บนเธรด ForkJoinPool
-1-worker-6

จากผลลัพธ์ เราจะเห็นว่าบางเธรดจัดการเพื่อทำงานหลายอย่างให้เสร็จ ( ForkJoinPool-1-worker-6เสร็จ 4 งาน) ในขณะที่บางเธรดทำเพียงงานเดียว ( ForkJoinPool-1-worker-2 ) หากมีการเพิ่มการหน่วงเวลา 1 วินาทีในการดำเนินการตาม วิธี การโทรรูปภาพจะเปลี่ยนไป

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

เพื่อประโยชน์ในการทดลอง ลองเรียกใช้รหัสเดียวกันบนเครื่องอื่น ผลลัพธ์ที่ได้:

คำขอของผู้ใช้ที่ประมวลผล #2 บนเธรด ForkJoinPool-1-worker-23
คำขอของผู้ใช้ที่ประมวลผล #7 บนเธรด
ForkJoinPool-1-worker-31 คำขอของผู้ใช้ที่
ประมวลผล #4 บนเธรด ForkJoinPool-1-worker-27 เธรด 1-คนงาน-13 ประมวลผล
คำขอของผู้ใช้ #0 บนเธรด ForkJoinPool-1-คนงาน-19 ประมวลผล
คำขอของผู้ใช้ #8 บนเธรด ForkJoinPool-1-คนงาน-3 ประมวลผล
คำขอของผู้ใช้ #9 บนเธรด ForkJoinPool-1-คนงาน-21 ประมวลผล
ผู้ใช้ คำขอ #6 บนเธรด ForkJoinPool-1-worker-17
คำขอของผู้ใช้ที่ประมวลผล #3 บนเธรด ForkJoinPool-1-worker-9 คำขอของ
ผู้ใช้ที่ประมวลผล #1 บนเธรด ForkJoinPool-1-worker-5 คำขอ
ของผู้ใช้ที่ประมวลผล #12 บน ForkJoinPool-1- เธรดคนงาน-23
คำขอของผู้ใช้ที่ประมวลผล #15 บนเธรด ForkJoinPool-1-worker-19
คำขอของผู้ใช้ที่ประมวลผล #14 บนเธรด ForkJoinPool-1-worker-27 คำขอของ
ผู้ใช้ที่ประมวลผล #11 บนเธรด ForkJoinPool-1-worker-3 คำขอของ
ผู้ใช้ที่ประมวลผล #13 บน ForkJoinPool- เธรด 1-คนงาน-13
ประมวลผลคำขอของผู้ใช้ #10 บนเธรด ForkJoinPool-1-คนงาน-31
ประมวลผลคำขอของผู้ใช้ #18 บนเธรด ForkJoinPool-1-คนงาน-5 ประมวลผล
คำขอของผู้ใช้ #16 บนเธรด ForkJoinPool-1-คนงาน-9
ผู้ใช้ที่ประมวลผล คำขอ #17 บนเธรด ForkJoinPool-1-worker-21
ดำเนินการคำขอของผู้ใช้ #19 ในเธรด ForkJoinPool-1-worker-17

ในผลลัพธ์นี้ เป็นที่น่าสังเกตว่าเรา "ขอ" เธรดในพูล ยิ่งไปกว่านั้น ชื่อของเธรดผู้ปฏิบัติงานไม่ได้เปลี่ยนจากหนึ่งถึงสิบ แต่บางครั้งก็สูงกว่าสิบแทน เมื่อดูชื่อเฉพาะ เราจะเห็นว่ามีคนงานสิบคนจริงๆ (3, 5, 9, 13, 17, 19, 21, 23, 27 และ 31) ที่นี่ค่อนข้างสมเหตุสมผลที่จะถามว่าทำไมสิ่งนี้ถึงเกิดขึ้น? เมื่อใดก็ตามที่คุณไม่เข้าใจสิ่งที่เกิดขึ้น ให้ใช้ดีบักเกอร์

นี่คือสิ่งที่เราจะทำ มาโยนกันเถอะผู้บริหารบริการคัดค้านForkJoinPool :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

เราจะใช้การดำเนินการประเมินนิพจน์เพื่อตรวจสอบวัตถุนี้หลังจากเรียกใช้เมธอดinvokeAll ในการทำเช่นนี้ หลังจาก เมธอด invokeAllให้เพิ่มคำสั่งใดๆ เช่น sout ที่ว่างเปล่า และตั้งค่าเบรกพอยต์

เราจะเห็นว่าพูลมี 10 เธรด แต่ขนาดของอาร์เรย์ของเธรดผู้ปฏิบัติงานคือ 32 แปลก แต่ก็โอเค มาขุดกันต่อ เมื่อสร้างพูล ให้ลองตั้งค่าระดับความขนานเป็นมากกว่า 32 เช่น 40

ExecutorService executorService = Executors.newWorkStealingPool(40);

ในดีบักเกอร์ ลองดูที่วัตถุ forkJoinPool อีกครั้ง

ตอนนี้ขนาดของอาร์เรย์ของเธรดผู้ปฏิบัติงานคือ 128 เราถือว่านี่เป็นหนึ่งในการปรับให้เหมาะสมภายในของ JVM ลองค้นหาในรหัสของ JDK (openjdk-14):

อย่างที่เราสงสัย: ขนาดของอาร์เรย์ของเธรดผู้ปฏิบัติงานคำนวณโดยดำเนินการจัดการระดับบิตบนค่าความขนาน เราไม่จำเป็นต้องพยายามหาว่าเกิดอะไรขึ้นที่นี่ แค่รู้ว่ามีการเพิ่มประสิทธิภาพดังกล่าวก็เพียงพอแล้ว

อีกแง่มุมที่น่าสนใจในตัวอย่างของเราคือการใช้เมธอดinvokeAll เป็นที่น่าสังเกตว่า เมธอด invokeAllสามารถส่งคืนผลลัพธ์หรือมากกว่ารายการผลลัพธ์ (ในกรณีของเรา a List<Future<Void>>)ซึ่งเราสามารถค้นหาผลลัพธ์ของแต่ละงานได้

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

บริการชนิดพิเศษและเธรดพูลสามารถใช้ในงานที่มีระดับของการทำงานพร้อมกันที่คาดเดาได้หรืออย่างน้อยก็โดยนัย