Introduktion
Så vi ved, at Java har tråde. Det kan du læse om i anmeldelsen med titlen Better together: Java and the Thread class. Del I — Udførelsestråde .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Som du kan se, er koden til at starte en opgave ret typisk, men vi er nødt til at gentage den for en ny opgave. En løsning er at sætte det i en separat metode, f.eks execute(Runnable runnable)
. Men Javas skabere har overvejet vores situation og kom med grænsefladen Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Denne kode er klart mere kortfattet: nu skriver vi simpelthen kode for at starte Runnable
på tråden. Det er fantastisk, ikke? Men dette er kun begyndelsen: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
har grænsefladen en ExecutorService
undergrænseflade. Javadoc for denne grænseflade siger, at en ExecutorService
beskriver en bestemt Executor
, der giver metoder til at lukke ned Executor
. Det gør det også muligt at få en java.util.concurrent.Future
for at spore eksekveringsprocessen. Tidligere i Better together: Java and the Thread-klassen. Del IV — Callable, Future, and friends , vi gennemgik kort mulighederne for Future
. Hvis du har glemt eller aldrig læser det, foreslår jeg, at du genopfrisker din hukommelse ;) Hvad siger Javadoc ellers? Det fortæller os, at vi har en speciel java.util.concurrent.Executors
fabrik, der lader os oprette standardimplementeringer af ExecutorService
.
ExecutorService
Lad os gennemgå. Vi skalExecutor
udføre (dvs. kalde execute()
på) en bestemt opgave på en tråd, og koden, der skaber tråden, er skjult for os. Vi har ExecutorService
- en specifik Executor
, der har flere muligheder for at kontrollere fremskridt. Og vi har Executors
fabrikken, der lader os skabe en ExecutorService
. Lad os nu gøre det selv:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Du kan se, at vi har specificeret en fast trådpulje, hvis størrelse er 2. Derefter afleverer vi opgaver til puljen én efter én. Hver opgave returnerer en String
indeholdende trådnavnet ( currentThread().GetName()
). Det er vigtigt at lukke ned til ExecutorService
allersidst, for ellers slutter vores program ikke. Fabrikken Executors
har yderligere fabriksmetoder. For eksempel kan vi oprette en pulje, der kun består af én tråd ( newSingleThreadExecutor
) eller en pool, der indeholder en cache ( newCachedThreadPool
), hvorfra tråde fjernes, når de har været inaktive i 1 minut. I virkeligheden er disse ExecutorService
understøttet af en blokerende kø , hvori opgaver placeres, og hvorfra opgaver udføres. Mere information om blokering af køer kan findes i denne video . Du kan også læse detteanmeldelse om BlockingQueue . Og tjek svaret på spørgsmålet "Hvornår skal du foretrække LinkedBlockingQueue frem for ArrayBlockingQueue?" I de enkleste vendinger BlockingQueue
blokerer a en tråd i to tilfælde:
- tråden forsøger at hente varer fra en tom kø
- tråden forsøger at sætte elementer i en fuld kø
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
eller
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Som vi kan se, ExecutorService
er implementeringer af skabt inde i fabrikkens metoder. Og for det meste taler vi om ThreadPoolExecutor
. Kun de parametre, der påvirker arbejdet, ændres. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Som vi så tidligere,ThreadPoolExecutor
er det, der normalt bliver skabt inde i fabrikkens metoder. Funktionaliteten påvirkes af de argumenter, vi sender som maksimum og minimum antal tråde, samt hvilken type kø, der bruges. Men enhver implementering af java.util.concurrent.BlockingQueue
grænsefladen kan bruges. Når vi taler om ThreadPoolExecutor
, bør vi nævne nogle interessante funktioner. Du kan f.eks. ikke sende opgaver til en, ThreadPoolExecutor
hvis der ikke er ledig plads:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Denne kode vil gå ned med en fejl som denne:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Med andre ord kan den task
ikke indsendes, fordi SynchronousQueue
den er designet, så den faktisk består af et enkelt element og ikke tillader os at lægge noget mere i det. Vi kan se, at vi har nul queued tasks
("opgaver i kø = 0") her. Men der er ikke noget mærkeligt ved dette, for dette er et særligt træk ved SynchronousQueue
, som faktisk er en 1-element kø, der altid er tom! Når en tråd sætter et element i køen, vil den vente, indtil en anden tråd tager elementet fra køen. Derfor kan vi erstatte det med, new LinkedBlockingQueue<>(1)
og fejlen ændres til nu at vise queued tasks = 1
. Fordi køen kun er 1 element, kan vi ikke tilføje et andet element. Og det er det, der får programmet til at fejle. For at fortsætte vores diskussion af kø, er det værd at bemærke, atThreadPoolExecutor
klasse har yderligere metoder til at servicere køen. For eksempel threadPoolExecutor.purge()
vil metoden fjerne alle annullerede opgaver fra køen for at frigøre plads i køen. En anden interessant kø-relateret funktion er handleren for afviste opgaver:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
I dette eksempel viser vores handler simpelthen Rejected
hver gang en opgave i køen afvises. Praktisk, ikke? Derudover ThreadPoolExecutor
har en interessant underklasse: ScheduledThreadPoolExecutor
, som er en ScheduledExecutorService
. Det giver mulighed for at udføre en opgave baseret på en timer.
ScheduledExecutorService
ScheduledExecutorService
(som er en type ExecutorService
) lader os køre opgaver efter en tidsplan. Lad os se på et eksempel:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Alt er enkelt her. Opgaverne afleveres og så får vi en java.util.concurrent.ScheduledFuture
. En tidsplan kan også være nyttig i følgende situation:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Her sender vi en Runnable
opgave til udførelse med en fast frekvens ("FixedRate") med en vis indledende forsinkelse. I dette tilfælde vil opgaven efter 1 sekund begynde at blive udført hvert 2. sekund. Der er en lignende mulighed:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Men i dette tilfælde udføres opgaverne med et bestemt interval MELLEM hver udførelse. Det vil sige, task
vil blive udført efter 1 sekund. Så snart det er afsluttet, går der 2 sekunder, og så starter en ny opgave. Her er nogle yderligere ressourcer om dette emne:
- En introduktion til trådpuljer i Java
- Introduktion til trådpuljer i Java
- Java Multithreading Steeplechase: Annullering af opgaver i udførere
- Brug af Java Executors til baggrundsopgaver

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Ud over ovenstående trådpuljer er der en mere. Vi kan ærligt sige, at det er lidt specielt. Det kaldes en pulje til at stjæle arbejde. Kort sagt er arbejdstyveri en algoritme, hvor inaktive tråde begynder at tage opgaver fra andre tråde eller opgaver fra en delt kø. Lad os se på et eksempel:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Hvis vi kører denne kode, vil den ExecutorService
oprette 5 tråde for os, fordi hver tråd vil blive sat i ventekøen til låseobjektet. Vi har allerede fundet ud af skærme og låse i Better sammen: Java og Thread-klassen. Del II — Synkronisering . Lad os nu erstatte Executors.newCachedThreadPool()
med Executors.newWorkStealingPool()
. Hvad vil ændre sig? Vi vil se, at vores opgaver udføres på færre end 5 tråde. Husk, at der CachedThreadPool
opretter en tråd for hver opgave? Det skyldes, at wait()
tråden er blokeret, efterfølgende opgaver ønsker at blive udført, og der blev oprettet nye tråde til dem i puljen. Med en stjælende pool står tråde ikke uvirksomme for evigt. De begynder at udføre deres naboers opgaver. Hvad gør en WorkStealingPool
så forskellig fra andre trådpuljer? Det faktum, at det magiskeForkJoinPool
bor inde i det:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Faktisk er der endnu en forskel. Som standard er de tråde, der er oprettet til en, ForkJoinPool
dæmontråde, i modsætning til de tråde, der er oprettet gennem en almindelig ThreadPool
. Generelt bør du huske dæmontråde, fordi du for eksempel CompletableFuture
også bruger dæmontråde, medmindre du angiver din egen ThreadFactory
, der opretter ikke-dæmontråde. Det er de overraskelser, der kan gemme sig uventede steder! :)
ForkJoinPool
I denne del vil vi igen tale omForkJoinPool
(også kaldet gaffel/sammenføjningsramme), som lever "under motorhjelmen" på WorkStealingPool
. Generelt dukkede fork/join-rammen op tilbage i Java 1.7. Og selvom Java 11 er lige ved hånden, er det stadig værd at huske. Dette er ikke den mest almindelige implementering, men den er ret interessant. Der er en god anmeldelse om dette på nettet: Forstå Java Fork-Join Framework med eksempler . Den ForkJoinPool
er afhængig af java.util.concurrent.RecursiveTask
. Der er også java.util.concurrent.RecursiveAction
. RecursiveAction
returnerer ikke et resultat. Således RecursiveTask
ligner Callable
, og RecursiveAction
ligner unnable
. Vi kan se, at navnet omfatter navnene på to vigtige metoder: fork
og join
. Detfork
metode starter en opgave asynkront på en separat tråd. Og join
metoden lader dig vente på, at arbejdet skal udføres. For at få den bedste forståelse bør du læse Fra imperativ programmering til Fork/Join til Parallel Streams i Java 8 .
Resumé
Nå, det afslutter denne del af anmeldelsen. Vi har erfaret, at detExecutor
oprindeligt blev opfundet til at udføre tråde. Så besluttede Javas skabere at fortsætte ideen og kom op med ExecutorService
. ExecutorService
lader os sende opgaver til udførelse ved hjælp af submit()
og invoke()
, og også lukke tjenesten ned. Fordi ExecutorService
behov for implementeringer, skrev de en klasse med fabriksmetoder og kaldte den Executors
. Det lader dig oprette trådpuljer ( ThreadPoolExecutor
). Derudover er der trådpuljer, som også giver os mulighed for at angive en udførelsesplan. Og a ForkJoinPool
gemmer sig bag en WorkStealingPool
. Jeg håber, at du fandt det, jeg skrev ovenfor, ikke kun interessant, men også forståeligt :) Jeg er altid glad for at høre dine forslag og kommentarer. Bedre sammen: Java og Tråd-klassen. Del I — Udførelsestråde Bedre sammen: Java og trådklassen. Del II — Synkronisering Bedre sammen: Java og Thread-klassen. Del III — Interaktion Bedre sammen: Java og Thread-klassen. Del IV — Callable, Future og friends Bedre sammen: Java og Thread-klassen. Del VI - Fyr væk!
GO TO FULL VERSION