Invoering
We weten dus dat Java threads heeft. Dat lees je in de recensie Better together: Java and the Thread class. Deel I — Draden van executie .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Zoals je kunt zien, is de code om een taak te starten vrij typisch, maar we moeten deze herhalen voor een nieuwe taak. Een oplossing is om het in een aparte methode te plaatsen, bijvoorbeeld execute(Runnable runnable)
. Maar de makers van Java hebben onze benarde situatie overwogen en kwamen met de Executor
interface:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Deze code is duidelijk beknopter: nu schrijven we gewoon code om de Runnable
thread te starten. Dat is geweldig, toch? Maar dit is nog maar het begin: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
heeft de interface een ExecutorService
subinterface. De Javadoc voor deze interface zegt dat een een ExecutorService
beschrijving beschrijft Executor
die methoden biedt om het Executor
. Het maakt het ook mogelijk om een java.util.concurrent.Future
overzicht te krijgen van het uitvoeringsproces. Voorheen, in Samen beter: Java en de klasse Thread. Deel IV — Callable, Future en vrienden , we hebben kort de mogelijkheden van Future
. Als je het bent vergeten of het nooit hebt gelezen, raad ik je aan je geheugen op te frissen ;) Wat zegt de Javadoc nog meer? Het vertelt ons dat we een speciale java.util.concurrent.Executors
fabriek hebben waarmee we standaardimplementaties van ExecutorService
.
UitvoerderService
Laten we eens kijken. We moeten een bepaalde taak op een threadExecutor
uitvoeren (dwz een beroep doen op) en de code die de thread maakt, is voor ons verborgen. execute()
We hebben ExecutorService
- een specifieke Executor
die verschillende opties heeft om de voortgang te controleren. En we hebben de Executors
fabriek waarmee we een ExecutorService
. Laten we het nu zelf doen:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
U kunt zien dat we een vaste threadpool hebben gespecificeerd waarvan de grootte 2 is. Vervolgens leggen we taken één voor één voor aan de pool. Elke taak retourneert een String
met daarin de threadnaam ( currentThread().GetName()
). Het is belangrijk om de helemaal aan het einde af te sluiten ExecutorService
, anders stopt ons programma niet. De Executors
fabriek heeft aanvullende fabrieksmethoden. We kunnen bijvoorbeeld een pool maken die bestaat uit slechts één thread ( newSingleThreadExecutor
) of een pool die een cache bevat ( newCachedThreadPool
) waaruit threads worden verwijderd nadat ze 1 minuut inactief zijn geweest. In werkelijkheid worden deze ExecutorService
ondersteund door een blokkeerwachtrij , waarin taken worden geplaatst en van waaruit taken worden uitgevoerd. Meer informatie over het blokkeren van wachtrijen vindt u in deze video . U kunt dit ook lezenrecensie over BlockingQueue . En bekijk het antwoord op de vraag "Wanneer geeft u de voorkeur aan LinkedBlockingQueue boven ArrayBlockingQueue?" In de eenvoudigste bewoordingen BlockingQueue
blokkeert a een thread in twee gevallen:
- de thread probeert items uit een lege wachtrij te halen
- de thread probeert items in een volledige wachtrij te plaatsen
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
of
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Zoals we kunnen zien, ExecutorService
worden implementaties van gemaakt binnen de fabrieksmethoden. En voor het grootste deel hebben we het over ThreadPoolExecutor
. Alleen de parameters die van invloed zijn op het werk worden gewijzigd. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Zoals we eerder zagen,ThreadPoolExecutor
is dat wat meestal binnen de fabrieksmethoden wordt gemaakt. De functionaliteit wordt beïnvloed door de argumenten die we doorgeven als het maximale en minimale aantal threads, evenals welk type wachtrij wordt gebruikt. Maar elke implementatie van de java.util.concurrent.BlockingQueue
interface kan worden gebruikt. Over gesproken ThreadPoolExecutor
, we moeten enkele interessante functies noemen. U kunt bijvoorbeeld geen taken indienen bij een ThreadPoolExecutor
als er geen beschikbare ruimte is:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Deze code crasht met een fout als deze:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Met andere woorden, task
kan niet worden ingediend, omdat SynchronousQueue
het zo is ontworpen dat het eigenlijk uit één enkel element bestaat en we er niets meer in kunnen stoppen. We kunnen zien dat we queued tasks
hier nul ("wachtrijtaken = 0") hebben. Maar daar is niets vreemds aan, want dit is een speciaal kenmerk van SynchronousQueue
, wat in feite een wachtrij met 1 element is die altijd leeg is! Wanneer een thread een element in de wachtrij plaatst, wacht deze totdat een andere thread het element uit de wachtrij haalt. Dienovereenkomstig kunnen we het vervangen door new LinkedBlockingQueue<>(1)
en de fout zal nu worden weergegeven queued tasks = 1
. Omdat de wachtrij slechts 1 element is, kunnen we geen tweede element toevoegen. En dat is wat ervoor zorgt dat het programma mislukt. Voortbordurend op onze bespreking van wachtrij, is het vermeldenswaard dat deThreadPoolExecutor
class heeft aanvullende methoden voor het onderhouden van de wachtrij. De methode verwijdert bijvoorbeeld threadPoolExecutor.purge()
alle geannuleerde taken uit de wachtrij om ruimte vrij te maken in de wachtrij. Een andere interessante wachtrijgerelateerde functie is de handler voor afgewezen taken:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
In dit voorbeeld geeft onze handler simpelweg Rejected
elke keer weer dat een taak in de wachtrij wordt afgewezen. Handig, toch? Heeft bovendien ThreadPoolExecutor
een interessante subklasse: ScheduledThreadPoolExecutor
, wat een ScheduledExecutorService
. Het biedt de mogelijkheid om een taak uit te voeren op basis van een timer.
ScheduledExecutorService
ScheduledExecutorService
(wat een type is van ExecutorService
) laat ons taken volgens een schema uitvoeren. Laten we naar een voorbeeld kijken:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Alles is hier eenvoudig. De taken worden ingediend en dan krijgen we een java.util.concurrent.ScheduledFuture
. Een schema kan ook nuttig zijn in de volgende situatie:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Hier leggen we een Runnable
taak voor uitvoering voor met een vaste frequentie ("FixedRate") met een bepaalde initiële vertraging. In dit geval wordt de taak na 1 seconde om de 2 seconden uitgevoerd. Er is een vergelijkbare optie:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Maar in dit geval worden de taken uitgevoerd met een specifiek interval TUSSEN elke uitvoering. Dat wil zeggen, het task
wordt na 1 seconde uitgevoerd. Zodra het voltooid is, gaan er 2 seconden voorbij en wordt een nieuwe taak gestart. Hier zijn enkele aanvullende bronnen over dit onderwerp:
- Een inleiding tot threadpools in Java
- Inleiding tot threadpools in Java
- Java Multithreading Steeplechase: taken annuleren in uitvoerders
- Java-uitvoerders gebruiken voor achtergrondtaken

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Naast de bovenstaande threadpools is er nog een. We kunnen eerlijk zeggen dat het een beetje speciaal is. Het heet een pool voor werkstelen. Kortom, work-stealing is een algoritme waarbij inactieve threads taken gaan overnemen van andere threads of taken van een gedeelde wachtrij. Laten we naar een voorbeeld kijken:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Als we deze code uitvoeren, ExecutorService
worden er 5 threads voor ons gemaakt, omdat elke thread in de wachtrij voor het lock-object wordt geplaatst. Monitoren en vergrendelingen hebben we al samen bedacht in Better: Java en de Thread-klasse. Deel II — Synchronisatie . Laten we nu vervangen Executors.newCachedThreadPool()
door Executors.newWorkStealingPool()
. Wat gaat er veranderen? We zullen zien dat onze taken worden uitgevoerd op minder dan 5 threads. Weet je nog dat er CachedThreadPool
voor elke taak een thread wordt gemaakt? Dat komt omdat wait()
de thread is geblokkeerd, volgende taken willen worden voltooid en er zijn nieuwe threads voor hen gemaakt in de pool. Met een stealing-pool staan threads niet eeuwig stil. Ze beginnen de taken van hun buren uit te voeren. Wat maakt a WorkStealingPool
zo anders dan andere threadpools? Het feit dat het magischeForkJoinPool
leeft erin:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Eigenlijk is er nog een verschil. Standaard zijn de threads die worden gemaakt voor een ForkJoinPool
daemon-threads, in tegenstelling tot de threads die worden gemaakt via een standaard ThreadPool
. Over het algemeen moet u daemon-threads onthouden, omdat bijvoorbeeld CompletableFuture
ook daemon-threads worden gebruikt, tenzij u uw eigen thread opgeeft ThreadFactory
die niet-daemon-threads maakt. Dit zijn de verrassingen die op onverwachte plaatsen op de loer kunnen liggen! :)
ForkJoinPool
In dit deel zullen we het opnieuw hebben overForkJoinPool
(ook wel het fork/join-framework genoemd), dat "onder de motorkap" leeft van WorkStealingPool
. Over het algemeen verscheen het fork/join-framework al in Java 1.7. En hoewel Java 11 dichtbij is, is het toch de moeite waard om eraan te denken. Dit is niet de meest voorkomende implementatie, maar het is best interessant. Er is een goede recensie hierover op internet: Java Fork-Join Framework begrijpen met voorbeelden . Het ForkJoinPool
berust op java.util.concurrent.RecursiveTask
. Er is ook java.util.concurrent.RecursiveAction
. RecursiveAction
geeft geen resultaat. Dus, RecursiveTask
is vergelijkbaar met Callable
, en RecursiveAction
is vergelijkbaar met unnable
. We kunnen zien dat de naam de namen van twee belangrijke methoden bevat: fork
en join
. Defork
methode start een taak asynchroon op een aparte thread. En met de join
methode kunt u wachten tot het werk gedaan is. Om het beste begrip te krijgen, moet u From Imperative Programming to Fork/Join to Parallel Streams in Java 8 lezen .
Samenvatting
Nou, dat rondt dit deel van de recensie af. We hebben geleerd dat ditExecutor
oorspronkelijk is uitgevonden om threads uit te voeren. Toen besloten de makers van Java om het idee voort te zetten en kwamen met ExecutorService
. ExecutorService
laten we taken indienen voor uitvoering met submit()
en invoke()
, en ook de service afsluiten. Omdat ExecutorService
er implementaties nodig zijn, schreven ze een klasse met fabrieksmethoden en noemden deze Executors
. Hiermee kunt u threadpools maken ( ThreadPoolExecutor
). Daarnaast zijn er threadpools waarmee we ook een uitvoeringsschema kunnen specificeren. En een ForkJoinPool
verschuilt zich achter een WorkStealingPool
. Ik hoop dat je wat ik hierboven schreef niet alleen interessant, maar ook begrijpelijk vond :) Ik ben altijd blij om je suggesties en opmerkingen te horen. Samen beter: Java en de klasse Thread. Deel I — Uitvoeringsthreads Beter samen: Java en de klasse Thread. Deel II — Synchronisatie Samen beter: Java en de klasse Thread. Deel III — Interactie Samen beter: Java en de klasse Thread. Deel IV — Callable, Future en vrienden Samen beter: Java en de Thread-klasse. Deel VI — Vuur weg!
GO TO FULL VERSION