Introduktion
Så vi vet att Java har trådar. Det kan du läsa om i recensionen Better together: Java and the Thread class. Del I — Avrättningstrådar .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Som du kan se är koden för att starta en uppgift ganska typisk, men vi måste upprepa den för ny uppgift. En lösning är att lägga det i en separat metod, execute(Runnable runnable)
t.ex. Men Javas skapare har övervägt vår svåra situation och kommit fram till gränssnittet Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Den här koden är helt klart mer koncis: nu skriver vi helt enkelt kod för att starta Runnable
på tråden. Det är bra, eller hur? Men detta är bara början: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
har gränssnittet ett ExecutorService
undergränssnitt. Javadoc för detta gränssnitt säger att en ExecutorService
beskriver en speciell Executor
som tillhandahåller metoder för att stänga av Executor
. Det gör det också möjligt att få en java.util.concurrent.Future
för att spåra exekveringsprocessen. Tidigare i Better together: Java and the Thread-klassen. Del IV – Callable, Future och friends , vi granskade kort kapaciteten hos Future
. Om du har glömt eller aldrig läst den, föreslår jag att du fräschar upp minnet ;) Vad säger Javadoc? Det berättar för oss att vi har en speciell java.util.concurrent.Executors
fabrik som låter oss skapa standardimplementationer av ExecutorService
.
ExecutorService
Låt oss gå igenom. Vi måsteExecutor
utföra (dvs. kalla execute()
på) en viss uppgift på en tråd, och koden som skapar tråden är dold för oss. Vi har ExecutorService
— en specifik Executor
som har flera alternativ för att kontrollera framsteg. Och vi har Executors
fabriken som låter oss skapa en ExecutorService
. Låt oss nu göra det själva:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Du kan se att vi angett en fast trådpool vars storlek är 2. Sedan lämnar vi in uppgifter till poolen en efter en. Varje uppgift returnerar ett String
innehållande trådnamnet ( currentThread().GetName()
). Det är viktigt att stänga av ExecutorService
i slutet, för annars kommer inte vårt program att ta slut. Fabriken Executors
har ytterligare fabriksmetoder. Till exempel kan vi skapa en pool som bara består av en tråd ( newSingleThreadExecutor
) eller en pool som innehåller en cache ( newCachedThreadPool
) från vilken trådar tas bort efter att de varit inaktiva i 1 minut. ExecutorService
I verkligheten backas dessa upp av en blockeringskö , i vilken uppgifter placeras och från vilka uppgifter exekveras. Mer information om att blockera köer finns i den här videon . Du kan också läsa dettarecension om BlockingQueue . Och kolla in svaret på frågan "När ska man föredra LinkedBlockingQueue framför ArrayBlockingQueue?" I de enklaste termerna BlockingQueue
blockerar a en tråd i två fall:
- tråden försöker hämta objekt från en tom kö
- tråden försöker placera objekt i en full kö
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
eller
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Som vi kan se ExecutorService
skapas implementeringar av inuti fabriksmetoderna. Och för det mesta pratar vi om ThreadPoolExecutor
. Endast de parametrar som påverkar arbetet ändras. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Som vi såg tidigare,ThreadPoolExecutor
är det som vanligtvis skapas inuti fabriksmetoderna. Funktionaliteten påverkas av de argument vi skickar som maximalt och minsta antal trådar, samt vilken typ av kö som används. Men vilken implementering av java.util.concurrent.BlockingQueue
gränssnittet som helst kan användas. På tal om ThreadPoolExecutor
bör vi nämna några intressanta funktioner. Till exempel kan du inte skicka uppgifter till en ThreadPoolExecutor
om det inte finns något tillgängligt utrymme:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Den här koden kommer att krascha med ett fel som detta:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Med andra ord, task
kan inte skickas, eftersom SynchronousQueue
den är utformad så att den faktiskt består av ett enda element och inte tillåter oss att lägga något mer i det. Vi kan se att vi har noll queued tasks
("köade uppgifter = 0") här. Men det är inget konstigt med detta, eftersom det här är en speciell egenskap hos , SynchronousQueue
som i själva verket är en 1-elementskö som alltid är tom! När en tråd sätter ett element i kön kommer den att vänta tills en annan tråd tar elementet från kön. Följaktligen kan vi ersätta den med new LinkedBlockingQueue<>(1)
och felet kommer att ändras till att nu visa queued tasks = 1
. Eftersom kön bara är ett element kan vi inte lägga till ett andra element. Och det är det som gör att programmet misslyckas. För att fortsätta vår diskussion om kö, är det värt att notera attThreadPoolExecutor
class har ytterligare metoder för att betjäna kön. Till exempel threadPoolExecutor.purge()
kommer metoden att ta bort alla avbrutna uppgifter från kön för att frigöra utrymme i kön. En annan intressant körelaterad funktion är hanteraren för avvisade uppgifter:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
I det här exemplet visar vår hanterare helt enkelt Rejected
varje gång en uppgift i kön avvisas. Bekvämt, eller hur? Har dessutom ThreadPoolExecutor
en intressant underklass: , ScheduledThreadPoolExecutor
som är en ScheduledExecutorService
. Det ger möjlighet att utföra en uppgift baserat på en timer.
ScheduledExecutorService
ScheduledExecutorService
(vilket är en typ av ExecutorService
) låter oss köra uppgifter enligt ett schema. Låt oss titta på ett exempel:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Allt är enkelt här. Uppgifterna lämnas in och sedan får vi en java.util.concurrent.ScheduledFuture
. Ett schema kan också vara till hjälp i följande situation:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Här skickar vi en Runnable
uppgift för exekvering med en fast frekvens ("FixedRate") med en viss initial fördröjning. I det här fallet, efter 1 sekund, kommer uppgiften att börja utföras varannan sekund. Det finns ett liknande alternativ:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Men i det här fallet utförs uppgifterna med ett specifikt intervall MELLAN varje exekvering. Det vill säga task
kommer att exekveras efter 1 sekund. Sedan, så snart den är klar, kommer 2 sekunder att passera, och sedan kommer en ny uppgift att startas. Här är några ytterligare resurser om detta ämne:
- En introduktion till trådpooler i Java
- Introduktion till trådpooler i Java
- Java Multithreading Steeplechase: Avbryta uppgifter i exekutörer
- Använda Java Executors för bakgrundsuppgifter

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
Utöver ovanstående trådpooler finns det en till. Vi kan ärligt säga att det är lite speciellt. Det kallas en pool för att stjäla arbete. Kort sagt är arbetsstöld en algoritm där lediga trådar börjar ta uppgifter från andra trådar eller uppgifter från en delad kö. Låt oss titta på ett exempel:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Om vi kör den här koden ExecutorService
kommer den att skapa 5 trådar åt oss, eftersom varje tråd kommer att läggas i väntekön för låsobjektet. Vi har redan listat ut monitorer och låser i Better tillsammans: Java och klassen Thread. Del II — Synkronisering . Låt oss nu ersätta Executors.newCachedThreadPool()
med Executors.newWorkStealingPool()
. Vad kommer att förändras? Vi kommer att se att våra uppgifter utförs på färre än 5 trådar. Kommer du ihåg att det CachedThreadPool
skapar en tråd för varje uppgift? Det beror på att wait()
tråden blockerades, efterföljande uppgifter vill slutföras och nya trådar skapades för dem i poolen. Med en stjälande pool står trådarna inte overksamma för alltid. De börjar utföra sina grannars uppgifter. Vad skiljer en WorkStealingPool
så mycket från andra trådpooler? Det faktum att det magiskaForkJoinPool
bor inuti det:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Egentligen finns det ytterligare en skillnad. Som standard är trådarna som skapats för en ForkJoinPool
demon-trådar, till skillnad från de trådar som skapas genom en vanlig ThreadPool
. I allmänhet bör du komma ihåg demon-trådar, eftersom du till exempel CompletableFuture
också använder demon-trådar om du inte anger din egen ThreadFactory
som skapar icke-demon-trådar. Det här är överraskningarna som kan gömma sig på oväntade ställen! :)
ForkJoinPool
I den här delen kommer vi återigen att prata omForkJoinPool
(även kallat gaffel/join-ramverket), som bor "under huven" på WorkStealingPool
. I allmänhet dök gaffel/join-ramverket upp i Java 1.7. Och även om Java 11 ligger nära till hands är det ändå värt att komma ihåg. Detta är inte den vanligaste implementeringen, men det är ganska intressant. Det finns en bra recension om detta på webben: Förstå Java Fork-Join Framework med exempel . Den ForkJoinPool
förlitar sig på java.util.concurrent.RecursiveTask
. Det finns också java.util.concurrent.RecursiveAction
. RecursiveAction
ger inget resultat. Således RecursiveTask
liknar Callable
, och RecursiveAction
liknar unnable
. Vi kan se att namnet inkluderar namnen på två viktiga metoder: fork
och join
. Defork
metoden startar någon uppgift asynkront på en separat tråd. Och join
metoden låter dig vänta på att arbetet ska göras. För att få bästa möjliga förståelse bör du läsa From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .
Sammanfattning
Tja, det avslutar den här delen av recensionen. Vi har lärt oss att detExecutor
ursprungligen uppfanns för att köra trådar. Då bestämde sig Javas skapare för att fortsätta idén och kom på ExecutorService
. ExecutorService
låter oss skicka uppgifter för exekvering med hjälp av submit()
och invoke()
, och även stänga av tjänsten. Eftersom ExecutorService
behöver implementeringar skrev de en klass med fabriksmetoder och kallade den Executors
. Det låter dig skapa trådpooler ( ThreadPoolExecutor
). Dessutom finns det trådpooler som också låter oss specificera ett körschema. Och a ForkJoinPool
gömmer sig bakom en WorkStealingPool
. Jag hoppas att du fann det jag skrev ovan inte bara intressant, utan också förståeligt :) Jag är alltid glad att höra dina förslag och kommentarer. Bättre tillsammans: Java och trådklassen. Del I — Trådar av utförande Bättre tillsammans: Java och klassen Thread. Del II — Synkronisering Bättre tillsammans: Java och klassen Thread. Del III — Interaktion Bättre tillsammans: Java och klassen Thread. Del IV — Callable, Future och friends Bättre tillsammans: Java och Thread-klassen. Del VI — Skjut loss!
GO TO FULL VERSION