CodeGym/Java-blogg/Tilfeldig/Bedre sammen: Java og Thread-klassen. Del V — Utfører, Th...
John Squirrels
Nivå
San Francisco

Bedre sammen: Java og Thread-klassen. Del V — Utfører, ThreadPool, Fork/Join

Publisert i gruppen

Introduksjon

Så vi vet at Java har tråder. Det kan du lese om i anmeldelsen Bedre sammen: Java og trådklassen. Del I – Tråder om henrettelse . Bedre sammen: Java og Thread-klassen.  Del V — Utfører, trådpool, gaffel/skjøt - 1La oss ta en ny titt på den typiske koden:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Som du kan se, er koden for å starte en oppgave ganske typisk, men vi må gjenta den for ny oppgave. En løsning er å sette det i en egen metode, f.eks execute(Runnable runnable). Men Javas skapere har vurdert vår situasjon og kommet opp med grensesnittet Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Denne koden er helt klart mer kortfattet: nå skriver vi bare kode for å starte Runnablepå tråden. Det er flott, ikke sant? Men dette er bare begynnelsen: Bedre sammen: Java og Thread-klassen.  Del V — Utfører, trådpool, gaffel/skjøt - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Som du kan se, Executorhar grensesnittet et ExecutorServiceundergrensesnitt. Javadoc for dette grensesnittet sier at en ExecutorServicebeskriver en bestemt Executorsom gir metoder for å slå av Executor. Det gjør det også mulig å få en java.util.concurrent.Futurefor å spore utførelsesprosessen. Tidligere i Better together: Java and the Thread-klassen. Del IV – Callable, Future, and friends , vi gjennomgikk kort mulighetene til Future. Hvis du har glemt eller aldri har lest det, foreslår jeg at du frisker opp hukommelsen ;) Hva mer sier Javadoc? Den forteller oss at vi har en spesiell java.util.concurrent.Executorsfabrikk som lar oss lage standardimplementeringer av ExecutorService.

ExecutorService

La oss vurdere. Vi må Executorutføre (dvs. kalle execute()på) en bestemt oppgave på en tråd, og koden som lager tråden er skjult for oss. Vi har ExecutorService- en spesifikk Executorsom har flere alternativer for å kontrollere fremdriften. Og vi har Executorsfabrikken som lar oss lage en ExecutorService. La oss nå gjøre det selv:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Du kan se at vi spesifiserte en fast trådpool hvis størrelse er 2. Deretter sender vi inn oppgaver til bassenget en etter en. Hver oppgave returnerer en Stringsom inneholder trådnavnet ( currentThread().GetName()). Det er viktig å stenge ExecutorServicehelt på slutten, for ellers vil ikke programmet ta slutt. Fabrikken Executorshar flere fabrikkmetoder. For eksempel kan vi lage et basseng som bare består av én tråd ( newSingleThreadExecutor) eller et basseng som inkluderer en cache ( newCachedThreadPool) som tråder fjernes fra etter at de har vært inaktive i 1 minutt. I virkeligheten er disse ExecutorServicestøttet av en blokkeringskø , hvor oppgaver plasseres og hvorfra oppgaver utføres. Mer informasjon om blokkering av køer finner du i denne videoen . Du kan også lese detteanmeldelse om BlockingQueue . Og sjekk ut svaret på spørsmålet "Når foretrekker LinkedBlockingQueue fremfor ArrayBlockingQueue?" På de enkleste vilkårene BlockingQueueblokkerer a en tråd i to tilfeller:
  • tråden prøver å hente elementer fra en tom kø
  • tråden prøver å sette elementer i en full kø
Hvis vi ser på implementeringen av fabrikkmetodene, kan vi se hvordan de fungerer. For eksempel:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
eller
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Som vi kan se, ExecutorServiceer implementeringer av opprettet inne i fabrikkmetodene. Og for det meste snakker vi om ThreadPoolExecutor. Bare parametrene som påvirker arbeidet endres. Bedre sammen: Java og Thread-klassen.  Del V – Utfører, trådpool, gaffel/skjøt – 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Som vi så tidligere, ThreadPoolExecutorer det som vanligvis lages i fabrikkmetodene. Funksjonaliteten påvirkes av argumentene vi sender som maksimum og minimum antall tråder, samt hvilken type kø som brukes. Men enhver implementering av java.util.concurrent.BlockingQueuegrensesnittet kan brukes. Når vi snakker om ThreadPoolExecutor, bør vi nevne noen interessante funksjoner. Du kan for eksempel ikke sende inn oppgaver til en ThreadPoolExecutorhvis det ikke er ledig plass:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Denne koden vil krasje med en feil som dette:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Med andre ord, taskkan ikke sendes inn, fordi SynchronousQueueden er utformet slik at den faktisk består av et enkelt element og ikke tillater oss å legge noe mer inn i det. Vi kan se at vi har null queued tasks("oppgaver i kø = 0") her. Men det er ikke noe rart med dette, for dette er en spesiell egenskap ved SynchronousQueue, som faktisk er en 1-elementskø som alltid er tom! Når en tråd setter et element i køen, vil den vente til en annen tråd tar elementet fra køen. Følgelig kan vi erstatte den med new LinkedBlockingQueue<>(1)og feilen vil endres til å vise queued tasks = 1. Fordi køen bare er 1 element, kan vi ikke legge til et andre element. Og det er det som gjør at programmet mislykkes. For å fortsette vår diskusjon om kø, er det verdt å merke seg atThreadPoolExecutorklasse har flere metoder for å betjene køen. Metoden vil for eksempel threadPoolExecutor.purge()fjerne alle kansellerte oppgaver fra køen for å frigjøre plass i køen. En annen interessant kørelatert funksjon er behandleren for avviste oppgaver:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
I dette eksemplet viser vår behandler ganske enkelt Rejectedhver gang en oppgave i køen blir avvist. Praktisk, ikke sant? I tillegg ThreadPoolExecutorhar en interessant underklasse: ScheduledThreadPoolExecutor, som er en ScheduledExecutorService. Det gir muligheten til å utføre en oppgave basert på en tidtaker.

ScheduledExecutorService

ScheduledExecutorService(som er en type ExecutorService) lar oss kjøre oppgaver etter en tidsplan. La oss se på et eksempel:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Alt er enkelt her. Oppgavene sendes inn og så får vi en java.util.concurrent.ScheduledFuture. En tidsplan kan også være nyttig i følgende situasjon:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Her sender vi en Runnableoppgave for utførelse med en fast frekvens ("FixedRate") med en viss initial forsinkelse. I dette tilfellet, etter 1 sekund, vil oppgaven begynne å utføres hvert 2. sekund. Det er et lignende alternativ:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Men i dette tilfellet utføres oppgavene med et spesifikt intervall MELLOM hver utførelse. Det vil si at taskvil bli utført etter 1 sekund. Så snart den er fullført, vil det gå 2 sekunder, og deretter startes en ny oppgave. Her er noen tilleggsressurser om dette emnet: Bedre sammen: Java og Thread-klassen.  Del V — Utfører, trådpool, gaffel/skjøt - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

I tillegg til de ovennevnte trådbassengene er det en til. Vi kan ærlig si at det er litt spesielt. Det kalles et basseng for arbeid som stjeler. Kort sagt er arbeidstyveri en algoritme der ledige tråder begynner å ta oppgaver fra andre tråder eller oppgaver fra en delt kø. La oss se på et eksempel:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Hvis vi kjører denne koden, vil den ExecutorServiceopprette 5 tråder for oss, fordi hver tråd vil bli satt i ventekøen for låsobjektet. Vi har allerede funnet ut skjermer og låser i Better sammen: Java og Thread-klassen. Del II – Synkronisering . La oss nå erstatte Executors.newCachedThreadPool()med Executors.newWorkStealingPool(). Hva vil endre seg? Vi vil se at oppgavene våre utføres på færre enn 5 tråder. Husker du at det CachedThreadPoolopprettes en tråd for hver oppgave? Det er fordi wait()tråden ble blokkert, påfølgende oppgaver ønsker å fullføres, og nye tråder ble opprettet for dem i bassenget. Med et stjelende basseng står ikke tråder uvirksomme for alltid. De begynner å utføre sine naboers oppgaver. Hva gjør en WorkStealingPoolså forskjellig fra andre trådpooler? Det faktum at det magiskeForkJoinPoolbor inni den:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Faktisk er det en forskjell til. Som standard er trådene som er opprettet for en ForkJoinPooldaemon-tråder, i motsetning til trådene som er opprettet gjennom en vanlig ThreadPool. Generelt bør du huske daemon-tråder, fordi for eksempel CompletableFutureogså bruker daemon-tråder med mindre du spesifiserer din egen ThreadFactorysom lager ikke-demon-tråder. Dette er overraskelsene som kan lure på uventede steder! :)

ForkJoinPool

I denne delen skal vi igjen snakke om ForkJoinPool(også kalt fork/join-rammeverket), som lever "under panseret" på WorkStealingPool. Generelt dukket fork/join-rammeverket opp i Java 1.7. Og selv om Java 11 er like ved hånden, er det fortsatt verdt å huske. Dette er ikke den vanligste implementeringen, men den er ganske interessant. Det er en god anmeldelse om dette på nettet: Forstå Java Fork-Join Framework med eksempler . Den ForkJoinPooler avhengig av java.util.concurrent.RecursiveTask. Det er også java.util.concurrent.RecursiveAction. RecursiveActionreturnerer ikke et resultat. Dermed RecursiveTaskligner på Callable, og RecursiveActionligner på unnable. Vi kan se at navnet inkluderer navnene på to viktige metoder: forkog join. Deforkmetoden starter en oppgave asynkront på en egen tråd. Og joinmetoden lar deg vente på at arbeidet skal gjøres. For å få best mulig forståelse bør du lese Fra imperativ programmering til gaffel/join til parallelle strømmer i Java 8 .

Sammendrag

Vel, det avslutter denne delen av anmeldelsen. Vi har lært at det Executoropprinnelig ble oppfunnet for å utføre tråder. Så bestemte Javas skapere å fortsette ideen og kom opp med ExecutorService. ExecutorServicelar oss sende inn oppgaver for utførelse ved hjelp av submit()og invoke(), og også stenge ned tjenesten. Fordi ExecutorServicetrenger implementeringer, skrev de en klasse med fabrikkmetoder og kalte den Executors. Den lar deg lage trådpooler ( ThreadPoolExecutor). I tillegg er det trådpooler som også lar oss spesifisere en utførelsesplan. Og a ForkJoinPoolgjemmer seg bak en WorkStealingPool. Jeg håper du fant det jeg skrev ovenfor ikke bare interessant, men også forståelig :) Jeg er alltid glad for å høre dine forslag og kommentarer. Bedre sammen: Java og Thread-klassen. Del I — Tråder av utførelse Bedre sammen: Java og trådklassen. Del II — Synkronisering Bedre sammen: Java og Thread-klassen. Del III — Interaksjon Bedre sammen: Java og Thread-klassen. Del IV — Callable, Future og friends Bedre sammen: Java og Thread-klassen. Del VI – Fyr vekk!
Kommentarer
  • Populær
  • Ny
  • Gammel
Du må være pålogget for å legge igjen en kommentar
Denne siden har ingen kommentarer ennå