מבוא
אז, אנחנו יודעים שלג'אווה יש חוטים. אתה יכול לקרוא על כך בסקירה שכותרתה Better together: Java and the Thread class. חלק א' - חוטי הוצאה להורג .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
כפי שאתה יכול לראות, הקוד כדי להתחיל משימה הוא די אופייני, אבל אנחנו צריכים לחזור עליו עבור משימה חדשה. פתרון אחד הוא לשים אותו בשיטה נפרדת, למשל execute(Runnable runnable)
. אבל היוצרים של ג'אווה שקלו את מצוקתנו והגיעו לממשק Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
ברור שהקוד הזה הוא תמציתי יותר: עכשיו אנחנו פשוט כותבים קוד כדי להתחיל את Runnable
השרשור. זה נהדר, לא? אבל זו רק ההתחלה: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
לממשק יש ממשק ExecutorService
משנה. ה-Javadoc עבור ממשק זה אומר שמתאר ExecutorService
פרט Executor
המספק שיטות לכיבוי ה- Executor
. זה גם מאפשר לקבל java.util.concurrent.Future
על מנת לעקוב אחר תהליך הביצוע. בעבר, ב- Better together: Java and the Thread class. חלק IV - ניתן להתקשרות, עתיד וחברים
, סקרנו בקצרה את היכולות של Future
. אם שכחת או אף פעם לא קראת אותו, אני מציע לך לרענן את הזיכרון ;) מה עוד אומר ה-Javadoc? זה אומר לנו שיש לנו java.util.concurrent.Executors
מפעל מיוחד שמאפשר לנו ליצור יישומי ברירת מחדל של ExecutorService
.
ExecutorService
בואו נסקור. עלינוExecutor
לבצע (כלומר להתקשר execute()
) משימה מסוימת בשרשור, והקוד שיוצר את השרשור מוסתר מאיתנו. יש לנו ExecutorService
- ספציפי Executor
שיש לו מספר אפשרויות לשליטה בהתקדמות. ויש לנו את Executors
המפעל שמאפשר לנו ליצור ExecutorService
. עכשיו בואו נעשה זאת בעצמנו:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
ניתן לראות שציינו מאגר חוטים קבוע שגודלו הוא 2. לאחר מכן אנו מגישים משימות למאגר אחד אחד. כל משימה מחזירה a String
המכיל את שם השרשור ( currentThread().GetName()
). חשוב לסגור את התוכנית ExecutorService
ממש בסוף, כי אחרת התוכנית שלנו לא תסתיים. למפעל Executors
שיטות מפעל נוספות. לדוגמה, נוכל ליצור בריכה המורכבת משרשור אחד בלבד ( newSingleThreadExecutor
) או מאגר הכולל מטמון ( newCachedThreadPool
) שממנו מוסרים שרשורים לאחר שהם לא פעילים במשך דקה. במציאות, אלה ExecutorService
מגובים בתור חסימה , שלתוכו ממוקמות משימות וממנו מבוצעות משימות. מידע נוסף על חסימת תורים ניתן למצוא בסרטון זה
. אתה יכול גם לקרוא סקירה זו על BlockingQueue
. ובדוק את התשובה לשאלה "מתי להעדיף LinkedBlockingQueue על פני ArrayBlockingQueue?"
במונחים הפשוטים ביותר, a BlockingQueue
חוסם חוט בשני מקרים:
- השרשור מנסה להשיג פריטים מתור ריק
- השרשור מנסה להכניס פריטים לתור מלא
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
אוֹ
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
כפי שאנו יכולים לראות, יישומים של ExecutorService
נוצרים בתוך שיטות המפעל. ולרוב, אנחנו מדברים על ThreadPoolExecutor
. רק הפרמטרים המשפיעים על העבודה משתנים. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
כפי שראינו קודם,ThreadPoolExecutor
זה מה שבדרך כלל נוצר בתוך שיטות המפעל. הפונקציונליות מושפעת מהארגומנטים שאנו מעבירים כמספר המרבי והמינימלי של שרשורים, כמו גם מאיזה סוג תור נעשה שימוש. אבל java.util.concurrent.BlockingQueue
ניתן להשתמש בכל יישום של הממשק. אם כבר מדברים על ThreadPoolExecutor
, עלינו להזכיר כמה תכונות מעניינות. לדוגמה, לא ניתן לשלוח משימות ל- ThreadPoolExecutor
אם אין מקום פנוי:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
הקוד הזה יקרוס עם שגיאה כזו:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
במילים אחרות, task
לא ניתן להגיש, כי SynchronousQueue
הוא מתוכנן כך שהוא למעשה מורכב מאלמנט בודד ולא מאפשר לנו להכניס בו שום דבר נוסף. אנחנו יכולים לראות שיש לנו אפס queued tasks
("משימות בתור = 0") כאן. אבל אין בזה שום דבר מוזר, כי זו תכונה מיוחדת של SynchronousQueue
, שלמעשה היא תור של אלמנט אחד שתמיד ריק! כאשר שרשור אחד מכניס אלמנט לתור, הוא ימתין עד שרשור אחר ייקח את האלמנט מהתור. בהתאם לכך, נוכל להחליף אותו ב- new LinkedBlockingQueue<>(1)
והשגיאה תשתנה להצגת כעת queued tasks = 1
. מכיוון שהתור הוא רק אלמנט אחד, אנחנו לא יכולים להוסיף אלמנט שני. וזה מה שגורם לתוכנית להיכשל. בהמשך הדיון שלנו על תור, ראוי לציין שלכיתה ThreadPoolExecutor
יש שיטות נוספות לשירות התור. לדוגמה, threadPoolExecutor.purge()
השיטה תסיר את כל המשימות שבוטלו מהתור על מנת לפנות מקום בתור. פונקציה מעניינת נוספת הקשורה לתור היא המטפל למשימות שנדחו:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
בדוגמה זו, המטפל שלנו פשוט מציג Rejected
בכל פעם שמשימה בתור נדחית. נוח, לא? בנוסף, ThreadPoolExecutor
יש תת-מחלקה מעניינת: ScheduledThreadPoolExecutor
, שהיא ScheduledExecutorService
. הוא מספק את היכולת לבצע משימה על בסיס טיימר.
ScheduledExecutorService
ScheduledExecutorService
(שזה סוג של ExecutorService
) מאפשר לנו להריץ משימות לפי לוח זמנים. בואו נסתכל על דוגמה:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
הכל פשוט כאן. המשימות מוגשות ואז אנחנו מקבלים java.util.concurrent.ScheduledFuture
. לוח זמנים עשוי להיות מועיל גם במצב הבא:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
כאן אנו מגישים Runnable
משימה לביצוע בתדירות קבועה ("FixedRate") עם עיכוב ראשוני מסוים. במקרה זה, לאחר שנייה אחת, המשימה תתחיל להתבצע כל 2 שניות. יש אפשרות דומה:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
אבל במקרה זה, המשימות מבוצעות עם מרווח מסוים בין כל ביצוע. כלומר, הצוואה task
תתבצע לאחר שנייה אחת. לאחר מכן, ברגע שהיא תושלם, יעברו 2 שניות, ואז תתחיל משימה חדשה. להלן מספר משאבים נוספים בנושא זה:
- מבוא לבריכות חוטים בג'אווה
- מבוא ל-Thread Pools ב-Java
- ג'אווה Multithreading Steplechase: ביטול משימות ב-Executors
- שימוש ב-Java Executors עבור משימות רקע

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
בנוסף לבריכות החוטים הנ"ל, יש עוד אחת. אנחנו יכולים לומר בכנות שזה קצת מיוחד. זה נקרא בריכת גניבת עבודה. בקיצור, גניבת עבודה היא אלגוריתם שבו שרשורים סרק מתחילים לקחת משימות משרשורים אחרים או משימות מתור משותף. בואו נסתכל על דוגמה:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
אם נריץ את הקוד הזה, אז זה ExecutorService
ייצור עבורנו 5 שרשורים, מכיוון שכל שרשור יוכנס לתור ההמתנה לאובייקט הנעילה. כבר גילינו צגים ומנעולים ב- Better ביחד: Java ומחלקת Thread. חלק ב' - סנכרון
. עכשיו בואו נחליף Executors.newCachedThreadPool()
ב Executors.newWorkStealingPool()
. מה ישתנה? נראה שהמשימות שלנו מבוצעות בפחות מ-5 שרשורים. זוכרים שזה CachedThreadPool
יוצר שרשור לכל משימה? הסיבה לכך היא wait()
שחסמה את השרשור, משימות עוקבות רוצות להסתיים, ונוצרו עבורן שרשורים חדשים במאגר. עם בריכה גניבה, חוטים לא עומדים בטלים לנצח. הם מתחילים לבצע את המשימות של שכניהם. מה עושה WorkStealingPool
כל כך שונה מבריכות חוטים אחרות? העובדה שהקסום ForkJoinPool
חי בתוכו:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
למעשה, יש עוד הבדל אחד. כברירת מחדל, השרשורים שנוצרו עבור a ForkJoinPool
הם שרשורי דמון, בשונה מהשרשורים שנוצרו באמצעות שרשור רגיל ThreadPool
. באופן כללי, אתה צריך לזכור שרשורי דמון, מכיוון, למשל, CompletableFuture
משתמש גם בשרשורי דמון אלא אם כן אתה מציין משלך ThreadFactory
שיוצר שרשורים שאינם דמונים. אלו ההפתעות שעלולות להיות אורבות במקומות לא צפויים! :)
ForkJoinPool
בחלק זה, נדבר שוב עלForkJoinPool
(הנקראת גם מסגרת המזלג/החבר), שחי "מתחת למכסה המנוע" של WorkStealingPool
. באופן כללי, המסגרת fork/join הופיעה בחזרה ב-Java 1.7. ולמרות ש-Java 11 קרובה, עדיין כדאי לזכור אותה. זה לא היישום הנפוץ ביותר, אבל הוא די מעניין. יש סקירה טובה על זה באינטרנט: הבנת Java Fork-Join Framework עם דוגמאות
. המסתמכת ForkJoinPool
על java.util.concurrent.RecursiveTask
. יש גם java.util.concurrent.RecursiveAction
. RecursiveAction
לא מחזיר תוצאה. לפיכך, RecursiveTask
דומה ל Callable
, ודומה RecursiveAction
ל unnable
. אנו יכולים לראות שהשם כולל שמות של שתי שיטות חשובות: fork
ו join
. השיטה fork
מתחילה משימה כלשהי באופן אסינכרוני בשרשור נפרד. והשיטה join
מאפשרת לך לחכות לעבודה שתתבצע. כדי לקבל את ההבנה הטובה ביותר, כדאי לקרוא מתכנות ציווי ל-Fork/Join לזרמים מקבילים ב-Java 8
.
סיכום
ובכן, זה מסכם את החלק הזה של הסקירה. למדנו שזהExecutor
הומצא במקור כדי לבצע שרשורים. ואז היוצרים של ג'אווה החליטו להמשיך ברעיון והעלו ExecutorService
. ExecutorService
מאפשר לנו לשלוח משימות לביצוע באמצעות submit()
ו invoke()
, וגם לכבות את השירות. מכיוון ExecutorService
שצריך יישומים, הם כתבו מחלקה עם שיטות מפעל וקראו לזה Executors
. זה מאפשר לך ליצור בריכות חוטים ( ThreadPoolExecutor
). בנוסף, ישנם בריכות שרשורים המאפשרות לנו גם לציין לוח זמנים לביצוע. ומסתתר ForkJoinPool
מאחורי א WorkStealingPool
. אני מקווה שמצאת את מה שכתבתי למעלה לא רק מעניין, אלא גם מובן :) אני תמיד שמח לשמוע את ההצעות וההערות שלך. עדיף ביחד: Java וכיתה Thread. חלק א' - חוטי ביצוע
טוב יותר ביחד: ג'אווה ומחלקת ה-Thread. חלק ב' - סנכרון
טוב יותר ביחד: Java ומחלקת Thread. חלק שלישי - אינטראקציה
טובה יותר ביחד: Java ומחלקת Thread. חלק IV — ניתן להתקשרות, עתיד וחברים
טובים יותר ביחד: Java וכיתה Thread. חלק ו' - אש!
GO TO FULL VERSION