CodeGym /Blog Java /Aleatoriu /Mai bine împreună: Java și clasa Thread. Partea V — Execu...
John Squirrels
Nivel
San Francisco

Mai bine împreună: Java și clasa Thread. Partea V — Executor, ThreadPool, Furk/Join

Publicat în grup

Introducere

Deci, știm că Java are fire. Puteți citi despre asta în recenzia intitulată Better together: Java and the Thread class. Partea I — Fire de execuție . Mai bine împreună: Java și clasa Thread.  Partea V — Executor, ThreadPool, Furk/Join - 1Să aruncăm o altă privire la codul tipic:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
După cum puteți vedea, codul pentru a începe o sarcină este destul de tipic, dar trebuie să îl repetăm ​​pentru o sarcină nouă. O soluție este să o puneți într-o metodă separată, de ex execute(Runnable runnable). Dar creatorii Java au luat în considerare situația noastră și au venit cu interfața Executor:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Acest cod este în mod clar mai concis: acum pur și simplu scriem cod pentru a începe pe Runnablefir. E grozav, nu-i așa? Dar acesta este doar începutul: Mai bine împreună: Java și clasa Thread.  Partea a V-a — Executor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

După cum puteți vedea, Executorinterfața are o ExecutorServicesubinterfață. Javadoc pentru această interfață spune că an ExecutorServicedescrie un anume Executorcare oferă metode pentru a închide fișierul Executor. De asemenea, face posibilă obținerea unui java.util.concurrent.Futurepentru a urmări procesul de execuție. Anterior, în Better together: Java și clasa Thread. Partea a IV-a — Apelabil, viitor și prieteni , am trecut în revistă pe scurt capacitățile Future. Dacă l-ai uitat sau nu l-ai citit niciodată, îți sugerez să-ți împrospătează memoria ;) Ce mai spune Javadoc-ul? Ne spune că avem o java.util.concurrent.Executorsfabrică specială care ne permite să creăm implementări implicite ale ExecutorService.

ExecutorService

Să recapitulăm. Trebuie Executorsă executăm (adică să apelăm execute()) o anumită sarcină pe un fir, iar codul care creează firul ne este ascuns. Avem ExecutorService— un specific Executorcare are mai multe opțiuni pentru controlul progresului. Și avem Executorsfabrica care ne permite să creăm un ExecutorService. Acum hai să o facem singuri:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Puteți vedea că am specificat un pool de fire fixe a cărui dimensiune este 2. Apoi trimitem sarcinile grupului unul câte unul. Fiecare sarcină returnează un Stringcare conține numele firului ( currentThread().GetName()). Este important să închideți ExecutorServicela sfârșit, pentru că altfel programul nostru nu se va termina. Fabrica Executorsare metode de fabricare suplimentare. De exemplu, putem crea un pool format dintr-un singur thread ( newSingleThreadExecutor) sau un pool care include un cache ( newCachedThreadPool) din care firele sunt eliminate după ce sunt inactive timp de 1 minut. În realitate, acestea ExecutorServicesunt susținute de o coadă de blocare , în care sunt plasate sarcinile și din care sarcinile sunt executate. Mai multe informații despre blocarea cozilor pot fi găsite în acest videoclip . Puteți citi și astarecenzie despre BlockingQueue . Și verificați răspunsul la întrebarea „Când să preferați LinkedBlockingQueue decât ArrayBlockingQueue?” În cei mai simpli termeni, un BlockingQueueblochează un fir în două cazuri:
  • firul de execuție încearcă să obțină articole dintr-o coadă goală
  • firul încearcă să pună articole într-o coadă plină
Dacă ne uităm la implementarea metodelor din fabrică, putem vedea cum funcționează. De exemplu:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
sau

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
După cum putem vedea, implementările ExecutorServicesunt create în cadrul metodelor din fabrică. Și în cea mai mare parte, vorbim despre ThreadPoolExecutor. Sunt modificați doar parametrii care afectează lucrarea. Mai bine împreună: Java și clasa Thread.  Partea V — Executor, ThreadPool, Furk/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

După cum am văzut mai devreme, ThreadPoolExecutoreste ceea ce de obicei este creat în cadrul metodelor din fabrică. Funcționalitatea este afectată de argumentele pe care le transmitem ca număr maxim și minim de fire de execuție, precum și de tipul de coadă utilizat. Dar orice implementare a java.util.concurrent.BlockingQueueinterfeței poate fi folosită. Apropo de ThreadPoolExecutor, ar trebui să menționăm câteva caracteristici interesante. De exemplu, nu puteți trimite sarcini la un ThreadPoolExecutordacă nu există spațiu disponibil:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Acest cod se va bloca cu o eroare ca aceasta:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Cu alte cuvinte, tasknu poate fi depus, pentru că SynchronousQueueeste conceput astfel încât să constă de fapt dintr-un singur element și să nu ne permită să mai punem nimic în el. Putem vedea că avem zero queued tasks(„sarcini în coadă = 0”) aici. Dar nu este nimic ciudat în asta, deoarece aceasta este o caracteristică specială a SynchronousQueue, care de fapt este o coadă de 1 element care este întotdeauna goală! Când un thread pune un element în coadă, acesta va aștepta până când un alt thread preia elementul din coadă. În consecință, îl putem înlocui cu new LinkedBlockingQueue<>(1)și eroarea se va schimba pentru a afișa acum queued tasks = 1. Deoarece coada este doar 1 element, nu putem adăuga un al doilea element. Și asta face ca programul să eșueze. Continuând discuția noastră despre coadă, este de remarcat faptul căThreadPoolExecutorclasa are metode suplimentare pentru deservirea cozii. De exemplu, threadPoolExecutor.purge()metoda va elimina toate sarcinile anulate din coadă pentru a elibera spațiu în coadă. O altă funcție interesantă legată de coadă este handlerul pentru sarcinile respinse:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
În acest exemplu, handlerul nostru afișează pur și simplu Rejectedde fiecare dată când o sarcină din coadă este respinsă. Convenabil, nu-i așa? În plus, ThreadPoolExecutorare o subclasă interesantă: ScheduledThreadPoolExecutor, care este un ScheduledExecutorService. Oferă capacitatea de a efectua o sarcină pe baza unui cronometru.

ScheduledExecutorService

ScheduledExecutorService(care este un tip de ExecutorService) ne permite să rulăm sarcini într-un program. Să ne uităm la un exemplu:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Totul este simplu aici. Sarcinile sunt trimise și apoi obținem un java.util.concurrent.ScheduledFuture. Un program poate fi de asemenea util în următoarea situație:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aici trimitem o Runnablesarcină pentru execuție la o frecvență fixă ​​("FixedRate") cu o anumită întârziere inițială. În acest caz, după 1 secundă, sarcina va începe să fie executată la fiecare 2 secunde. Există o opțiune similară:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Dar în acest caz, sarcinile sunt efectuate cu un interval specific ÎNTRE fiecare execuție. Adică taskva fi executat după 1 secundă. Apoi, de îndată ce este finalizat, vor trece 2 secunde, iar apoi va fi începută o nouă sarcină. Iată câteva resurse suplimentare pe această temă: Mai bine împreună: Java și clasa Thread.  Partea a V-a — Executor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealing Pool

În plus față de pool-urile de fire de mai sus, mai există unul. Putem spune sincer că este puțin special. Se numește un grup de furt de muncă. Pe scurt, furtul de lucru este un algoritm în care firele inactive încep să preia sarcini din alte fire sau sarcini dintr-o coadă partajată. Să ne uităm la un exemplu:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Dacă rulăm acest cod, atunci ExecutorServicevom crea 5 fire pentru noi, deoarece fiecare thread va fi pus în coada de așteptare pentru obiectul de blocare. Am descoperit deja monitoare și blocări în Better împreună: Java și clasa Thread. Partea a II-a — Sincronizare . Acum să înlocuim Executors.newCachedThreadPool()cu Executors.newWorkStealingPool(). Ce se va schimba? Vom vedea că sarcinile noastre sunt executate pe mai puțin de 5 fire. Îți amintești că CachedThreadPoolcreează un fir pentru fiecare sarcină? Asta pentru că wait()a fost blocat firul de execuție, sarcinile ulterioare vor să fie finalizate și au fost create fire noi pentru ele în pool. Cu un bazin de furt, firele nu stau inactiv pentru totdeauna. Încep să îndeplinească sarcinile vecinilor. Ce face un grup WorkStealingPoolatât de diferit de alte grupuri de fire? Faptul că magiculForkJoinPooltrăiește în interiorul ei:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
De fapt, mai este o diferență. În mod implicit, firele create pentru un ForkJoinPoolsunt fire de execuție demon, spre deosebire de firele create printr-un ThreadPool. În general, ar trebui să vă amintiți firele daemon, deoarece, de exemplu, CompletableFutureutilizează și firele daemon, cu excepția cazului în care specificați propriile fire de execuție ThreadFactorycare creează fire non-daemon. Acestea sunt surprizele care pot pândi în locuri neașteptate! :)

ForkJoinPool

În această parte, vom vorbi din nou despre ForkJoinPool(numit și cadrul fork/join), care trăiește „sub capota” lui WorkStealingPool. În general, cadrul fork/join a apărut înapoi în Java 1.7. Și chiar dacă Java 11 este la îndemână, merită să ne amintim. Aceasta nu este cea mai comună implementare, dar este destul de interesantă. Există o recenzie bună despre aceasta pe web: Understanding Java Fork-Join Framework with Examples . Se ForkJoinPoolbazează pe java.util.concurrent.RecursiveTask. Există și java.util.concurrent.RecursiveAction. RecursiveActionnu returnează un rezultat. Astfel, RecursiveTaskeste similar cu Callable, și RecursiveActioneste similar cu unnable. Putem vedea că numele include numele a două metode importante: forkși join. Theforkmetoda începe o sarcină asincron pe un fir separat. Și joinmetoda vă permite să așteptați ca munca să fie terminată. Pentru a obține cea mai bună înțelegere, ar trebui să citiți De la programarea imperativă la fork/join to Parallel Streams în Java 8 .

rezumat

Ei bine, asta încheie această parte a recenziei. Am aflat că Executora fost inventat inițial pentru a executa fire. Atunci creatorii lui Java au decis să continue ideea și au venit cu ExecutorService. ExecutorServicene permite să trimitem sarcini pentru execuție folosind submit()și invoke()și, de asemenea, să închidem serviciul. Pentru că ExecutorServicenecesită implementări, au scris o clasă cu metode din fabrică și au numit-o Executors. Vă permite să creați pool-uri de fire ( ThreadPoolExecutor). În plus, există pool-uri de fire care ne permit, de asemenea, să specificăm un program de execuție. Și a ForkJoinPoolse ascunde în spatele unui WorkStealingPool. Sper că ați găsit ceea ce am scris mai sus nu doar interesant, ci și de înțeles :) Mă bucur mereu să aud sugestiile și comentariile voastre. Mai bine împreună: Java și clasa Thread. Partea I — Fire de execuție Mai bine împreună: Java și clasa Thread. Partea a II-a — Sincronizare Mai bine împreună: Java și clasa Thread. Partea a III-a — Interacțiunea Mai bine împreună: Java și clasa Thread. Partea a IV-a — Apelabil, viitor și prieteni Mai bine împreună: Java și clasa Thread. Partea a VI-a — Foc departe!
Comentarii
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION