CodeGym /จาวาบล็อก /สุ่ม /ดีกว่ากัน: Java และคลาสเธรด ส่วนที่ V — Executor, ThreadP...
John Squirrels
ระดับ
San Francisco

ดีกว่ากัน: Java และคลาสเธรด ส่วนที่ V — Executor, ThreadPool, Fork/Jin

เผยแพร่ในกลุ่ม

การแนะนำ

ดังนั้นเราจึงรู้ว่า Java มีเธรด คุณสามารถอ่านเกี่ยวกับเรื่องนี้ได้ในบทวิจารณ์เรื่องBetter together: Java and the Thread class ส่วนที่ 1 — เธรดของการดำเนินการ ดีกว่ากัน: Java และคลาสเธรด  ส่วนที่ V — Executor, ThreadPool, Fork/Jin - 1มาดูรหัสทั่วไปกันอีกครั้ง:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
อย่างที่คุณเห็น โค้ดสำหรับเริ่มงานเป็นเรื่องปกติ แต่เราต้องทำซ้ำสำหรับงานใหม่ วิธีแก้ไขวิธีหนึ่งคือใส่ไว้ในวิธีอื่นexecute(Runnable runnable)เช่น แต่ผู้สร้างของ Java ได้พิจารณาถึงสภาพของเราและมาพร้อมกับExecutorอินเทอร์เฟซ:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
โค้ดนี้กระชับขึ้นอย่างเห็นได้ชัด: ตอนนี้เราเพียงแค่เขียนโค้ดเพื่อเริ่มRunnableต้นเธรด ที่ดีใช่มั้ย แต่นี่เป็นเพียงจุดเริ่มต้นเท่านั้น: ดีกว่ากัน: Java และคลาสเธรด  ส่วนที่ V — Executor, ThreadPool, Fork/Jin - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

อย่างที่คุณเห็นExecutorอินเทอร์เฟซมีExecutorServiceอินเทอร์เฟซย่อย Javadoc สำหรับอินเตอร์เฟสนี้ระบุว่า an ExecutorServiceอธิบายเฉพาะExecutorที่มีวิธีการปิดระบบExecutor. นอกจากนี้ยังช่วยให้สามารถjava.util.concurrent.Futureติดตามกระบวนการดำเนินการได้ ก่อนหน้านี้ในBetter together: Java และคลาส Thread ตอนที่ IV — Callable, Future และผองเพื่อนเราได้ทบทวนความสามารถFutureของ หากคุณลืมหรือไม่เคยอ่าน ฉันขอแนะนำให้คุณรีเฟรชหน่วยความจำของคุณ ;) Javadoc พูดอะไรอีก มันบอกเราว่าเรามีโรงงานพิเศษjava.util.concurrent.Executorsที่ให้เราสร้างการใช้งานเริ่มต้นExecutorServiceของ

ExecutorService

มาทบทวนกัน เราต้องExecutorดำเนินการ (เช่นเรียกexecute()ใช้) งานบางอย่างบนเธรด และโค้ดที่สร้างเธรดนั้นถูกซ่อนไว้ไม่ให้เราทราบ เรามีExecutorService— เฉพาะExecutorที่มีหลายตัวเลือกสำหรับการควบคุมความคืบหน้า และเรามี โรงงานที่ให้ เราExecutorsสร้าง ExecutorServiceตอนนี้มาทำเอง:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
คุณจะเห็นว่าเราระบุกลุ่มเธรดคงที่ซึ่งมีขนาด 2 จากนั้นเราจะส่งงานไปยังกลุ่มทีละรายการ แต่ละงานส่งคืนStringชื่อเธรด ( currentThread().GetName()) สิ่งสำคัญคือต้องปิดส่วนExecutorServiceท้ายสุด เพราะไม่เช่นนั้นโปรแกรมของเราจะไม่สิ้นสุด โรงงานExecutorsมีวิธีการโรงงานเพิ่มเติม ตัวอย่างเช่น เราสามารถสร้างพูลที่ประกอบด้วยเธรดเดียว ( newSingleThreadExecutor) หรือพูลที่มีแคช ( newCachedThreadPool) ซึ่งเธรดจะถูกลบออกหลังจากที่ไม่ได้ใช้งานเป็นเวลา 1 นาที ในความเป็นจริง สิ่งเหล่านี้ExecutorServiceได้รับการสนับสนุนโดยคิวการบล็อกซึ่งวางงานและจากงานใดที่ถูกดำเนินการ ดูข้อมูลเพิ่มเติมเกี่ยวกับการบล็อกคิวได้ในวิดีโอนี้ คุณยังสามารถอ่านสิ่งนี้รีวิวเกี่ยวกับ BlockingQueue และตรวจดูคำตอบสำหรับคำถาม"เมื่อต้องการ LinkedBlockingQueue มากกว่า ArrayBlockingQueue" กล่าวอย่างง่ายที่สุดBlockingQueueการบล็อกเธรดในสองกรณี:
  • เธรดพยายามรับไอเท็มจากคิวว่าง
  • เธรดพยายามใส่รายการลงในคิวเต็ม
หากเราดูที่การนำวิธีการของโรงงานไปใช้ เราจะเห็นว่ามันทำงานอย่างไร ตัวอย่างเช่น:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
หรือ

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
ดังที่เราเห็น การใช้งานExecutorServiceถูกสร้างขึ้นภายในวิธีการของโรงงาน และส่วนใหญ่เรากำลังพูดThreadPoolExecutorถึง เฉพาะพารามิเตอร์ที่มีผลต่องานเท่านั้นที่มีการเปลี่ยนแปลง ดีกว่ากัน: Java และคลาสเธรด  ส่วนที่ V — Executor, ThreadPool, Fork/Jin - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

อย่างที่เราเห็นก่อนหน้านี้ThreadPoolExecutorคือสิ่งที่มักจะสร้างขึ้นในวิธีการของโรงงาน ฟังก์ชันการทำงานได้รับผลกระทบจากอาร์กิวเมนต์ที่เราส่งผ่านเป็นจำนวนเธรดสูงสุดและต่ำสุด รวมถึงประเภทของคิวที่ใช้ แต่java.util.concurrent.BlockingQueueสามารถใช้อินเทอร์ เฟซใด ๆ ก็ได้ เมื่อพูดถึงThreadPoolExecutorเราควรพูดถึงคุณสมบัติที่น่าสนใจบางอย่าง ตัวอย่างเช่น คุณไม่สามารถส่งงานไปยัง a ThreadPoolExecutorหากไม่มีพื้นที่ว่าง:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
รหัสนี้จะผิดพลาดโดยมีข้อผิดพลาดดังนี้:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
กล่าวอีกนัยหนึ่ง คือtaskไม่สามารถส่งได้ เพราะSynchronousQueueได้รับการออกแบบเพื่อให้ประกอบด้วยองค์ประกอบเดียวจริง ๆ และไม่อนุญาตให้เราใส่อะไรลงไปอีก เราจะเห็นว่าเรามีศูนย์queued tasks("งานคิว = 0") ที่นี่ แต่ก็ไม่มีอะไรแปลกเกี่ยวกับเรื่องนี้ เพราะนี่คือคุณสมบัติพิเศษของSynchronousQueueซึ่งอันที่จริงแล้วคือคิว 1 องค์ประกอบที่ว่างเปล่าเสมอ! เมื่อเธรดหนึ่งใส่องค์ประกอบในคิว มันจะรอจนกว่าเธรดอื่นจะนำองค์ประกอบจากคิว ดังนั้น เราสามารถแทนที่ด้วยnew LinkedBlockingQueue<>(1)และข้อผิดพลาดจะเปลี่ยนเป็น now queued tasks = 1show เนื่องจากคิวเป็นเพียง 1 องค์ประกอบ เราจึงไม่สามารถเพิ่มองค์ประกอบที่สองได้ และนั่นคือสิ่งที่ทำให้โปรแกรมล้มเหลว การสนทนาของเราต่อไปในคิวเป็นที่น่าสังเกตว่าThreadPoolExecutorคลาสมีวิธีเพิ่มเติมสำหรับการให้บริการคิว ตัวอย่างเช่นthreadPoolExecutor.purge()เมธอดจะลบงานที่ยกเลิกทั้งหมดออกจากคิวเพื่อเพิ่มพื้นที่ว่างในคิว ฟังก์ชันเกี่ยวกับคิวที่น่าสนใจอีกอย่างคือตัวจัดการสำหรับงานที่ถูกปฏิเสธ:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
ในตัวอย่างนี้ ตัวจัดการของเราจะแสดงRejectedทุกครั้งที่งานในคิวถูกปฏิเสธ สะดวกใช่มั้ย นอกจากนี้ยังThreadPoolExecutorมีคลาสย่อยที่น่าสนใจ: ScheduledThreadPoolExecutorซึ่งเป็นScheduledExecutorService. ให้ความสามารถในการทำงานตามตัวจับเวลา

บริการผู้ดำเนินการตามกำหนดเวลา

ScheduledExecutorService(ซึ่งเป็นประเภทExecutorService) ให้เราทำงานตามกำหนดเวลา ลองดูตัวอย่าง:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
ทุกอย่างเรียบง่ายที่นี่ ส่งงานแล้วเราจะได้รับไฟล์java.util.concurrent.ScheduledFuture. ตารางเวลาอาจมีประโยชน์ในสถานการณ์ต่อไปนี้ด้วย:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ที่นี่ เราส่งRunnableงานเพื่อดำเนินการที่ความถี่คงที่ ("อัตราคงที่") โดยมีการหน่วงเวลาเริ่มต้นที่แน่นอน ในกรณีนี้ หลังจาก 1 วินาที งานจะเริ่มดำเนินการทุกๆ 2 วินาที มีตัวเลือกที่คล้ายกัน:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
แต่ในกรณีนี้ งานจะถูกดำเนินการโดยมีช่วงเวลาเฉพาะระหว่างการดำเนินการแต่ละครั้ง นั่นคือtaskจะดำเนินการหลังจาก 1 วินาที จากนั้น ทันทีที่เสร็จสิ้น 2 วินาทีจะผ่านไป จากนั้นงานใหม่ก็จะเริ่มขึ้น นี่คือแหล่งข้อมูลเพิ่มเติมบางส่วนในหัวข้อนี้: ดีกว่ากัน: Java และคลาสเธรด  ส่วนที่ V — Executor, ThreadPool, Fork/Jin - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

นอกจากเธรดพูลข้างต้นแล้ว ยังมีอีกหนึ่ง เราสามารถพูดได้ว่ามันพิเศษเล็กน้อย เรียกว่าแหล่งขโมยงาน กล่าวโดยย่อ การขโมยงานเป็นอัลกอริทึมที่เธรดที่ไม่ได้ใช้งานเริ่มรับงานจากเธรดอื่นหรืองานจากคิวที่ใช้ร่วมกัน ลองดูตัวอย่าง:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
ถ้าเราเรียกใช้รหัสนี้ExecutorServiceจะสร้าง 5 เธรดให้เรา เนื่องจากแต่ละเธรดจะถูกวางในคิวรอสำหรับวัตถุล็อค เราได้หามอนิเตอร์และล็อกในBetter together: Java และคลาสเธรดแล้ว ส่วนที่ II — การซิงโครไนซ์ ทีนี้มาแทนที่Executors.newCachedThreadPool()ด้วยExecutors.newWorkStealingPool(). อะไรจะเปลี่ยนไป? เราจะเห็นว่างานของเราดำเนินการน้อยกว่า 5 เธรด โปรดจำไว้ว่าCachedThreadPoolสร้างเธรดสำหรับแต่ละงานหรือไม่ นั่นเป็นเพราะว่าwait()เธรดถูกบล็อก งานที่ตามมาต้องการทำให้เสร็จ และเธรดใหม่ถูกสร้างขึ้นสำหรับเธรดในพูล ด้วยกลุ่มการขโมย เธรดจะไม่ว่างตลอดไป พวกเขาเริ่มปฏิบัติภารกิจของเพื่อนบ้าน อะไรทำให้WorkStealingPoolแตกต่างจากเธรดพูลอื่นๆ ความจริงที่ว่าขลังForkJoinPoolอาศัยอยู่ในนั้น:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
จริงๆแล้วมีความแตกต่างอีกอย่างหนึ่ง ตามค่าดีฟอลต์ เธรดที่สร้างขึ้นสำหรับ a จะเป็น เธรดForkJoinPooldaemon ซึ่งแตกต่างจากเธรดที่สร้างผ่าน onrdinary ThreadPoolโดยทั่วไป คุณควรจำเธรด daemon เพราะตัวอย่างเช่นCompletableFutureใช้เธรด daemon เว้นแต่คุณจะระบุของคุณเองThreadFactoryที่สร้างเธรดที่ไม่ใช่ daemon นี่คือเซอร์ไพรส์ที่อาจแฝงตัวอยู่ในที่ที่คาดไม่ถึง! :)

ForkJinPool

ในส่วนนี้ เราจะพูดถึงอีกครั้งเกี่ยวกับForkJoinPool(เรียกอีกอย่างว่า fork/join framework) ซึ่งอาศัยอยู่ "ภายใต้ประทุน" ของWorkStealingPool. โดยทั่วไปแล้ว fork/join framework จะกลับมาใน Java 1.7 และแม้ว่า Java 11 จะอยู่ใกล้แค่เอื้อม แต่ก็ยังคุ้มค่าที่จะจดจำ นี่ไม่ใช่การใช้งานทั่วไป แต่ก็น่าสนใจทีเดียว มีบทวิจารณ์ที่ดีเกี่ยวกับเรื่องนี้บนเว็บ: ทำความเข้าใจกับ Java Fork-Join Framework ด้วย Examples ที่พึ่งForkJoinPoolพิงjava.util.concurrent.RecursiveTask. นอกจากนี้ยังjava.util.concurrent.RecursiveActionมี RecursiveActionไม่ส่งคืนผลลัพธ์ ดังนั้นจึงRecursiveTaskคล้ายกับCallableและRecursiveActionคล้ายunnableกับ เราจะเห็นว่าชื่อประกอบด้วยชื่อของเมธอดสำคัญสองเมธอด: forkและ joinเดอะforkวิธีการเริ่มงานบางอย่างแบบอะซิงโครนัสบนเธรดแยกต่างหาก และjoinวิธีการให้คุณรอให้งานเสร็จ เพื่อให้เข้าใจได้ดีที่สุด คุณควรอ่านFrom Imperative Programming to Fork/Join to Parallel Streams in Java 8

สรุป

เป็นอันจบรีวิวส่วนนี้ เราได้เรียนรู้ว่าExecutorเดิมทีถูกประดิษฐ์ขึ้นเพื่อดำเนินการเธรด จากนั้นผู้สร้างของ Java ตัดสินใจที่จะสานต่อแนวคิดนี้และได้แนวคิดExecutorService. ExecutorServiceให้เราส่งงานเพื่อดำเนินการโดยใช้submit()และinvoke()และปิดบริการด้วย เนื่องจากExecutorServiceต้องการการใช้งาน พวกเขาจึงเขียนคลาสด้วยเมธอดโรงงานและเรียกมันExecutorsว่า ช่วยให้คุณสร้างเธรดพูล ( ThreadPoolExecutor) นอกจากนี้ ยังมีเธรดพูลที่ช่วยให้เราระบุกำหนดการดำเนินการได้ด้วย และForkJoinPoolซ่อนอยู่เบื้องหลังWorkStealingPool. ฉันหวังว่าคุณจะพบสิ่งที่ฉันเขียนไว้ข้างต้น ไม่เพียงแต่น่าสนใจเท่านั้น แต่ยังเข้าใจได้ด้วย :) ฉันยินดีรับฟังคำแนะนำและความคิดเห็นของคุณเสมอ ดีกว่ากัน: Java และคลาสเธรด ส่วนที่ 1 — เธรดของการดำเนินการ ดีขึ้นด้วยกัน: Java และคลาสเธรด ส่วนที่ II — การซิงโครไนซ์ เข้าด้วยกันได้ดีขึ้น: Java และคลาสเธรด ส่วนที่ 3 — ปฏิสัมพันธ์ ร่วมกันได้ดีขึ้น: Java และคลาสเธรด ตอนที่ IV — Callable, Future และผองเพื่อน อยู่ด้วยกันดีกว่า: Java และคลาส Thread ตอนที่ VI — ยิงออกไป!
ความคิดเห็น
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION