CodeGym /Java blog /Tilfældig /Bedre sammen: Java og Tråd-klassen. Del V — Executor, Thr...
John Squirrels
Niveau
San Francisco

Bedre sammen: Java og Tråd-klassen. Del V — Executor, ThreadPool, Fork/Join

Udgivet i gruppen

Introduktion

Så vi ved, at Java har tråde. Det kan du læse om i anmeldelsen med titlen Better together: Java and the Thread class. Del I — Udførelsestråde . Bedre sammen: Java og Tråd-klassen.  Del V — Udfører, ThreadPool, Fork/Join - 1Lad os se på den typiske kode igen:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Som du kan se, er koden til at starte en opgave ret typisk, men vi er nødt til at gentage den for en ny opgave. En løsning er at sætte det i en separat metode, f.eks execute(Runnable runnable). Men Javas skabere har overvejet vores situation og kom med grænsefladen Executor:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Denne kode er klart mere kortfattet: nu skriver vi simpelthen kode for at starte Runnablepå tråden. Det er fantastisk, ikke? Men dette er kun begyndelsen: Bedre sammen: Java og Tråd-klassen.  Del V — Udfører, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Som du kan se, Executorhar grænsefladen en ExecutorServiceundergrænseflade. Javadoc for denne grænseflade siger, at en ExecutorServicebeskriver en bestemt Executor, der giver metoder til at lukke ned Executor. Det gør det også muligt at få en java.util.concurrent.Futurefor at spore eksekveringsprocessen. Tidligere i Better together: Java and the Thread-klassen. Del IV — Callable, Future, and friends , vi gennemgik kort mulighederne for Future. Hvis du har glemt eller aldrig læser det, foreslår jeg, at du genopfrisker din hukommelse ;) Hvad siger Javadoc ellers? Det fortæller os, at vi har en speciel java.util.concurrent.Executorsfabrik, der lader os oprette standardimplementeringer af ExecutorService.

ExecutorService

Lad os gennemgå. Vi skal Executorudføre (dvs. kalde execute()på) en bestemt opgave på en tråd, og koden, der skaber tråden, er skjult for os. Vi har ExecutorService- en specifik Executor, der har flere muligheder for at kontrollere fremskridt. Og vi har Executorsfabrikken, der lader os skabe en ExecutorService. Lad os nu gøre det selv:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Du kan se, at vi har specificeret en fast trådpulje, hvis størrelse er 2. Derefter afleverer vi opgaver til puljen én efter én. Hver opgave returnerer en Stringindeholdende trådnavnet ( currentThread().GetName()). Det er vigtigt at lukke ned til ExecutorServiceallersidst, for ellers slutter vores program ikke. Fabrikken Executorshar yderligere fabriksmetoder. For eksempel kan vi oprette en pulje, der kun består af én tråd ( newSingleThreadExecutor) eller en pool, der indeholder en cache ( newCachedThreadPool), hvorfra tråde fjernes, når de har været inaktive i 1 minut. I virkeligheden er disse ExecutorServiceunderstøttet af en blokerende kø , hvori opgaver placeres, og hvorfra opgaver udføres. Mere information om blokering af køer kan findes i denne video . Du kan også læse detteanmeldelse om BlockingQueue . Og tjek svaret på spørgsmålet "Hvornår skal du foretrække LinkedBlockingQueue frem for ArrayBlockingQueue?" I de enkleste vendinger BlockingQueueblokerer a en tråd i to tilfælde:
  • tråden forsøger at hente varer fra en tom kø
  • tråden forsøger at sætte elementer i en fuld kø
Hvis vi ser på implementeringen af ​​fabriksmetoderne, kan vi se, hvordan de virker. For eksempel:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
eller

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Som vi kan se, ExecutorServiceer implementeringer af skabt inde i fabrikkens metoder. Og for det meste taler vi om ThreadPoolExecutor. Kun de parametre, der påvirker arbejdet, ændres. Bedre sammen: Java og Tråd-klassen.  Del V — Udfører, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Som vi så tidligere, ThreadPoolExecutorer det, der normalt bliver skabt inde i fabrikkens metoder. Funktionaliteten påvirkes af de argumenter, vi sender som maksimum og minimum antal tråde, samt hvilken type kø, der bruges. Men enhver implementering af java.util.concurrent.BlockingQueuegrænsefladen kan bruges. Når vi taler om ThreadPoolExecutor, bør vi nævne nogle interessante funktioner. Du kan f.eks. ikke sende opgaver til en, ThreadPoolExecutorhvis der ikke er ledig plads:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Denne kode vil gå ned med en fejl som denne:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Med andre ord kan den taskikke indsendes, fordi SynchronousQueueden er designet, så den faktisk består af et enkelt element og ikke tillader os at lægge noget mere i det. Vi kan se, at vi har nul queued tasks("opgaver i kø = 0") her. Men der er ikke noget mærkeligt ved dette, for dette er et særligt træk ved SynchronousQueue, som faktisk er en 1-element kø, der altid er tom! Når en tråd sætter et element i køen, vil den vente, indtil en anden tråd tager elementet fra køen. Derfor kan vi erstatte det med, new LinkedBlockingQueue<>(1)og fejlen ændres til nu at vise queued tasks = 1. Fordi køen kun er 1 element, kan vi ikke tilføje et andet element. Og det er det, der får programmet til at fejle. For at fortsætte vores diskussion af kø, er det værd at bemærke, atThreadPoolExecutorklasse har yderligere metoder til at servicere køen. For eksempel threadPoolExecutor.purge()vil metoden fjerne alle annullerede opgaver fra køen for at frigøre plads i køen. En anden interessant kø-relateret funktion er handleren for afviste opgaver:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
I dette eksempel viser vores handler simpelthen Rejectedhver gang en opgave i køen afvises. Praktisk, ikke? Derudover ThreadPoolExecutorhar en interessant underklasse: ScheduledThreadPoolExecutor, som er en ScheduledExecutorService. Det giver mulighed for at udføre en opgave baseret på en timer.

ScheduledExecutorService

ScheduledExecutorService(som er en type ExecutorService) lader os køre opgaver efter en tidsplan. Lad os se på et eksempel:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Alt er enkelt her. Opgaverne afleveres og så får vi en java.util.concurrent.ScheduledFuture. En tidsplan kan også være nyttig i følgende situation:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Her sender vi en Runnableopgave til udførelse med en fast frekvens ("FixedRate") med en vis indledende forsinkelse. I dette tilfælde vil opgaven efter 1 sekund begynde at blive udført hvert 2. sekund. Der er en lignende mulighed:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Men i dette tilfælde udføres opgaverne med et bestemt interval MELLEM hver udførelse. Det vil sige, taskvil blive udført efter 1 sekund. Så snart det er afsluttet, går der 2 sekunder, og så starter en ny opgave. Her er nogle yderligere ressourcer om dette emne: Bedre sammen: Java og Tråd-klassen.  Del V — Udfører, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Ud over ovenstående trådpuljer er der en mere. Vi kan ærligt sige, at det er lidt specielt. Det kaldes en pulje til at stjæle arbejde. Kort sagt er arbejdstyveri en algoritme, hvor inaktive tråde begynder at tage opgaver fra andre tråde eller opgaver fra en delt kø. Lad os se på et eksempel:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Hvis vi kører denne kode, vil den ExecutorServiceoprette 5 tråde for os, fordi hver tråd vil blive sat i ventekøen til låseobjektet. Vi har allerede fundet ud af skærme og låse i Better sammen: Java og Thread-klassen. Del II — Synkronisering . Lad os nu erstatte Executors.newCachedThreadPool()med Executors.newWorkStealingPool(). Hvad vil ændre sig? Vi vil se, at vores opgaver udføres på færre end 5 tråde. Husk, at der CachedThreadPoolopretter en tråd for hver opgave? Det skyldes, at wait()tråden er blokeret, efterfølgende opgaver ønsker at blive udført, og der blev oprettet nye tråde til dem i puljen. Med en stjælende pool står tråde ikke uvirksomme for evigt. De begynder at udføre deres naboers opgaver. Hvad gør en WorkStealingPoolså forskellig fra andre trådpuljer? Det faktum, at det magiskeForkJoinPoolbor inde i det:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Faktisk er der endnu en forskel. Som standard er de tråde, der er oprettet til en, ForkJoinPooldæmontråde, i modsætning til de tråde, der er oprettet gennem en almindelig ThreadPool. Generelt bør du huske dæmontråde, fordi du for eksempel CompletableFutureogså bruger dæmontråde, medmindre du angiver din egen ThreadFactory, der opretter ikke-dæmontråde. Det er de overraskelser, der kan gemme sig uventede steder! :)

ForkJoinPool

I denne del vil vi igen tale om ForkJoinPool(også kaldet gaffel/sammenføjningsramme), som lever "under motorhjelmen" på WorkStealingPool. Generelt dukkede fork/join-rammen op tilbage i Java 1.7. Og selvom Java 11 er lige ved hånden, er det stadig værd at huske. Dette er ikke den mest almindelige implementering, men den er ret interessant. Der er en god anmeldelse om dette på nettet: Forstå Java Fork-Join Framework med eksempler . Den ForkJoinPooler afhængig af java.util.concurrent.RecursiveTask. Der er også java.util.concurrent.RecursiveAction. RecursiveActionreturnerer ikke et resultat. Således RecursiveTaskligner Callable, og RecursiveActionligner unnable. Vi kan se, at navnet omfatter navnene på to vigtige metoder: forkog join. Detforkmetode starter en opgave asynkront på en separat tråd. Og joinmetoden lader dig vente på, at arbejdet skal udføres. For at få den bedste forståelse bør du læse Fra imperativ programmering til Fork/Join til Parallel Streams i Java 8 .

Resumé

Nå, det afslutter denne del af anmeldelsen. Vi har erfaret, at det Executoroprindeligt blev opfundet til at udføre tråde. Så besluttede Javas skabere at fortsætte ideen og kom op med ExecutorService. ExecutorServicelader os sende opgaver til udførelse ved hjælp af submit()og invoke(), og også lukke tjenesten ned. Fordi ExecutorServicebehov for implementeringer, skrev de en klasse med fabriksmetoder og kaldte den Executors. Det lader dig oprette trådpuljer ( ThreadPoolExecutor). Derudover er der trådpuljer, som også giver os mulighed for at angive en udførelsesplan. Og a ForkJoinPoolgemmer sig bag en WorkStealingPool. Jeg håber, at du fandt det, jeg skrev ovenfor, ikke kun interessant, men også forståeligt :) Jeg er altid glad for at høre dine forslag og kommentarer. Bedre sammen: Java og Tråd-klassen. Del I — Udførelsestråde Bedre sammen: Java og trådklassen. Del II — Synkronisering Bedre sammen: Java og Thread-klassen. Del III — Interaktion Bedre sammen: Java og Thread-klassen. Del IV — Callable, Future og friends Bedre sammen: Java og Thread-klassen. Del VI - Fyr væk!
Kommentarer
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION