CodeGym /Java Blog /Willekeurig /Samen beter: Java en de klasse Thread. Deel V — Uitvoerde...
John Squirrels
Niveau 41
San Francisco

Samen beter: Java en de klasse Thread. Deel V — Uitvoerder, ThreadPool, Fork/Join

Gepubliceerd in de groep Willekeurig

Invoering

We weten dus dat Java threads heeft. Dat lees je in de recensie Better together: Java and the Thread class. Deel I — Draden van executie . Samen beter: Java en de klasse Thread.  Deel V — Uitvoerder, ThreadPool, Fork/Join - 1Laten we de typische code nog eens bekijken:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Zoals je kunt zien, is de code om een ​​taak te starten vrij typisch, maar we moeten deze herhalen voor een nieuwe taak. Een oplossing is om het in een aparte methode te plaatsen, bijvoorbeeld execute(Runnable runnable). Maar de makers van Java hebben onze benarde situatie overwogen en kwamen met de Executorinterface:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Deze code is duidelijk beknopter: nu schrijven we gewoon code om de Runnablethread te starten. Dat is geweldig, toch? Maar dit is nog maar het begin: Samen beter: Java en de klasse Thread.  Deel V — Uitvoerder, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Zoals je kunt zien, Executorheeft de interface een ExecutorServicesubinterface. De Javadoc voor deze interface zegt dat een een ExecutorServicebeschrijving beschrijft Executordie methoden biedt om het Executor. Het maakt het ook mogelijk om een java.util.concurrent.Future​​overzicht te krijgen van het uitvoeringsproces. Voorheen, in Samen beter: Java en de klasse Thread. Deel IV — Callable, Future en vrienden , we hebben kort de mogelijkheden van Future. Als je het bent vergeten of het nooit hebt gelezen, raad ik je aan je geheugen op te frissen ;) Wat zegt de Javadoc nog meer? Het vertelt ons dat we een speciale java.util.concurrent.Executorsfabriek hebben waarmee we standaardimplementaties van ExecutorService.

UitvoerderService

Laten we eens kijken. We moeten een bepaalde taak op een thread Executoruitvoeren (dwz een beroep doen op) en de code die de thread maakt, is voor ons verborgen. execute()We hebben ExecutorService- een specifieke Executordie verschillende opties heeft om de voortgang te controleren. En we hebben de Executorsfabriek waarmee we een ExecutorService. Laten we het nu zelf doen:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
U kunt zien dat we een vaste threadpool hebben gespecificeerd waarvan de grootte 2 is. Vervolgens leggen we taken één voor één voor aan de pool. Elke taak retourneert een Stringmet daarin de threadnaam ( currentThread().GetName()). Het is belangrijk om de helemaal aan het einde af te sluiten ExecutorService, anders stopt ons programma niet. De Executorsfabriek heeft aanvullende fabrieksmethoden. We kunnen bijvoorbeeld een pool maken die bestaat uit slechts één thread ( newSingleThreadExecutor) of een pool die een cache bevat ( newCachedThreadPool) waaruit threads worden verwijderd nadat ze 1 minuut inactief zijn geweest. In werkelijkheid worden deze ExecutorServiceondersteund door een blokkeerwachtrij , waarin taken worden geplaatst en van waaruit taken worden uitgevoerd. Meer informatie over het blokkeren van wachtrijen vindt u in deze video . U kunt dit ook lezenrecensie over BlockingQueue . En bekijk het antwoord op de vraag "Wanneer geeft u de voorkeur aan LinkedBlockingQueue boven ArrayBlockingQueue?" In de eenvoudigste bewoordingen BlockingQueueblokkeert a een thread in twee gevallen:
  • de thread probeert items uit een lege wachtrij te halen
  • de thread probeert items in een volledige wachtrij te plaatsen
Als we kijken naar de implementatie van de fabrieksmethoden, kunnen we zien hoe ze werken. Bijvoorbeeld:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
of

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Zoals we kunnen zien, ExecutorServiceworden implementaties van gemaakt binnen de fabrieksmethoden. En voor het grootste deel hebben we het over ThreadPoolExecutor. Alleen de parameters die van invloed zijn op het werk worden gewijzigd. Samen beter: Java en de klasse Thread.  Deel V — Uitvoerder, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Zoals we eerder zagen, ThreadPoolExecutoris dat wat meestal binnen de fabrieksmethoden wordt gemaakt. De functionaliteit wordt beïnvloed door de argumenten die we doorgeven als het maximale en minimale aantal threads, evenals welk type wachtrij wordt gebruikt. Maar elke implementatie van de java.util.concurrent.BlockingQueueinterface kan worden gebruikt. Over gesproken ThreadPoolExecutor, we moeten enkele interessante functies noemen. U kunt bijvoorbeeld geen taken indienen bij een ThreadPoolExecutorals er geen beschikbare ruimte is:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Deze code crasht met een fout als deze:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Met andere woorden, taskkan niet worden ingediend, omdat SynchronousQueuehet zo is ontworpen dat het eigenlijk uit één enkel element bestaat en we er niets meer in kunnen stoppen. We kunnen zien dat we queued taskshier nul ("wachtrijtaken = 0") hebben. Maar daar is niets vreemds aan, want dit is een speciaal kenmerk van SynchronousQueue, wat in feite een wachtrij met 1 element is die altijd leeg is! Wanneer een thread een element in de wachtrij plaatst, wacht deze totdat een andere thread het element uit de wachtrij haalt. Dienovereenkomstig kunnen we het vervangen door new LinkedBlockingQueue<>(1)en de fout zal nu worden weergegeven queued tasks = 1. Omdat de wachtrij slechts 1 element is, kunnen we geen tweede element toevoegen. En dat is wat ervoor zorgt dat het programma mislukt. Voortbordurend op onze bespreking van wachtrij, is het vermeldenswaard dat deThreadPoolExecutorclass heeft aanvullende methoden voor het onderhouden van de wachtrij. De methode verwijdert bijvoorbeeld threadPoolExecutor.purge()alle geannuleerde taken uit de wachtrij om ruimte vrij te maken in de wachtrij. Een andere interessante wachtrijgerelateerde functie is de handler voor afgewezen taken:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
In dit voorbeeld geeft onze handler simpelweg Rejectedelke keer weer dat een taak in de wachtrij wordt afgewezen. Handig, toch? Heeft bovendien ThreadPoolExecutoreen interessante subklasse: ScheduledThreadPoolExecutor, wat een ScheduledExecutorService. Het biedt de mogelijkheid om een ​​taak uit te voeren op basis van een timer.

ScheduledExecutorService

ScheduledExecutorService(wat een type is van ExecutorService) laat ons taken volgens een schema uitvoeren. Laten we naar een voorbeeld kijken:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Alles is hier eenvoudig. De taken worden ingediend en dan krijgen we een java.util.concurrent.ScheduledFuture. Een schema kan ook nuttig zijn in de volgende situatie:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Hier leggen we een Runnabletaak voor uitvoering voor met een vaste frequentie ("FixedRate") met een bepaalde initiële vertraging. In dit geval wordt de taak na 1 seconde om de 2 seconden uitgevoerd. Er is een vergelijkbare optie:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Maar in dit geval worden de taken uitgevoerd met een specifiek interval TUSSEN elke uitvoering. Dat wil zeggen, het taskwordt na 1 seconde uitgevoerd. Zodra het voltooid is, gaan er 2 seconden voorbij en wordt een nieuwe taak gestart. Hier zijn enkele aanvullende bronnen over dit onderwerp: Samen beter: Java en de klasse Thread.  Deel V — Uitvoerder, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Naast de bovenstaande threadpools is er nog een. We kunnen eerlijk zeggen dat het een beetje speciaal is. Het heet een pool voor werkstelen. Kortom, work-stealing is een algoritme waarbij inactieve threads taken gaan overnemen van andere threads of taken van een gedeelde wachtrij. Laten we naar een voorbeeld kijken:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Als we deze code uitvoeren, ExecutorServiceworden er 5 threads voor ons gemaakt, omdat elke thread in de wachtrij voor het lock-object wordt geplaatst. Monitoren en vergrendelingen hebben we al samen bedacht in Better: Java en de Thread-klasse. Deel II — Synchronisatie . Laten we nu vervangen Executors.newCachedThreadPool()door Executors.newWorkStealingPool(). Wat gaat er veranderen? We zullen zien dat onze taken worden uitgevoerd op minder dan 5 threads. Weet je nog dat er CachedThreadPoolvoor elke taak een thread wordt gemaakt? Dat komt omdat wait()de thread is geblokkeerd, volgende taken willen worden voltooid en er zijn nieuwe threads voor hen gemaakt in de pool. Met een stealing-pool staan ​​threads niet eeuwig stil. Ze beginnen de taken van hun buren uit te voeren. Wat maakt a WorkStealingPoolzo anders dan andere threadpools? Het feit dat het magischeForkJoinPoolleeft erin:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Eigenlijk is er nog een verschil. Standaard zijn de threads die worden gemaakt voor een ForkJoinPooldaemon-threads, in tegenstelling tot de threads die worden gemaakt via een standaard ThreadPool. Over het algemeen moet u daemon-threads onthouden, omdat bijvoorbeeld CompletableFutureook daemon-threads worden gebruikt, tenzij u uw eigen thread opgeeft ThreadFactorydie niet-daemon-threads maakt. Dit zijn de verrassingen die op onverwachte plaatsen op de loer kunnen liggen! :)

ForkJoinPool

In dit deel zullen we het opnieuw hebben over ForkJoinPool(ook wel het fork/join-framework genoemd), dat "onder de motorkap" leeft van WorkStealingPool. Over het algemeen verscheen het fork/join-framework al in Java 1.7. En hoewel Java 11 dichtbij is, is het toch de moeite waard om eraan te denken. Dit is niet de meest voorkomende implementatie, maar het is best interessant. Er is een goede recensie hierover op internet: Java Fork-Join Framework begrijpen met voorbeelden . Het ForkJoinPoolberust op java.util.concurrent.RecursiveTask. Er is ook java.util.concurrent.RecursiveAction. RecursiveActiongeeft geen resultaat. Dus, RecursiveTaskis vergelijkbaar met Callable, en RecursiveActionis vergelijkbaar met unnable. We kunnen zien dat de naam de namen van twee belangrijke methoden bevat: forken join. Deforkmethode start een taak asynchroon op een aparte thread. En met de joinmethode kunt u wachten tot het werk gedaan is. Om het beste begrip te krijgen, moet u From Imperative Programming to Fork/Join to Parallel Streams in Java 8 lezen .

Samenvatting

Nou, dat rondt dit deel van de recensie af. We hebben geleerd dat dit Executoroorspronkelijk is uitgevonden om threads uit te voeren. Toen besloten de makers van Java om het idee voort te zetten en kwamen met ExecutorService. ExecutorServicelaten we taken indienen voor uitvoering met submit()en invoke(), en ook de service afsluiten. Omdat ExecutorServiceer implementaties nodig zijn, schreven ze een klasse met fabrieksmethoden en noemden deze Executors. Hiermee kunt u threadpools maken ( ThreadPoolExecutor). Daarnaast zijn er threadpools waarmee we ook een uitvoeringsschema kunnen specificeren. En een ForkJoinPoolverschuilt zich achter een WorkStealingPool. Ik hoop dat je wat ik hierboven schreef niet alleen interessant, maar ook begrijpelijk vond :) Ik ben altijd blij om je suggesties en opmerkingen te horen. Samen beter: Java en de klasse Thread. Deel I — Uitvoeringsthreads Beter samen: Java en de klasse Thread. Deel II — Synchronisatie Samen beter: Java en de klasse Thread. Deel III — Interactie Samen beter: Java en de klasse Thread. Deel IV — Callable, Future en vrienden Samen beter: Java en de Thread-klasse. Deel VI — Vuur weg!
Opmerkingen
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION