Introducere
Deci, știm că Java are fire. Puteți citi despre asta în recenzia intitulată Better together: Java and the Thread class. Partea I — Fire de execuție .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
După cum puteți vedea, codul pentru a începe o sarcină este destul de tipic, dar trebuie să îl repetăm pentru o sarcină nouă. O soluție este să o puneți într-o metodă separată, de ex execute(Runnable runnable)
. Dar creatorii Java au luat în considerare situația noastră și au venit cu interfața Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Acest cod este în mod clar mai concis: acum pur și simplu scriem cod pentru a începe pe Runnable
fir. E grozav, nu-i așa? Dar acesta este doar începutul: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
interfața are o ExecutorService
subinterfață. Javadoc pentru această interfață spune că an ExecutorService
descrie un anume Executor
care oferă metode pentru a închide fișierul Executor
. De asemenea, face posibilă obținerea unui java.util.concurrent.Future
pentru a urmări procesul de execuție. Anterior, în Better together: Java și clasa Thread. Partea a IV-a — Apelabil, viitor și prieteni , am trecut în revistă pe scurt capacitățile Future
. Dacă l-ai uitat sau nu l-ai citit niciodată, îți sugerez să-ți împrospătează memoria ;) Ce mai spune Javadoc-ul? Ne spune că avem o java.util.concurrent.Executors
fabrică specială care ne permite să creăm implementări implicite ale ExecutorService
.
ExecutorService
Să recapitulăm. TrebuieExecutor
să executăm (adică să apelăm execute()
) o anumită sarcină pe un fir, iar codul care creează firul ne este ascuns. Avem ExecutorService
— un specific Executor
care are mai multe opțiuni pentru controlul progresului. Și avem Executors
fabrica care ne permite să creăm un ExecutorService
. Acum hai să o facem singuri:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Puteți vedea că am specificat un pool de fire fixe a cărui dimensiune este 2. Apoi trimitem sarcinile grupului unul câte unul. Fiecare sarcină returnează un String
care conține numele firului ( currentThread().GetName()
). Este important să închideți ExecutorService
la sfârșit, pentru că altfel programul nostru nu se va termina. Fabrica Executors
are metode de fabricare suplimentare. De exemplu, putem crea un pool format dintr-un singur thread ( newSingleThreadExecutor
) sau un pool care include un cache ( newCachedThreadPool
) din care firele sunt eliminate după ce sunt inactive timp de 1 minut. În realitate, acestea ExecutorService
sunt susținute de o coadă de blocare , în care sunt plasate sarcinile și din care sarcinile sunt executate. Mai multe informații despre blocarea cozilor pot fi găsite în acest videoclip . Puteți citi și astarecenzie despre BlockingQueue . Și verificați răspunsul la întrebarea „Când să preferați LinkedBlockingQueue decât ArrayBlockingQueue?” În cei mai simpli termeni, un BlockingQueue
blochează un fir în două cazuri:
- firul de execuție încearcă să obțină articole dintr-o coadă goală
- firul încearcă să pună articole într-o coadă plină
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
sau
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
După cum putem vedea, implementările ExecutorService
sunt create în cadrul metodelor din fabrică. Și în cea mai mare parte, vorbim despre ThreadPoolExecutor
. Sunt modificați doar parametrii care afectează lucrarea. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
După cum am văzut mai devreme,ThreadPoolExecutor
este ceea ce de obicei este creat în cadrul metodelor din fabrică. Funcționalitatea este afectată de argumentele pe care le transmitem ca număr maxim și minim de fire de execuție, precum și de tipul de coadă utilizat. Dar orice implementare a java.util.concurrent.BlockingQueue
interfeței poate fi folosită. Apropo de ThreadPoolExecutor
, ar trebui să menționăm câteva caracteristici interesante. De exemplu, nu puteți trimite sarcini la un ThreadPoolExecutor
dacă nu există spațiu disponibil:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Acest cod se va bloca cu o eroare ca aceasta:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Cu alte cuvinte, task
nu poate fi depus, pentru că SynchronousQueue
este conceput astfel încât să constă de fapt dintr-un singur element și să nu ne permită să mai punem nimic în el. Putem vedea că avem zero queued tasks
(„sarcini în coadă = 0”) aici. Dar nu este nimic ciudat în asta, deoarece aceasta este o caracteristică specială a SynchronousQueue
, care de fapt este o coadă de 1 element care este întotdeauna goală! Când un thread pune un element în coadă, acesta va aștepta până când un alt thread preia elementul din coadă. În consecință, îl putem înlocui cu new LinkedBlockingQueue<>(1)
și eroarea se va schimba pentru a afișa acum queued tasks = 1
. Deoarece coada este doar 1 element, nu putem adăuga un al doilea element. Și asta face ca programul să eșueze. Continuând discuția noastră despre coadă, este de remarcat faptul căThreadPoolExecutor
clasa are metode suplimentare pentru deservirea cozii. De exemplu, threadPoolExecutor.purge()
metoda va elimina toate sarcinile anulate din coadă pentru a elibera spațiu în coadă. O altă funcție interesantă legată de coadă este handlerul pentru sarcinile respinse:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
În acest exemplu, handlerul nostru afișează pur și simplu Rejected
de fiecare dată când o sarcină din coadă este respinsă. Convenabil, nu-i așa? În plus, ThreadPoolExecutor
are o subclasă interesantă: ScheduledThreadPoolExecutor
, care este un ScheduledExecutorService
. Oferă capacitatea de a efectua o sarcină pe baza unui cronometru.
ScheduledExecutorService
ScheduledExecutorService
(care este un tip de ExecutorService
) ne permite să rulăm sarcini într-un program. Să ne uităm la un exemplu:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Totul este simplu aici. Sarcinile sunt trimise și apoi obținem un java.util.concurrent.ScheduledFuture
. Un program poate fi de asemenea util în următoarea situație:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aici trimitem o Runnable
sarcină pentru execuție la o frecvență fixă ("FixedRate") cu o anumită întârziere inițială. În acest caz, după 1 secundă, sarcina va începe să fie executată la fiecare 2 secunde. Există o opțiune similară:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Dar în acest caz, sarcinile sunt efectuate cu un interval specific ÎNTRE fiecare execuție. Adică task
va fi executat după 1 secundă. Apoi, de îndată ce este finalizat, vor trece 2 secunde, iar apoi va fi începută o nouă sarcină. Iată câteva resurse suplimentare pe această temă:
- O introducere în pool-urile de fire în Java
- Introducere în grupurile de fire în Java
- Java Multithreading Steeplechase: Anularea sarcinilor în executanți
- Utilizarea Java Executors pentru sarcini de fundal

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealing Pool
În plus față de pool-urile de fire de mai sus, mai există unul. Putem spune sincer că este puțin special. Se numește un grup de furt de muncă. Pe scurt, furtul de lucru este un algoritm în care firele inactive încep să preia sarcini din alte fire sau sarcini dintr-o coadă partajată. Să ne uităm la un exemplu:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Dacă rulăm acest cod, atunci ExecutorService
vom crea 5 fire pentru noi, deoarece fiecare thread va fi pus în coada de așteptare pentru obiectul de blocare. Am descoperit deja monitoare și blocări în Better împreună: Java și clasa Thread. Partea a II-a — Sincronizare . Acum să înlocuim Executors.newCachedThreadPool()
cu Executors.newWorkStealingPool()
. Ce se va schimba? Vom vedea că sarcinile noastre sunt executate pe mai puțin de 5 fire. Îți amintești că CachedThreadPool
creează un fir pentru fiecare sarcină? Asta pentru că wait()
a fost blocat firul de execuție, sarcinile ulterioare vor să fie finalizate și au fost create fire noi pentru ele în pool. Cu un bazin de furt, firele nu stau inactiv pentru totdeauna. Încep să îndeplinească sarcinile vecinilor. Ce face un grup WorkStealingPool
atât de diferit de alte grupuri de fire? Faptul că magiculForkJoinPool
trăiește în interiorul ei:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
De fapt, mai este o diferență. În mod implicit, firele create pentru un ForkJoinPool
sunt fire de execuție demon, spre deosebire de firele create printr-un ThreadPool
. În general, ar trebui să vă amintiți firele daemon, deoarece, de exemplu, CompletableFuture
utilizează și firele daemon, cu excepția cazului în care specificați propriile fire de execuție ThreadFactory
care creează fire non-daemon. Acestea sunt surprizele care pot pândi în locuri neașteptate! :)
ForkJoinPool
În această parte, vom vorbi din nou despreForkJoinPool
(numit și cadrul fork/join), care trăiește „sub capota” lui WorkStealingPool
. În general, cadrul fork/join a apărut înapoi în Java 1.7. Și chiar dacă Java 11 este la îndemână, merită să ne amintim. Aceasta nu este cea mai comună implementare, dar este destul de interesantă. Există o recenzie bună despre aceasta pe web: Understanding Java Fork-Join Framework with Examples . Se ForkJoinPool
bazează pe java.util.concurrent.RecursiveTask
. Există și java.util.concurrent.RecursiveAction
. RecursiveAction
nu returnează un rezultat. Astfel, RecursiveTask
este similar cu Callable
, și RecursiveAction
este similar cu unnable
. Putem vedea că numele include numele a două metode importante: fork
și join
. Thefork
metoda începe o sarcină asincron pe un fir separat. Și join
metoda vă permite să așteptați ca munca să fie terminată. Pentru a obține cea mai bună înțelegere, ar trebui să citiți De la programarea imperativă la fork/join to Parallel Streams în Java 8 .
rezumat
Ei bine, asta încheie această parte a recenziei. Am aflat căExecutor
a fost inventat inițial pentru a executa fire. Atunci creatorii lui Java au decis să continue ideea și au venit cu ExecutorService
. ExecutorService
ne permite să trimitem sarcini pentru execuție folosind submit()
și invoke()
și, de asemenea, să închidem serviciul. Pentru că ExecutorService
necesită implementări, au scris o clasă cu metode din fabrică și au numit-o Executors
. Vă permite să creați pool-uri de fire ( ThreadPoolExecutor
). În plus, există pool-uri de fire care ne permit, de asemenea, să specificăm un program de execuție. Și a ForkJoinPool
se ascunde în spatele unui WorkStealingPool
. Sper că ați găsit ceea ce am scris mai sus nu doar interesant, ci și de înțeles :) Mă bucur mereu să aud sugestiile și comentariile voastre. Mai bine împreună: Java și clasa Thread. Partea I — Fire de execuție Mai bine împreună: Java și clasa Thread. Partea a II-a — Sincronizare Mai bine împreună: Java și clasa Thread. Partea a III-a — Interacțiunea Mai bine împreună: Java și clasa Thread. Partea a IV-a — Apelabil, viitor și prieteni Mai bine împreună: Java și clasa Thread. Partea a VI-a — Foc departe!
GO TO FULL VERSION