CodeGym/Java blogg/Slumpmässig/Bättre tillsammans: Java och trådklassen. Del V — Executo...
John Squirrels
Nivå
San Francisco

Bättre tillsammans: Java och trådklassen. Del V — Executor, ThreadPool, Fork/Join

Publicerad i gruppen

Introduktion

Så vi vet att Java har trådar. Det kan du läsa om i recensionen Better together: Java and the Thread class. Del I — Avrättningstrådar . Bättre tillsammans: Java och trådklassen.  Del V — Executor, ThreadPool, Fork/Join - 1Låt oss ta en ny titt på den typiska koden:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Som du kan se är koden för att starta en uppgift ganska typisk, men vi måste upprepa den för ny uppgift. En lösning är att lägga det i en separat metod, execute(Runnable runnable)t.ex. Men Javas skapare har övervägt vår svåra situation och kommit fram till gränssnittet Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Den här koden är helt klart mer koncis: nu skriver vi helt enkelt kod för att starta Runnablepå tråden. Det är bra, eller hur? Men detta är bara början: Bättre tillsammans: Java och trådklassen.  Del V — Executor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Som du kan se Executorhar gränssnittet ett ExecutorServiceundergränssnitt. Javadoc för detta gränssnitt säger att en ExecutorServicebeskriver en speciell Executorsom tillhandahåller metoder för att stänga av Executor. Det gör det också möjligt att få en java.util.concurrent.Futureför att spåra exekveringsprocessen. Tidigare i Better together: Java and the Thread-klassen. Del IV – Callable, Future och friends , vi granskade kort kapaciteten hos Future. Om du har glömt eller aldrig läst den, föreslår jag att du fräschar upp minnet ;) Vad säger Javadoc? Det berättar för oss att vi har en speciell java.util.concurrent.Executorsfabrik som låter oss skapa standardimplementationer av ExecutorService.

ExecutorService

Låt oss gå igenom. Vi måste Executorutföra (dvs. kalla execute()på) en viss uppgift på en tråd, och koden som skapar tråden är dold för oss. Vi har ExecutorService— en specifik Executorsom har flera alternativ för att kontrollera framsteg. Och vi har Executorsfabriken som låter oss skapa en ExecutorService. Låt oss nu göra det själva:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Du kan se att vi angett en fast trådpool vars storlek är 2. Sedan lämnar vi in ​​uppgifter till poolen en efter en. Varje uppgift returnerar ett Stringinnehållande trådnamnet ( currentThread().GetName()). Det är viktigt att stänga av ExecutorServicei slutet, för annars kommer inte vårt program att ta slut. Fabriken Executorshar ytterligare fabriksmetoder. Till exempel kan vi skapa en pool som bara består av en tråd ( newSingleThreadExecutor) eller en pool som innehåller en cache ( newCachedThreadPool) från vilken trådar tas bort efter att de varit inaktiva i 1 minut. ExecutorServiceI verkligheten backas dessa upp av en blockeringskö , i vilken uppgifter placeras och från vilka uppgifter exekveras. Mer information om att blockera köer finns i den här videon . Du kan också läsa dettarecension om BlockingQueue . Och kolla in svaret på frågan "När ska man föredra LinkedBlockingQueue framför ArrayBlockingQueue?" I de enklaste termerna BlockingQueueblockerar a en tråd i två fall:
  • tråden försöker hämta objekt från en tom kö
  • tråden försöker placera objekt i en full kö
Om vi ​​tittar på implementeringen av fabriksmetoderna kan vi se hur de fungerar. Till exempel:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
eller
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Som vi kan se ExecutorServiceskapas implementeringar av inuti fabriksmetoderna. Och för det mesta pratar vi om ThreadPoolExecutor. Endast de parametrar som påverkar arbetet ändras. Bättre tillsammans: Java och trådklassen.  Del V — Executor, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Som vi såg tidigare, ThreadPoolExecutorär det som vanligtvis skapas inuti fabriksmetoderna. Funktionaliteten påverkas av de argument vi skickar som maximalt och minsta antal trådar, samt vilken typ av kö som används. Men vilken implementering av java.util.concurrent.BlockingQueuegränssnittet som helst kan användas. På tal om ThreadPoolExecutorbör vi nämna några intressanta funktioner. Till exempel kan du inte skicka uppgifter till en ThreadPoolExecutorom det inte finns något tillgängligt utrymme:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Den här koden kommer att krascha med ett fel som detta:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Med andra ord, taskkan inte skickas, eftersom SynchronousQueueden är utformad så att den faktiskt består av ett enda element och inte tillåter oss att lägga något mer i det. Vi kan se att vi har noll queued tasks("köade uppgifter = 0") här. Men det är inget konstigt med detta, eftersom det här är en speciell egenskap hos , SynchronousQueuesom i själva verket är en 1-elementskö som alltid är tom! När en tråd sätter ett element i kön kommer den att vänta tills en annan tråd tar elementet från kön. Följaktligen kan vi ersätta den med new LinkedBlockingQueue<>(1)och felet kommer att ändras till att nu visa queued tasks = 1. Eftersom kön bara är ett element kan vi inte lägga till ett andra element. Och det är det som gör att programmet misslyckas. För att fortsätta vår diskussion om kö, är det värt att notera attThreadPoolExecutorclass har ytterligare metoder för att betjäna kön. Till exempel threadPoolExecutor.purge()kommer metoden att ta bort alla avbrutna uppgifter från kön för att frigöra utrymme i kön. En annan intressant körelaterad funktion är hanteraren för avvisade uppgifter:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
I det här exemplet visar vår hanterare helt enkelt Rejectedvarje gång en uppgift i kön avvisas. Bekvämt, eller hur? Har dessutom ThreadPoolExecutoren intressant underklass: , ScheduledThreadPoolExecutorsom är en ScheduledExecutorService. Det ger möjlighet att utföra en uppgift baserat på en timer.

ScheduledExecutorService

ScheduledExecutorService(vilket är en typ av ExecutorService) låter oss köra uppgifter enligt ett schema. Låt oss titta på ett exempel:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Allt är enkelt här. Uppgifterna lämnas in och sedan får vi en java.util.concurrent.ScheduledFuture. Ett schema kan också vara till hjälp i följande situation:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Här skickar vi en Runnableuppgift för exekvering med en fast frekvens ("FixedRate") med en viss initial fördröjning. I det här fallet, efter 1 sekund, kommer uppgiften att börja utföras varannan sekund. Det finns ett liknande alternativ:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Men i det här fallet utförs uppgifterna med ett specifikt intervall MELLAN varje exekvering. Det vill säga taskkommer att exekveras efter 1 sekund. Sedan, så snart den är klar, kommer 2 sekunder att passera, och sedan kommer en ny uppgift att startas. Här är några ytterligare resurser om detta ämne: Bättre tillsammans: Java och trådklassen.  Del V — Executor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Utöver ovanstående trådpooler finns det en till. Vi kan ärligt säga att det är lite speciellt. Det kallas en pool för att stjäla arbete. Kort sagt är arbetsstöld en algoritm där lediga trådar börjar ta uppgifter från andra trådar eller uppgifter från en delad kö. Låt oss titta på ett exempel:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Om vi ​​kör den här koden ExecutorServicekommer den att skapa 5 trådar åt oss, eftersom varje tråd kommer att läggas i väntekön för låsobjektet. Vi har redan listat ut monitorer och låser i Better tillsammans: Java och klassen Thread. Del II — Synkronisering . Låt oss nu ersätta Executors.newCachedThreadPool()med Executors.newWorkStealingPool(). Vad kommer att förändras? Vi kommer att se att våra uppgifter utförs på färre än 5 trådar. Kommer du ihåg att det CachedThreadPoolskapar en tråd för varje uppgift? Det beror på att wait()tråden blockerades, efterföljande uppgifter vill slutföras och nya trådar skapades för dem i poolen. Med en stjälande pool står trådarna inte overksamma för alltid. De börjar utföra sina grannars uppgifter. Vad skiljer en WorkStealingPoolså mycket från andra trådpooler? Det faktum att det magiskaForkJoinPoolbor inuti det:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Egentligen finns det ytterligare en skillnad. Som standard är trådarna som skapats för en ForkJoinPooldemon-trådar, till skillnad från de trådar som skapas genom en vanlig ThreadPool. I allmänhet bör du komma ihåg demon-trådar, eftersom du till exempel CompletableFutureockså använder demon-trådar om du inte anger din egen ThreadFactorysom skapar icke-demon-trådar. Det här är överraskningarna som kan gömma sig på oväntade ställen! :)

ForkJoinPool

I den här delen kommer vi återigen att prata om ForkJoinPool(även kallat gaffel/join-ramverket), som bor "under huven" på WorkStealingPool. I allmänhet dök gaffel/join-ramverket upp i Java 1.7. Och även om Java 11 ligger nära till hands är det ändå värt att komma ihåg. Detta är inte den vanligaste implementeringen, men det är ganska intressant. Det finns en bra recension om detta på webben: Förstå Java Fork-Join Framework med exempel . Den ForkJoinPoolförlitar sig på java.util.concurrent.RecursiveTask. Det finns också java.util.concurrent.RecursiveAction. RecursiveActionger inget resultat. Således RecursiveTaskliknar Callable, och RecursiveActionliknar unnable. Vi kan se att namnet inkluderar namnen på två viktiga metoder: forkoch join. Deforkmetoden startar någon uppgift asynkront på en separat tråd. Och joinmetoden låter dig vänta på att arbetet ska göras. För att få bästa möjliga förståelse bör du läsa From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Sammanfattning

Tja, det avslutar den här delen av recensionen. Vi har lärt oss att det Executorursprungligen uppfanns för att köra trådar. Då bestämde sig Javas skapare för att fortsätta idén och kom på ExecutorService. ExecutorServicelåter oss skicka uppgifter för exekvering med hjälp av submit()och invoke(), och även stänga av tjänsten. Eftersom ExecutorServicebehöver implementeringar skrev de en klass med fabriksmetoder och kallade den Executors. Det låter dig skapa trådpooler ( ThreadPoolExecutor). Dessutom finns det trådpooler som också låter oss specificera ett körschema. Och a ForkJoinPoolgömmer sig bakom en WorkStealingPool. Jag hoppas att du fann det jag skrev ovan inte bara intressant, utan också förståeligt :) Jag är alltid glad att höra dina förslag och kommentarer. Bättre tillsammans: Java och trådklassen. Del I — Trådar av utförande Bättre tillsammans: Java och klassen Thread. Del II — Synkronisering Bättre tillsammans: Java och klassen Thread. Del III — Interaktion Bättre tillsammans: Java och klassen Thread. Del IV — Callable, Future och friends Bättre tillsammans: Java och Thread-klassen. Del VI — Skjut loss!
Kommentarer
  • Populär
  • Ny
  • Gammal
Du måste vara inloggad för att lämna en kommentar
Den här sidan har inga kommentarer än