CodeGym /בלוג Java /Random-HE /עדיף ביחד: Java וכיתה Thread. חלק V - מבצע, ThreadPool, F...
John Squirrels
רָמָה
San Francisco

עדיף ביחד: Java וכיתה Thread. חלק V - מבצע, ThreadPool, Fork/Join

פורסם בקבוצה

מבוא

אז, אנחנו יודעים שלג'אווה יש חוטים. אתה יכול לקרוא על כך בסקירה שכותרתה Better together: Java and the Thread class. חלק א' - חוטי הוצאה להורג . עדיף ביחד: Java וכיתה Thread.  חלק V - מבצע, ThreadPool, Fork/Join - 1בואו נסתכל שוב על הקוד הטיפוסי:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
כפי שאתה יכול לראות, הקוד כדי להתחיל משימה הוא די אופייני, אבל אנחנו צריכים לחזור עליו עבור משימה חדשה. פתרון אחד הוא לשים אותו בשיטה נפרדת, למשל execute(Runnable runnable). אבל היוצרים של ג'אווה שקלו את מצוקתנו והגיעו לממשק Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
ברור שהקוד הזה הוא תמציתי יותר: עכשיו אנחנו פשוט כותבים קוד כדי להתחיל את Runnableהשרשור. זה נהדר, לא? אבל זו רק ההתחלה: עדיף ביחד: Java וכיתה Thread.  חלק V - מבצע, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

כפי שניתן לראות, Executorלממשק יש ממשק ExecutorServiceמשנה. ה-Javadoc עבור ממשק זה אומר שמתאר ExecutorServiceפרט Executorהמספק שיטות לכיבוי ה- Executor. זה גם מאפשר לקבל java.util.concurrent.Futureעל מנת לעקוב אחר תהליך הביצוע. בעבר, ב- Better together: Java and the Thread class. חלק IV - ניתן להתקשרות, עתיד וחברים , סקרנו בקצרה את היכולות של Future. אם שכחת או אף פעם לא קראת אותו, אני מציע לך לרענן את הזיכרון ;) מה עוד אומר ה-Javadoc? זה אומר לנו שיש לנו java.util.concurrent.Executorsמפעל מיוחד שמאפשר לנו ליצור יישומי ברירת מחדל של ExecutorService.

ExecutorService

בואו נסקור. עלינו Executorלבצע (כלומר להתקשר execute()) משימה מסוימת בשרשור, והקוד שיוצר את השרשור מוסתר מאיתנו. יש לנו ExecutorService- ספציפי Executorשיש לו מספר אפשרויות לשליטה בהתקדמות. ויש לנו את Executorsהמפעל שמאפשר לנו ליצור ExecutorService. עכשיו בואו נעשה זאת בעצמנו:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
ניתן לראות שציינו מאגר חוטים קבוע שגודלו הוא 2. לאחר מכן אנו מגישים משימות למאגר אחד אחד. כל משימה מחזירה a Stringהמכיל את שם השרשור ( currentThread().GetName()). חשוב לסגור את התוכנית ExecutorServiceממש בסוף, כי אחרת התוכנית שלנו לא תסתיים. למפעל Executorsשיטות מפעל נוספות. לדוגמה, נוכל ליצור בריכה המורכבת משרשור אחד בלבד ( newSingleThreadExecutor) או מאגר הכולל מטמון ( newCachedThreadPool) שממנו מוסרים שרשורים לאחר שהם לא פעילים במשך דקה. במציאות, אלה ExecutorServiceמגובים בתור חסימה , שלתוכו ממוקמות משימות וממנו מבוצעות משימות. מידע נוסף על חסימת תורים ניתן למצוא בסרטון זה . אתה יכול גם לקרוא סקירה זו על BlockingQueue . ובדוק את התשובה לשאלה "מתי להעדיף LinkedBlockingQueue על פני ArrayBlockingQueue?" במונחים הפשוטים ביותר, a BlockingQueueחוסם חוט בשני מקרים:
  • השרשור מנסה להשיג פריטים מתור ריק
  • השרשור מנסה להכניס פריטים לתור מלא
אם נסתכל על יישום שיטות המפעל, נוכל לראות כיצד הן פועלות. לדוגמה:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
אוֹ
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
כפי שאנו יכולים לראות, יישומים של ExecutorServiceנוצרים בתוך שיטות המפעל. ולרוב, אנחנו מדברים על ThreadPoolExecutor. רק הפרמטרים המשפיעים על העבודה משתנים. עדיף ביחד: Java וכיתה Thread.  חלק V - מבצע, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

כפי שראינו קודם, ThreadPoolExecutorזה מה שבדרך כלל נוצר בתוך שיטות המפעל. הפונקציונליות מושפעת מהארגומנטים שאנו מעבירים כמספר המרבי והמינימלי של שרשורים, כמו גם מאיזה סוג תור נעשה שימוש. אבל java.util.concurrent.BlockingQueueניתן להשתמש בכל יישום של הממשק. אם כבר מדברים על ThreadPoolExecutor, עלינו להזכיר כמה תכונות מעניינות. לדוגמה, לא ניתן לשלוח משימות ל- ThreadPoolExecutorאם אין מקום פנוי:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
הקוד הזה יקרוס עם שגיאה כזו:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
במילים אחרות, taskלא ניתן להגיש, כי SynchronousQueueהוא מתוכנן כך שהוא למעשה מורכב מאלמנט בודד ולא מאפשר לנו להכניס בו שום דבר נוסף. אנחנו יכולים לראות שיש לנו אפס queued tasks("משימות בתור = 0") כאן. אבל אין בזה שום דבר מוזר, כי זו תכונה מיוחדת של SynchronousQueue, שלמעשה היא תור של אלמנט אחד שתמיד ריק! כאשר שרשור אחד מכניס אלמנט לתור, הוא ימתין עד שרשור אחר ייקח את האלמנט מהתור. בהתאם לכך, נוכל להחליף אותו ב- new LinkedBlockingQueue<>(1)והשגיאה תשתנה להצגת כעת queued tasks = 1. מכיוון שהתור הוא רק אלמנט אחד, אנחנו לא יכולים להוסיף אלמנט שני. וזה מה שגורם לתוכנית להיכשל. בהמשך הדיון שלנו על תור, ראוי לציין שלכיתה ThreadPoolExecutorיש שיטות נוספות לשירות התור. לדוגמה, threadPoolExecutor.purge()השיטה תסיר את כל המשימות שבוטלו מהתור על מנת לפנות מקום בתור. פונקציה מעניינת נוספת הקשורה לתור היא המטפל למשימות שנדחו:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
בדוגמה זו, המטפל שלנו פשוט מציג Rejectedבכל פעם שמשימה בתור נדחית. נוח, לא? בנוסף, ThreadPoolExecutorיש תת-מחלקה מעניינת: ScheduledThreadPoolExecutor, שהיא ScheduledExecutorService. הוא מספק את היכולת לבצע משימה על בסיס טיימר.

ScheduledExecutorService

ScheduledExecutorService(שזה סוג של ExecutorService) מאפשר לנו להריץ משימות לפי לוח זמנים. בואו נסתכל על דוגמה:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
הכל פשוט כאן. המשימות מוגשות ואז אנחנו מקבלים java.util.concurrent.ScheduledFuture. לוח זמנים עשוי להיות מועיל גם במצב הבא:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
כאן אנו מגישים Runnableמשימה לביצוע בתדירות קבועה ("FixedRate") עם עיכוב ראשוני מסוים. במקרה זה, לאחר שנייה אחת, המשימה תתחיל להתבצע כל 2 שניות. יש אפשרות דומה:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
אבל במקרה זה, המשימות מבוצעות עם מרווח מסוים בין כל ביצוע. כלומר, הצוואה taskתתבצע לאחר שנייה אחת. לאחר מכן, ברגע שהיא תושלם, יעברו 2 שניות, ואז תתחיל משימה חדשה. להלן מספר משאבים נוספים בנושא זה: עדיף ביחד: Java וכיתה Thread.  חלק V - מבצע, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

בנוסף לבריכות החוטים הנ"ל, יש עוד אחת. אנחנו יכולים לומר בכנות שזה קצת מיוחד. זה נקרא בריכת גניבת עבודה. בקיצור, גניבת עבודה היא אלגוריתם שבו שרשורים סרק מתחילים לקחת משימות משרשורים אחרים או משימות מתור משותף. בואו נסתכל על דוגמה:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
אם נריץ את הקוד הזה, אז זה ExecutorServiceייצור עבורנו 5 שרשורים, מכיוון שכל שרשור יוכנס לתור ההמתנה לאובייקט הנעילה. כבר גילינו צגים ומנעולים ב- Better ביחד: Java ומחלקת Thread. חלק ב' - סנכרון . עכשיו בואו נחליף Executors.newCachedThreadPool()ב Executors.newWorkStealingPool(). מה ישתנה? נראה שהמשימות שלנו מבוצעות בפחות מ-5 שרשורים. זוכרים שזה CachedThreadPoolיוצר שרשור לכל משימה? הסיבה לכך היא wait()שחסמה את השרשור, משימות עוקבות רוצות להסתיים, ונוצרו עבורן שרשורים חדשים במאגר. עם בריכה גניבה, חוטים לא עומדים בטלים לנצח. הם מתחילים לבצע את המשימות של שכניהם. מה עושה WorkStealingPoolכל כך שונה מבריכות חוטים אחרות? העובדה שהקסום ForkJoinPoolחי בתוכו:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
למעשה, יש עוד הבדל אחד. כברירת מחדל, השרשורים שנוצרו עבור a ForkJoinPoolהם שרשורי דמון, בשונה מהשרשורים שנוצרו באמצעות שרשור רגיל ThreadPool. באופן כללי, אתה צריך לזכור שרשורי דמון, מכיוון, למשל, CompletableFutureמשתמש גם בשרשורי דמון אלא אם כן אתה מציין משלך ThreadFactoryשיוצר שרשורים שאינם דמונים. אלו ההפתעות שעלולות להיות אורבות במקומות לא צפויים! :)

ForkJoinPool

בחלק זה, נדבר שוב על ForkJoinPool(הנקראת גם מסגרת המזלג/החבר), שחי "מתחת למכסה המנוע" של WorkStealingPool. באופן כללי, המסגרת fork/join הופיעה בחזרה ב-Java 1.7. ולמרות ש-Java 11 קרובה, עדיין כדאי לזכור אותה. זה לא היישום הנפוץ ביותר, אבל הוא די מעניין. יש סקירה טובה על זה באינטרנט: הבנת Java Fork-Join Framework עם דוגמאות . המסתמכת ForkJoinPoolעל java.util.concurrent.RecursiveTask. יש גם java.util.concurrent.RecursiveAction. RecursiveActionלא מחזיר תוצאה. לפיכך, RecursiveTaskדומה ל Callable, ודומה RecursiveActionל unnable. אנו יכולים לראות שהשם כולל שמות של שתי שיטות חשובות: forkו join. השיטה forkמתחילה משימה כלשהי באופן אסינכרוני בשרשור נפרד. והשיטה joinמאפשרת לך לחכות לעבודה שתתבצע. כדי לקבל את ההבנה הטובה ביותר, כדאי לקרוא מתכנות ציווי ל-Fork/Join לזרמים מקבילים ב-Java 8 .

סיכום

ובכן, זה מסכם את החלק הזה של הסקירה. למדנו שזה Executorהומצא במקור כדי לבצע שרשורים. ואז היוצרים של ג'אווה החליטו להמשיך ברעיון והעלו ExecutorService. ExecutorServiceמאפשר לנו לשלוח משימות לביצוע באמצעות submit()ו invoke(), וגם לכבות את השירות. מכיוון ExecutorServiceשצריך יישומים, הם כתבו מחלקה עם שיטות מפעל וקראו לזה Executors. זה מאפשר לך ליצור בריכות חוטים ( ThreadPoolExecutor). בנוסף, ישנם בריכות שרשורים המאפשרות לנו גם לציין לוח זמנים לביצוע. ומסתתר ForkJoinPoolמאחורי א WorkStealingPool. אני מקווה שמצאת את מה שכתבתי למעלה לא רק מעניין, אלא גם מובן :) אני תמיד שמח לשמוע את ההצעות וההערות שלך. עדיף ביחד: Java וכיתה Thread. חלק א' - חוטי ביצוע טוב יותר ביחד: ג'אווה ומחלקת ה-Thread. חלק ב' - סנכרון טוב יותר ביחד: Java ומחלקת Thread. חלק שלישי - אינטראקציה טובה יותר ביחד: Java ומחלקת Thread. חלק IV — ניתן להתקשרות, עתיד וחברים טובים יותר ביחד: Java וכיתה Thread. חלק ו' - אש!
הערות
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION