การแนะนำ
ดังนั้นเราจึงรู้ว่า Java มีเธรด คุณสามารถอ่านเกี่ยวกับเรื่องนี้ได้ในบทวิจารณ์เรื่องBetter together: Java and the Thread class ส่วนที่ 1 — เธรดของการดำเนินการ
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
อย่างที่คุณเห็น โค้ดสำหรับเริ่มงานเป็นเรื่องปกติ แต่เราต้องทำซ้ำสำหรับงานใหม่ วิธีแก้ไขวิธีหนึ่งคือใส่ไว้ในวิธีอื่นexecute(Runnable runnable)
เช่น แต่ผู้สร้างของ Java ได้พิจารณาถึงสภาพของเราและมาพร้อมกับExecutor
อินเทอร์เฟซ:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
โค้ดนี้กระชับขึ้นอย่างเห็นได้ชัด: ตอนนี้เราเพียงแค่เขียนโค้ดเพื่อเริ่มRunnable
ต้นเธรด ที่ดีใช่มั้ย แต่นี่เป็นเพียงจุดเริ่มต้นเท่านั้น: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
อินเทอร์เฟซมีExecutorService
อินเทอร์เฟซย่อย Javadoc สำหรับอินเตอร์เฟสนี้ระบุว่า an ExecutorService
อธิบายเฉพาะExecutor
ที่มีวิธีการปิดระบบExecutor
. นอกจากนี้ยังช่วยให้สามารถjava.util.concurrent.Future
ติดตามกระบวนการดำเนินการได้ ก่อนหน้านี้ในBetter together: Java และคลาส Thread ตอนที่ IV — Callable, Future และผองเพื่อนเราได้ทบทวนความสามารถFuture
ของ หากคุณลืมหรือไม่เคยอ่าน ฉันขอแนะนำให้คุณรีเฟรชหน่วยความจำของคุณ ;) Javadoc พูดอะไรอีก มันบอกเราว่าเรามีโรงงานพิเศษjava.util.concurrent.Executors
ที่ให้เราสร้างการใช้งานเริ่มต้นExecutorService
ของ
ExecutorService
มาทบทวนกัน เราต้องExecutor
ดำเนินการ (เช่นเรียกexecute()
ใช้) งานบางอย่างบนเธรด และโค้ดที่สร้างเธรดนั้นถูกซ่อนไว้ไม่ให้เราทราบ เรามีExecutorService
— เฉพาะExecutor
ที่มีหลายตัวเลือกสำหรับการควบคุมความคืบหน้า และเรามี โรงงานที่ให้ เราExecutors
สร้าง ExecutorService
ตอนนี้มาทำเอง:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
คุณจะเห็นว่าเราระบุกลุ่มเธรดคงที่ซึ่งมีขนาด 2 จากนั้นเราจะส่งงานไปยังกลุ่มทีละรายการ แต่ละงานส่งคืนString
ชื่อเธรด ( currentThread().GetName()
) สิ่งสำคัญคือต้องปิดส่วนExecutorService
ท้ายสุด เพราะไม่เช่นนั้นโปรแกรมของเราจะไม่สิ้นสุด โรงงานExecutors
มีวิธีการโรงงานเพิ่มเติม ตัวอย่างเช่น เราสามารถสร้างพูลที่ประกอบด้วยเธรดเดียว ( newSingleThreadExecutor
) หรือพูลที่มีแคช ( newCachedThreadPool
) ซึ่งเธรดจะถูกลบออกหลังจากที่ไม่ได้ใช้งานเป็นเวลา 1 นาที ในความเป็นจริง สิ่งเหล่านี้ExecutorService
ได้รับการสนับสนุนโดยคิวการบล็อกซึ่งวางงานและจากงานใดที่ถูกดำเนินการ ดูข้อมูลเพิ่มเติมเกี่ยวกับการบล็อกคิวได้ในวิดีโอนี้ คุณยังสามารถอ่านสิ่งนี้รีวิวเกี่ยวกับ BlockingQueue และตรวจดูคำตอบสำหรับคำถาม"เมื่อต้องการ LinkedBlockingQueue มากกว่า ArrayBlockingQueue" กล่าวอย่างง่ายที่สุดBlockingQueue
การบล็อกเธรดในสองกรณี:
- เธรดพยายามรับไอเท็มจากคิวว่าง
- เธรดพยายามใส่รายการลงในคิวเต็ม
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
หรือ
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ดังที่เราเห็น การใช้งานExecutorService
ถูกสร้างขึ้นภายในวิธีการของโรงงาน และส่วนใหญ่เรากำลังพูดThreadPoolExecutor
ถึง เฉพาะพารามิเตอร์ที่มีผลต่องานเท่านั้นที่มีการเปลี่ยนแปลง 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
อย่างที่เราเห็นก่อนหน้านี้ThreadPoolExecutor
คือสิ่งที่มักจะสร้างขึ้นในวิธีการของโรงงาน ฟังก์ชันการทำงานได้รับผลกระทบจากอาร์กิวเมนต์ที่เราส่งผ่านเป็นจำนวนเธรดสูงสุดและต่ำสุด รวมถึงประเภทของคิวที่ใช้ แต่java.util.concurrent.BlockingQueue
สามารถใช้อินเทอร์ เฟซใด ๆ ก็ได้ เมื่อพูดถึงThreadPoolExecutor
เราควรพูดถึงคุณสมบัติที่น่าสนใจบางอย่าง ตัวอย่างเช่น คุณไม่สามารถส่งงานไปยัง a ThreadPoolExecutor
หากไม่มีพื้นที่ว่าง:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
รหัสนี้จะผิดพลาดโดยมีข้อผิดพลาดดังนี้:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
กล่าวอีกนัยหนึ่ง คือtask
ไม่สามารถส่งได้ เพราะSynchronousQueue
ได้รับการออกแบบเพื่อให้ประกอบด้วยองค์ประกอบเดียวจริง ๆ และไม่อนุญาตให้เราใส่อะไรลงไปอีก เราจะเห็นว่าเรามีศูนย์queued tasks
("งานคิว = 0") ที่นี่ แต่ก็ไม่มีอะไรแปลกเกี่ยวกับเรื่องนี้ เพราะนี่คือคุณสมบัติพิเศษของSynchronousQueue
ซึ่งอันที่จริงแล้วคือคิว 1 องค์ประกอบที่ว่างเปล่าเสมอ! เมื่อเธรดหนึ่งใส่องค์ประกอบในคิว มันจะรอจนกว่าเธรดอื่นจะนำองค์ประกอบจากคิว ดังนั้น เราสามารถแทนที่ด้วยnew LinkedBlockingQueue<>(1)
และข้อผิดพลาดจะเปลี่ยนเป็น now queued tasks = 1
show เนื่องจากคิวเป็นเพียง 1 องค์ประกอบ เราจึงไม่สามารถเพิ่มองค์ประกอบที่สองได้ และนั่นคือสิ่งที่ทำให้โปรแกรมล้มเหลว การสนทนาของเราต่อไปในคิวเป็นที่น่าสังเกตว่าThreadPoolExecutor
คลาสมีวิธีเพิ่มเติมสำหรับการให้บริการคิว ตัวอย่างเช่นthreadPoolExecutor.purge()
เมธอดจะลบงานที่ยกเลิกทั้งหมดออกจากคิวเพื่อเพิ่มพื้นที่ว่างในคิว ฟังก์ชันเกี่ยวกับคิวที่น่าสนใจอีกอย่างคือตัวจัดการสำหรับงานที่ถูกปฏิเสธ:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
ในตัวอย่างนี้ ตัวจัดการของเราจะแสดงRejected
ทุกครั้งที่งานในคิวถูกปฏิเสธ สะดวกใช่มั้ย นอกจากนี้ยังThreadPoolExecutor
มีคลาสย่อยที่น่าสนใจ: ScheduledThreadPoolExecutor
ซึ่งเป็นScheduledExecutorService
. ให้ความสามารถในการทำงานตามตัวจับเวลา
บริการผู้ดำเนินการตามกำหนดเวลา
ScheduledExecutorService
(ซึ่งเป็นประเภทExecutorService
) ให้เราทำงานตามกำหนดเวลา ลองดูตัวอย่าง:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
ทุกอย่างเรียบง่ายที่นี่ ส่งงานแล้วเราจะได้รับไฟล์java.util.concurrent.ScheduledFuture
. ตารางเวลาอาจมีประโยชน์ในสถานการณ์ต่อไปนี้ด้วย:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ที่นี่ เราส่งRunnable
งานเพื่อดำเนินการที่ความถี่คงที่ ("อัตราคงที่") โดยมีการหน่วงเวลาเริ่มต้นที่แน่นอน ในกรณีนี้ หลังจาก 1 วินาที งานจะเริ่มดำเนินการทุกๆ 2 วินาที มีตัวเลือกที่คล้ายกัน:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
แต่ในกรณีนี้ งานจะถูกดำเนินการโดยมีช่วงเวลาเฉพาะระหว่างการดำเนินการแต่ละครั้ง นั่นคือtask
จะดำเนินการหลังจาก 1 วินาที จากนั้น ทันทีที่เสร็จสิ้น 2 วินาทีจะผ่านไป จากนั้นงานใหม่ก็จะเริ่มขึ้น นี่คือแหล่งข้อมูลเพิ่มเติมบางส่วนในหัวข้อนี้:
- บทนำเกี่ยวกับเธรดพูลใน Java
- รู้เบื้องต้นเกี่ยวกับ Thread Pools ใน Java
- Java Multithreading Steeplechase: การยกเลิกงานใน Executors
- การใช้ Java Executors สำหรับงานเบื้องหลัง

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
นอกจากเธรดพูลข้างต้นแล้ว ยังมีอีกหนึ่ง เราสามารถพูดได้ว่ามันพิเศษเล็กน้อย เรียกว่าแหล่งขโมยงาน กล่าวโดยย่อ การขโมยงานเป็นอัลกอริทึมที่เธรดที่ไม่ได้ใช้งานเริ่มรับงานจากเธรดอื่นหรืองานจากคิวที่ใช้ร่วมกัน ลองดูตัวอย่าง:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
ถ้าเราเรียกใช้รหัสนี้ExecutorService
จะสร้าง 5 เธรดให้เรา เนื่องจากแต่ละเธรดจะถูกวางในคิวรอสำหรับวัตถุล็อค เราได้หามอนิเตอร์และล็อกในBetter together: Java และคลาสเธรดแล้ว ส่วนที่ II — การซิงโครไนซ์ ทีนี้มาแทนที่Executors.newCachedThreadPool()
ด้วยExecutors.newWorkStealingPool()
. อะไรจะเปลี่ยนไป? เราจะเห็นว่างานของเราดำเนินการน้อยกว่า 5 เธรด โปรดจำไว้ว่าCachedThreadPool
สร้างเธรดสำหรับแต่ละงานหรือไม่ นั่นเป็นเพราะว่าwait()
เธรดถูกบล็อก งานที่ตามมาต้องการทำให้เสร็จ และเธรดใหม่ถูกสร้างขึ้นสำหรับเธรดในพูล ด้วยกลุ่มการขโมย เธรดจะไม่ว่างตลอดไป พวกเขาเริ่มปฏิบัติภารกิจของเพื่อนบ้าน อะไรทำให้WorkStealingPool
แตกต่างจากเธรดพูลอื่นๆ ความจริงที่ว่าขลังForkJoinPool
อาศัยอยู่ในนั้น:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
จริงๆแล้วมีความแตกต่างอีกอย่างหนึ่ง ตามค่าดีฟอลต์ เธรดที่สร้างขึ้นสำหรับ a จะเป็น เธรดForkJoinPool
daemon ซึ่งแตกต่างจากเธรดที่สร้างผ่าน onrdinary ThreadPool
โดยทั่วไป คุณควรจำเธรด daemon เพราะตัวอย่างเช่นCompletableFuture
ใช้เธรด daemon เว้นแต่คุณจะระบุของคุณเองThreadFactory
ที่สร้างเธรดที่ไม่ใช่ daemon นี่คือเซอร์ไพรส์ที่อาจแฝงตัวอยู่ในที่ที่คาดไม่ถึง! :)
ForkJinPool
ในส่วนนี้ เราจะพูดถึงอีกครั้งเกี่ยวกับForkJoinPool
(เรียกอีกอย่างว่า fork/join framework) ซึ่งอาศัยอยู่ "ภายใต้ประทุน" ของWorkStealingPool
. โดยทั่วไปแล้ว fork/join framework จะกลับมาใน Java 1.7 และแม้ว่า Java 11 จะอยู่ใกล้แค่เอื้อม แต่ก็ยังคุ้มค่าที่จะจดจำ นี่ไม่ใช่การใช้งานทั่วไป แต่ก็น่าสนใจทีเดียว มีบทวิจารณ์ที่ดีเกี่ยวกับเรื่องนี้บนเว็บ: ทำความเข้าใจกับ Java Fork-Join Framework ด้วย Examples ที่พึ่งForkJoinPool
พิงjava.util.concurrent.RecursiveTask
. นอกจากนี้ยังjava.util.concurrent.RecursiveAction
มี RecursiveAction
ไม่ส่งคืนผลลัพธ์ ดังนั้นจึงRecursiveTask
คล้ายกับCallable
และRecursiveAction
คล้ายunnable
กับ เราจะเห็นว่าชื่อประกอบด้วยชื่อของเมธอดสำคัญสองเมธอด: fork
และ join
เดอะfork
วิธีการเริ่มงานบางอย่างแบบอะซิงโครนัสบนเธรดแยกต่างหาก และjoin
วิธีการให้คุณรอให้งานเสร็จ เพื่อให้เข้าใจได้ดีที่สุด คุณควรอ่านFrom Imperative Programming to Fork/Join to Parallel Streams in Java 8
สรุป
เป็นอันจบรีวิวส่วนนี้ เราได้เรียนรู้ว่าExecutor
เดิมทีถูกประดิษฐ์ขึ้นเพื่อดำเนินการเธรด จากนั้นผู้สร้างของ Java ตัดสินใจที่จะสานต่อแนวคิดนี้และได้แนวคิดExecutorService
. ExecutorService
ให้เราส่งงานเพื่อดำเนินการโดยใช้submit()
และinvoke()
และปิดบริการด้วย เนื่องจากExecutorService
ต้องการการใช้งาน พวกเขาจึงเขียนคลาสด้วยเมธอดโรงงานและเรียกมันExecutors
ว่า ช่วยให้คุณสร้างเธรดพูล ( ThreadPoolExecutor
) นอกจากนี้ ยังมีเธรดพูลที่ช่วยให้เราระบุกำหนดการดำเนินการได้ด้วย และForkJoinPool
ซ่อนอยู่เบื้องหลังWorkStealingPool
. ฉันหวังว่าคุณจะพบสิ่งที่ฉันเขียนไว้ข้างต้น ไม่เพียงแต่น่าสนใจเท่านั้น แต่ยังเข้าใจได้ด้วย :) ฉันยินดีรับฟังคำแนะนำและความคิดเห็นของคุณเสมอ ดีกว่ากัน: Java และคลาสเธรด ส่วนที่ 1 — เธรดของการดำเนินการ ดีขึ้นด้วยกัน: Java และคลาสเธรด ส่วนที่ II — การซิงโครไนซ์ เข้าด้วยกันได้ดีขึ้น: Java และคลาสเธรด ส่วนที่ 3 — ปฏิสัมพันธ์ ร่วมกันได้ดีขึ้น: Java และคลาสเธรด ตอนที่ IV — Callable, Future และผองเพื่อน อยู่ด้วยกันดีกว่า: Java และคลาส Thread ตอนที่ VI — ยิงออกไป!
GO TO FULL VERSION