การแนะนำ
ดังนั้นเราจึงรู้ว่า Java มีเธรด คุณสามารถอ่านเกี่ยวกับเรื่องนี้ได้ในบทวิจารณ์เรื่องBetter together: Java and the Thread class ส่วนที่ 1 — เธรดของการดำเนินการ
มาดูรหัสทั่วไปกันอีกครั้ง:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
อย่างที่คุณเห็น โค้ดสำหรับเริ่มงานเป็นเรื่องปกติ แต่เราต้องทำซ้ำสำหรับงานใหม่ วิธีแก้ไขวิธีหนึ่งคือใส่ไว้ในวิธีอื่นexecute(Runnable runnable)เช่น แต่ผู้สร้างของ Java ได้พิจารณาถึงสภาพของเราและมาพร้อมกับExecutorอินเทอร์เฟซ:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
โค้ดนี้กระชับขึ้นอย่างเห็นได้ชัด: ตอนนี้เราเพียงแค่เขียนโค้ดเพื่อเริ่มRunnableต้นเธรด ที่ดีใช่มั้ย แต่นี่เป็นเพียงจุดเริ่มต้นเท่านั้น:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executorอินเทอร์เฟซมีExecutorServiceอินเทอร์เฟซย่อย Javadoc สำหรับอินเตอร์เฟสนี้ระบุว่า an ExecutorServiceอธิบายเฉพาะExecutorที่มีวิธีการปิดระบบExecutor. นอกจากนี้ยังช่วยให้สามารถjava.util.concurrent.Futureติดตามกระบวนการดำเนินการได้ ก่อนหน้านี้ในBetter together: Java และคลาส Thread ตอนที่ IV — Callable, Future และผองเพื่อนเราได้ทบทวนความสามารถFutureของ หากคุณลืมหรือไม่เคยอ่าน ฉันขอแนะนำให้คุณรีเฟรชหน่วยความจำของคุณ ;) Javadoc พูดอะไรอีก มันบอกเราว่าเรามีโรงงานพิเศษjava.util.concurrent.Executorsที่ให้เราสร้างการใช้งานเริ่มต้นExecutorServiceของ
ExecutorService
มาทบทวนกัน เราต้องExecutorดำเนินการ (เช่นเรียกexecute()ใช้) งานบางอย่างบนเธรด และโค้ดที่สร้างเธรดนั้นถูกซ่อนไว้ไม่ให้เราทราบ เรามีExecutorService— เฉพาะExecutorที่มีหลายตัวเลือกสำหรับการควบคุมความคืบหน้า และเรามี โรงงานที่ให้ เราExecutorsสร้าง ExecutorServiceตอนนี้มาทำเอง:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
คุณจะเห็นว่าเราระบุกลุ่มเธรดคงที่ซึ่งมีขนาด 2 จากนั้นเราจะส่งงานไปยังกลุ่มทีละรายการ แต่ละงานส่งคืนStringชื่อเธรด ( currentThread().GetName()) สิ่งสำคัญคือต้องปิดส่วนExecutorServiceท้ายสุด เพราะไม่เช่นนั้นโปรแกรมของเราจะไม่สิ้นสุด โรงงานExecutorsมีวิธีการโรงงานเพิ่มเติม ตัวอย่างเช่น เราสามารถสร้างพูลที่ประกอบด้วยเธรดเดียว ( newSingleThreadExecutor) หรือพูลที่มีแคช ( newCachedThreadPool) ซึ่งเธรดจะถูกลบออกหลังจากที่ไม่ได้ใช้งานเป็นเวลา 1 นาที ในความเป็นจริง สิ่งเหล่านี้ExecutorServiceได้รับการสนับสนุนโดยคิวการบล็อกซึ่งวางงานและจากงานใดที่ถูกดำเนินการ ดูข้อมูลเพิ่มเติมเกี่ยวกับการบล็อกคิวได้ในวิดีโอนี้ คุณยังสามารถอ่านสิ่งนี้รีวิวเกี่ยวกับ BlockingQueue และตรวจดูคำตอบสำหรับคำถาม"เมื่อต้องการ LinkedBlockingQueue มากกว่า ArrayBlockingQueue" กล่าวอย่างง่ายที่สุดBlockingQueueการบล็อกเธรดในสองกรณี:
- เธรดพยายามรับไอเท็มจากคิวว่าง
- เธรดพยายามใส่รายการลงในคิวเต็ม
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
หรือ
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ดังที่เราเห็น การใช้งานExecutorServiceถูกสร้างขึ้นภายในวิธีการของโรงงาน และส่วนใหญ่เรากำลังพูดThreadPoolExecutorถึง เฉพาะพารามิเตอร์ที่มีผลต่องานเท่านั้นที่มีการเปลี่ยนแปลง
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
อย่างที่เราเห็นก่อนหน้านี้ThreadPoolExecutorคือสิ่งที่มักจะสร้างขึ้นในวิธีการของโรงงาน ฟังก์ชันการทำงานได้รับผลกระทบจากอาร์กิวเมนต์ที่เราส่งผ่านเป็นจำนวนเธรดสูงสุดและต่ำสุด รวมถึงประเภทของคิวที่ใช้ แต่java.util.concurrent.BlockingQueueสามารถใช้อินเทอร์ เฟซใด ๆ ก็ได้ เมื่อพูดถึงThreadPoolExecutorเราควรพูดถึงคุณสมบัติที่น่าสนใจบางอย่าง ตัวอย่างเช่น คุณไม่สามารถส่งงานไปยัง a ThreadPoolExecutorหากไม่มีพื้นที่ว่าง:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
รหัสนี้จะผิดพลาดโดยมีข้อผิดพลาดดังนี้:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
กล่าวอีกนัยหนึ่ง คือtaskไม่สามารถส่งได้ เพราะSynchronousQueueได้รับการออกแบบเพื่อให้ประกอบด้วยองค์ประกอบเดียวจริง ๆ และไม่อนุญาตให้เราใส่อะไรลงไปอีก เราจะเห็นว่าเรามีศูนย์queued tasks("งานคิว = 0") ที่นี่ แต่ก็ไม่มีอะไรแปลกเกี่ยวกับเรื่องนี้ เพราะนี่คือคุณสมบัติพิเศษของSynchronousQueueซึ่งอันที่จริงแล้วคือคิว 1 องค์ประกอบที่ว่างเปล่าเสมอ! เมื่อเธรดหนึ่งใส่องค์ประกอบในคิว มันจะรอจนกว่าเธรดอื่นจะนำองค์ประกอบจากคิว ดังนั้น เราสามารถแทนที่ด้วยnew LinkedBlockingQueue<>(1)และข้อผิดพลาดจะเปลี่ยนเป็น now queued tasks = 1show เนื่องจากคิวเป็นเพียง 1 องค์ประกอบ เราจึงไม่สามารถเพิ่มองค์ประกอบที่สองได้ และนั่นคือสิ่งที่ทำให้โปรแกรมล้มเหลว การสนทนาของเราต่อไปในคิวเป็นที่น่าสังเกตว่าThreadPoolExecutorคลาสมีวิธีเพิ่มเติมสำหรับการให้บริการคิว ตัวอย่างเช่นthreadPoolExecutor.purge()เมธอดจะลบงานที่ยกเลิกทั้งหมดออกจากคิวเพื่อเพิ่มพื้นที่ว่างในคิว ฟังก์ชันเกี่ยวกับคิวที่น่าสนใจอีกอย่างคือตัวจัดการสำหรับงานที่ถูกปฏิเสธ:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
ในตัวอย่างนี้ ตัวจัดการของเราจะแสดงRejectedทุกครั้งที่งานในคิวถูกปฏิเสธ สะดวกใช่มั้ย นอกจากนี้ยังThreadPoolExecutorมีคลาสย่อยที่น่าสนใจ: ScheduledThreadPoolExecutorซึ่งเป็นScheduledExecutorService. ให้ความสามารถในการทำงานตามตัวจับเวลา
บริการผู้ดำเนินการตามกำหนดเวลา
ScheduledExecutorService(ซึ่งเป็นประเภทExecutorService) ให้เราทำงานตามกำหนดเวลา ลองดูตัวอย่าง:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
ทุกอย่างเรียบง่ายที่นี่ ส่งงานแล้วเราจะได้รับไฟล์java.util.concurrent.ScheduledFuture. ตารางเวลาอาจมีประโยชน์ในสถานการณ์ต่อไปนี้ด้วย:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ที่นี่ เราส่งRunnableงานเพื่อดำเนินการที่ความถี่คงที่ ("อัตราคงที่") โดยมีการหน่วงเวลาเริ่มต้นที่แน่นอน ในกรณีนี้ หลังจาก 1 วินาที งานจะเริ่มดำเนินการทุกๆ 2 วินาที มีตัวเลือกที่คล้ายกัน:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
แต่ในกรณีนี้ งานจะถูกดำเนินการโดยมีช่วงเวลาเฉพาะระหว่างการดำเนินการแต่ละครั้ง นั่นคือtaskจะดำเนินการหลังจาก 1 วินาที จากนั้น ทันทีที่เสร็จสิ้น 2 วินาทีจะผ่านไป จากนั้นงานใหม่ก็จะเริ่มขึ้น นี่คือแหล่งข้อมูลเพิ่มเติมบางส่วนในหัวข้อนี้:
- บทนำเกี่ยวกับเธรดพูลใน Java
- รู้เบื้องต้นเกี่ยวกับ Thread Pools ใน Java
- Java Multithreading Steeplechase: การยกเลิกงานใน Executors
- การใช้ Java Executors สำหรับงานเบื้องหลัง
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
นอกจากเธรดพูลข้างต้นแล้ว ยังมีอีกหนึ่ง เราสามารถพูดได้ว่ามันพิเศษเล็กน้อย เรียกว่าแหล่งขโมยงาน กล่าวโดยย่อ การขโมยงานเป็นอัลกอริทึมที่เธรดที่ไม่ได้ใช้งานเริ่มรับงานจากเธรดอื่นหรืองานจากคิวที่ใช้ร่วมกัน ลองดูตัวอย่าง:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
ถ้าเราเรียกใช้รหัสนี้ExecutorServiceจะสร้าง 5 เธรดให้เรา เนื่องจากแต่ละเธรดจะถูกวางในคิวรอสำหรับวัตถุล็อค เราได้หามอนิเตอร์และล็อกในBetter together: Java และคลาสเธรดแล้ว ส่วนที่ II — การซิงโครไนซ์ ทีนี้มาแทนที่Executors.newCachedThreadPool()ด้วยExecutors.newWorkStealingPool(). อะไรจะเปลี่ยนไป? เราจะเห็นว่างานของเราดำเนินการน้อยกว่า 5 เธรด โปรดจำไว้ว่าCachedThreadPoolสร้างเธรดสำหรับแต่ละงานหรือไม่ นั่นเป็นเพราะว่าwait()เธรดถูกบล็อก งานที่ตามมาต้องการทำให้เสร็จ และเธรดใหม่ถูกสร้างขึ้นสำหรับเธรดในพูล ด้วยกลุ่มการขโมย เธรดจะไม่ว่างตลอดไป พวกเขาเริ่มปฏิบัติภารกิจของเพื่อนบ้าน อะไรทำให้WorkStealingPoolแตกต่างจากเธรดพูลอื่นๆ ความจริงที่ว่าขลังForkJoinPoolอาศัยอยู่ในนั้น:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
จริงๆแล้วมีความแตกต่างอีกอย่างหนึ่ง ตามค่าดีฟอลต์ เธรดที่สร้างขึ้นสำหรับ a จะเป็น เธรดForkJoinPooldaemon ซึ่งแตกต่างจากเธรดที่สร้างผ่าน onrdinary ThreadPoolโดยทั่วไป คุณควรจำเธรด daemon เพราะตัวอย่างเช่นCompletableFutureใช้เธรด daemon เว้นแต่คุณจะระบุของคุณเองThreadFactoryที่สร้างเธรดที่ไม่ใช่ daemon นี่คือเซอร์ไพรส์ที่อาจแฝงตัวอยู่ในที่ที่คาดไม่ถึง! :)
ForkJinPool
ในส่วนนี้ เราจะพูดถึงอีกครั้งเกี่ยวกับForkJoinPool(เรียกอีกอย่างว่า fork/join framework) ซึ่งอาศัยอยู่ "ภายใต้ประทุน" ของWorkStealingPool. โดยทั่วไปแล้ว fork/join framework จะกลับมาใน Java 1.7 และแม้ว่า Java 11 จะอยู่ใกล้แค่เอื้อม แต่ก็ยังคุ้มค่าที่จะจดจำ นี่ไม่ใช่การใช้งานทั่วไป แต่ก็น่าสนใจทีเดียว มีบทวิจารณ์ที่ดีเกี่ยวกับเรื่องนี้บนเว็บ: ทำความเข้าใจกับ Java Fork-Join Framework ด้วย Examples ที่พึ่งForkJoinPoolพิงjava.util.concurrent.RecursiveTask. นอกจากนี้ยังjava.util.concurrent.RecursiveActionมี RecursiveActionไม่ส่งคืนผลลัพธ์ ดังนั้นจึงRecursiveTaskคล้ายกับCallableและRecursiveActionคล้ายunnableกับ เราจะเห็นว่าชื่อประกอบด้วยชื่อของเมธอดสำคัญสองเมธอด: forkและ joinเดอะforkวิธีการเริ่มงานบางอย่างแบบอะซิงโครนัสบนเธรดแยกต่างหาก และjoinวิธีการให้คุณรอให้งานเสร็จ เพื่อให้เข้าใจได้ดีที่สุด คุณควรอ่านFrom Imperative Programming to Fork/Join to Parallel Streams in Java 8
สรุป
เป็นอันจบรีวิวส่วนนี้ เราได้เรียนรู้ว่าExecutorเดิมทีถูกประดิษฐ์ขึ้นเพื่อดำเนินการเธรด จากนั้นผู้สร้างของ Java ตัดสินใจที่จะสานต่อแนวคิดนี้และได้แนวคิดExecutorService. ExecutorServiceให้เราส่งงานเพื่อดำเนินการโดยใช้submit()และinvoke()และปิดบริการด้วย เนื่องจากExecutorServiceต้องการการใช้งาน พวกเขาจึงเขียนคลาสด้วยเมธอดโรงงานและเรียกมันExecutorsว่า ช่วยให้คุณสร้างเธรดพูล ( ThreadPoolExecutor) นอกจากนี้ ยังมีเธรดพูลที่ช่วยให้เราระบุกำหนดการดำเนินการได้ด้วย และForkJoinPoolซ่อนอยู่เบื้องหลังWorkStealingPool. ฉันหวังว่าคุณจะพบสิ่งที่ฉันเขียนไว้ข้างต้น ไม่เพียงแต่น่าสนใจเท่านั้น แต่ยังเข้าใจได้ด้วย :) ฉันยินดีรับฟังคำแนะนำและความคิดเห็นของคุณเสมอ ดีกว่ากัน: Java และคลาสเธรด ส่วนที่ 1 — เธรดของการดำเนินการ ดีขึ้นด้วยกัน: Java และคลาสเธรด ส่วนที่ II — การซิงโครไนซ์ เข้าด้วยกันได้ดีขึ้น: Java และคลาสเธรด ส่วนที่ 3 — ปฏิสัมพันธ์ ร่วมกันได้ดีขึ้น: Java และคลาสเธรด ตอนที่ IV — Callable, Future และผองเพื่อน อยู่ด้วยกันดีกว่า: Java และคลาส Thread ตอนที่ VI — ยิงออกไป!
GO TO FULL VERSION