Introduksjon
Så vi vet at Java har tråder. Det kan du lese om i anmeldelsen Bedre sammen: Java og trådklassen. Del I – Tråder om henrettelse . La oss ta en ny titt på den typiske koden:public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Som du kan se, er koden for å starte en oppgave ganske typisk, men vi må gjenta den for ny oppgave. En løsning er å sette det i en egen metode, f.eks execute(Runnable runnable)
. Men Javas skapere har vurdert vår situasjon og kommet opp med grensesnittet Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Denne koden er helt klart mer kortfattet: nå skriver vi bare kode for å starte Runnable
på tråden. Det er flott, ikke sant? Men dette er bare begynnelsen:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
har grensesnittet et ExecutorService
undergrensesnitt. Javadoc for dette grensesnittet sier at en ExecutorService
beskriver en bestemt Executor
som gir metoder for å slå av Executor
. Det gjør det også mulig å få en java.util.concurrent.Future
for å spore utførelsesprosessen. Tidligere i Better together: Java and the Thread-klassen. Del IV – Callable, Future, and friends , vi gjennomgikk kort mulighetene til Future
. Hvis du har glemt eller aldri har lest det, foreslår jeg at du frisker opp hukommelsen ;) Hva mer sier Javadoc? Den forteller oss at vi har en spesiell java.util.concurrent.Executors
fabrikk som lar oss lage standardimplementeringer av ExecutorService
.
ExecutorService
La oss vurdere. Vi måExecutor
utføre (dvs. kalle execute()
på) en bestemt oppgave på en tråd, og koden som lager tråden er skjult for oss. Vi har ExecutorService
- en spesifikk Executor
som har flere alternativer for å kontrollere fremdriften. Og vi har Executors
fabrikken som lar oss lage en ExecutorService
. La oss nå gjøre det selv:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Du kan se at vi spesifiserte en fast trådpool hvis størrelse er 2. Deretter sender vi inn oppgaver til bassenget en etter en. Hver oppgave returnerer en String
som inneholder trådnavnet ( currentThread().GetName()
). Det er viktig å stenge ExecutorService
helt på slutten, for ellers vil ikke programmet ta slutt. Fabrikken Executors
har flere fabrikkmetoder. For eksempel kan vi lage et basseng som bare består av én tråd ( newSingleThreadExecutor
) eller et basseng som inkluderer en cache ( newCachedThreadPool
) som tråder fjernes fra etter at de har vært inaktive i 1 minutt. I virkeligheten er disse ExecutorService
støttet av en blokkeringskø , hvor oppgaver plasseres og hvorfra oppgaver utføres. Mer informasjon om blokkering av køer finner du i denne videoen . Du kan også lese detteanmeldelse om BlockingQueue . Og sjekk ut svaret på spørsmålet "Når foretrekker LinkedBlockingQueue fremfor ArrayBlockingQueue?" På de enkleste vilkårene BlockingQueue
blokkerer a en tråd i to tilfeller:
- tråden prøver å hente elementer fra en tom kø
- tråden prøver å sette elementer i en full kø
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
eller
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Som vi kan se, ExecutorService
er implementeringer av opprettet inne i fabrikkmetodene. Og for det meste snakker vi om ThreadPoolExecutor
. Bare parametrene som påvirker arbeidet endres.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Som vi så tidligere,ThreadPoolExecutor
er det som vanligvis lages i fabrikkmetodene. Funksjonaliteten påvirkes av argumentene vi sender som maksimum og minimum antall tråder, samt hvilken type kø som brukes. Men enhver implementering av java.util.concurrent.BlockingQueue
grensesnittet kan brukes. Når vi snakker om ThreadPoolExecutor
, bør vi nevne noen interessante funksjoner. Du kan for eksempel ikke sende inn oppgaver til en ThreadPoolExecutor
hvis det ikke er ledig plass:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Denne koden vil krasje med en feil som dette:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Med andre ord, task
kan ikke sendes inn, fordi SynchronousQueue
den er utformet slik at den faktisk består av et enkelt element og ikke tillater oss å legge noe mer inn i det. Vi kan se at vi har null queued tasks
("oppgaver i kø = 0") her. Men det er ikke noe rart med dette, for dette er en spesiell egenskap ved SynchronousQueue
, som faktisk er en 1-elementskø som alltid er tom! Når en tråd setter et element i køen, vil den vente til en annen tråd tar elementet fra køen. Følgelig kan vi erstatte den med new LinkedBlockingQueue<>(1)
og feilen vil endres til å vise queued tasks = 1
. Fordi køen bare er 1 element, kan vi ikke legge til et andre element. Og det er det som gjør at programmet mislykkes. For å fortsette vår diskusjon om kø, er det verdt å merke seg atThreadPoolExecutor
klasse har flere metoder for å betjene køen. Metoden vil for eksempel threadPoolExecutor.purge()
fjerne alle kansellerte oppgaver fra køen for å frigjøre plass i køen. En annen interessant kørelatert funksjon er behandleren for avviste oppgaver:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
I dette eksemplet viser vår behandler ganske enkelt Rejected
hver gang en oppgave i køen blir avvist. Praktisk, ikke sant? I tillegg ThreadPoolExecutor
har en interessant underklasse: ScheduledThreadPoolExecutor
, som er en ScheduledExecutorService
. Det gir muligheten til å utføre en oppgave basert på en tidtaker.
ScheduledExecutorService
ScheduledExecutorService
(som er en type ExecutorService
) lar oss kjøre oppgaver etter en tidsplan. La oss se på et eksempel:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Alt er enkelt her. Oppgavene sendes inn og så får vi en java.util.concurrent.ScheduledFuture
. En tidsplan kan også være nyttig i følgende situasjon:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Her sender vi en Runnable
oppgave for utførelse med en fast frekvens ("FixedRate") med en viss initial forsinkelse. I dette tilfellet, etter 1 sekund, vil oppgaven begynne å utføres hvert 2. sekund. Det er et lignende alternativ:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Men i dette tilfellet utføres oppgavene med et spesifikt intervall MELLOM hver utførelse. Det vil si at task
vil bli utført etter 1 sekund. Så snart den er fullført, vil det gå 2 sekunder, og deretter startes en ny oppgave. Her er noen tilleggsressurser om dette emnet:
- En introduksjon til trådpooler i Java
- Introduksjon til trådpooler i Java
- Java Multithreading Steeplechase: Kansellering av oppgaver i utførere
- Bruke Java Executors for bakgrunnsoppgaver
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
I tillegg til de ovennevnte trådbassengene er det en til. Vi kan ærlig si at det er litt spesielt. Det kalles et basseng for arbeid som stjeler. Kort sagt er arbeidstyveri en algoritme der ledige tråder begynner å ta oppgaver fra andre tråder eller oppgaver fra en delt kø. La oss se på et eksempel:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Hvis vi kjører denne koden, vil den ExecutorService
opprette 5 tråder for oss, fordi hver tråd vil bli satt i ventekøen for låsobjektet. Vi har allerede funnet ut skjermer og låser i Better sammen: Java og Thread-klassen. Del II – Synkronisering . La oss nå erstatte Executors.newCachedThreadPool()
med Executors.newWorkStealingPool()
. Hva vil endre seg? Vi vil se at oppgavene våre utføres på færre enn 5 tråder. Husker du at det CachedThreadPool
opprettes en tråd for hver oppgave? Det er fordi wait()
tråden ble blokkert, påfølgende oppgaver ønsker å fullføres, og nye tråder ble opprettet for dem i bassenget. Med et stjelende basseng står ikke tråder uvirksomme for alltid. De begynner å utføre sine naboers oppgaver. Hva gjør en WorkStealingPool
så forskjellig fra andre trådpooler? Det faktum at det magiskeForkJoinPool
bor inni den:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Faktisk er det en forskjell til. Som standard er trådene som er opprettet for en ForkJoinPool
daemon-tråder, i motsetning til trådene som er opprettet gjennom en vanlig ThreadPool
. Generelt bør du huske daemon-tråder, fordi for eksempel CompletableFuture
også bruker daemon-tråder med mindre du spesifiserer din egen ThreadFactory
som lager ikke-demon-tråder. Dette er overraskelsene som kan lure på uventede steder! :)
ForkJoinPool
I denne delen skal vi igjen snakke omForkJoinPool
(også kalt fork/join-rammeverket), som lever "under panseret" på WorkStealingPool
. Generelt dukket fork/join-rammeverket opp i Java 1.7. Og selv om Java 11 er like ved hånden, er det fortsatt verdt å huske. Dette er ikke den vanligste implementeringen, men den er ganske interessant. Det er en god anmeldelse om dette på nettet: Forstå Java Fork-Join Framework med eksempler . Den ForkJoinPool
er avhengig av java.util.concurrent.RecursiveTask
. Det er også java.util.concurrent.RecursiveAction
. RecursiveAction
returnerer ikke et resultat. Dermed RecursiveTask
ligner på Callable
, og RecursiveAction
ligner på unnable
. Vi kan se at navnet inkluderer navnene på to viktige metoder: fork
og join
. Defork
metoden starter en oppgave asynkront på en egen tråd. Og join
metoden lar deg vente på at arbeidet skal gjøres. For å få best mulig forståelse bør du lese Fra imperativ programmering til gaffel/join til parallelle strømmer i Java 8 .
Sammendrag
Vel, det avslutter denne delen av anmeldelsen. Vi har lært at detExecutor
opprinnelig ble oppfunnet for å utføre tråder. Så bestemte Javas skapere å fortsette ideen og kom opp med ExecutorService
. ExecutorService
lar oss sende inn oppgaver for utførelse ved hjelp av submit()
og invoke()
, og også stenge ned tjenesten. Fordi ExecutorService
trenger implementeringer, skrev de en klasse med fabrikkmetoder og kalte den Executors
. Den lar deg lage trådpooler ( ThreadPoolExecutor
). I tillegg er det trådpooler som også lar oss spesifisere en utførelsesplan. Og a ForkJoinPool
gjemmer seg bak en WorkStealingPool
. Jeg håper du fant det jeg skrev ovenfor ikke bare interessant, men også forståelig :) Jeg er alltid glad for å høre dine forslag og kommentarer. Bedre sammen: Java og Thread-klassen. Del I — Tråder av utførelse Bedre sammen: Java og trådklassen. Del II — Synkronisering Bedre sammen: Java og Thread-klassen. Del III — Interaksjon Bedre sammen: Java og Thread-klassen. Del IV — Callable, Future og friends Bedre sammen: Java og Thread-klassen. Del VI – Fyr vekk!