Låt oss ta reda på den nyaWorkStealingPool -metoden, som förbereder en ExecutorService åt oss.

Den här trådpoolen är speciell. Dess beteende bygger på idén om att "stjäla" arbete.

Uppgifter köas och fördelas mellan processorer. Men om en processor är upptagen, kan en annan ledig processor stjäla en uppgift från den och köra den. Detta format introducerades i Java för att minska konflikter i flertrådade applikationer. Den är byggd på gaffel/fogramen .

gaffel/foga

I fork/join -ramverket bryts uppgifter upp rekursivt, det vill säga de bryts ner i deluppgifter. Sedan utförs deluppgifterna individuellt, och resultaten av deluppgifterna kombineras för att bilda resultatet av den ursprungliga uppgiften.

Fork - metoden startar en uppgift asynkront på någon tråd, och join -metoden låter dig vänta på att denna uppgift ska slutföras.

newWorkStealingPool

NewWorkStealingPool - metoden har två implementeringar:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Från början noterar vi att vi under huven inte kallar ThreadPoolExecutor -konstruktören. Här arbetar vi med ForkJoinPool -enheten. Liksom ThreadPoolExecutor är det en implementering av AbstractExecutorService .

Vi har 2 metoder att välja mellan. I den första anger vi själva vilken nivå av parallellitet vi vill se. Om vi ​​inte anger detta värde kommer vår pools parallellitet att vara lika med antalet processorkärnor som är tillgängliga för den virtuella Java-maskinen.

Det återstår att ta reda på hur det fungerar i praktiken:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Vi skapar 10 uppgifter som visar sin egen slutförandestatus. Efter det startar vi alla uppgifter med metoden invokeAll .

Resultat när du utför 10 uppgifter på 10 trådar i poolen:

Bearbetad användarförfrågan #9 på ForkJoinPool-1-worker-10 tråd
Behandlad användarförfrågan #4 på ForkJoinPool-1-worker-5 tråd
Bearbetad användarförfrågan #7 på ForkJoinPool-1-worker-8 tråd
Bearbetad användarförfrågan #1 på ForkJoinPool- 1-arbetare-2 tråd
Bearbetad användarförfrågan #2 på ForkJoinPool-1-worker-3 tråd
Bearbetad användarförfrågan #3 på ForkJoinPool-1-arbetare-4 tråd
Bearbetad användarförfrågan #6 på ForkJoinPool-1-worker-7 tråd
Bearbetad användare
begäran nr
_

Vi ser att efter att kön har bildats tar trådarna uppgifter för exekvering. Du kan också kontrollera hur 20 uppgifter kommer att fördelas i en pool med 10 trådar.

Behandlad användarförfrågan #3 på ForkJoinPool-1-worker-4 tråd
Behandlad användarförfrågan #7 på ForkJoinPool-1-worker-8 tråd
Behandlad användarförfrågan #2 på ForkJoinPool-1-worker-3 tråd
Bearbetad användarförfrågan #4 på ForkJoinPool- 1-arbetare-5 tråd
Bearbetad användarförfrågan #1 på ForkJoinPool-1-worker-2 tråd
Bearbetad användarförfrågan #5 på ForkJoinPool-1-arbetare-6 tråd
Bearbetad användarförfrågan #8 på ForkJoinPool-1-worker-9 tråd
Bearbetad användare begäran #9 på ForkJoinPool-1-worker-10 tråd
Bearbetad användarförfrågan #0 på ForkJoinPool-1-worker-1 tråd
Bearbetad användarförfrågan #6 på ForkJoinPool-1-worker-7 tråd
Bearbetad användarförfrågan #10 på ForkJoinPool-1- arbetar-9 tråd
Behandlad användarförfrågan #12 på ForkJoinPool-1-worker-1 tråd
Behandlad användarförfrågan #13 på ForkJoinPool-1-worker-8 tråd
Behandlad användarförfrågan #11 på ForkJoinPool-1-worker-6 tråd
Bearbetad användarförfrågan #15 på ForkJoinPool- 1-arbetare-8 tråd
Bearbetad användarförfrågan #14 på ForkJoinPool-1-arbetare-1 tråd
Bearbetad användarförfrågan #17 på ForkJoinPool-1-arbetare-6 tråd
Bearbetad användarförfrågan #16 på ForkJoinPool-1-worker-7 tråd
Bearbetad användare begäran #19 på ForkJoinPool-1-worker-6 thread
Bearbetad användarförfrågan #18 på ForkJoinPool-1-worker-1 thread

Från utgången kan vi se att vissa trådar klarar av att slutföra flera uppgifter ( ForkJoinPool-1-worker-6 slutförde 4 uppgifter), medan vissa bara slutför en ( ForkJoinPool-1-worker-2 ) . Om en 1-sekunds fördröjning läggs till i implementeringen av anropsmetoden ändras bilden.


Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

För experimentets skull, låt oss köra samma kod på en annan maskin. Den resulterande utgången:

Bearbetad användarförfrågan #2 på tråden ForkJoinPool-1-worker-23
Bearbetad användarförfrågan #7 på tråden ForkJoinPool-1-worker-31
Bearbetad användarförfrågan #4 på tråden ForkJoinPool-1-worker-27 Bearbetad
användarförfrågan #5 på ForkJoinPool- 1-worker-13 tråd
Bearbetad användarförfrågan #0 på ForkJoinPool-1-worker-19 tråd
Bearbetad användarförfrågan #8 på ForkJoinPool-1-worker-3 tråd
Bearbetad användarförfrågan #9 på ForkJoinPool-1-worker-21 tråd
Bearbetad användare begäran #6 på ForkJoinPool-1-worker-17 tråd
Bearbetad användarförfrågan #3 på ForkJoinPool-1-worker-9 tråd
Bearbetad användarförfrågan #1 på ForkJoinPool-1-worker-5 tråd
Bearbetad användarförfrågan #12 på ForkJoinPool-1- arbetar-23 tråd
Bearbetad användarförfrågan #15 på tråden ForkJoinPool-1-worker-19
Bearbetad användarförfrågan #14 på tråden ForkJoinPool-1-worker-27
Bearbetad användarförfrågan #11 på tråden ForkJoinPool-1-worker-3
Bearbetad användarförfrågan #13 på ForkJoinPool- 1-arbetare-13 tråd
Bearbetad användarförfrågan #10 på ForkJoinPool-1-worker-31 tråd
Bearbetad användarförfrågan #18 på ForkJoinPool-1-arbetare-5 tråd
Bearbetad användarförfrågan #16 på ForkJoinPool-1-worker-9 tråd
Bearbetad användare begäran #17 på tråden ForkJoinPool-1-worker-21
Behandlad användarförfrågan #19 på tråden ForkJoinPool-1-worker-17

I denna utgång är det anmärkningsvärt att vi "badade efter" trådarna i poolen. Dessutom går namnen på arbetartrådarna inte från ett till tio, utan är istället ibland högre än tio. När vi tittar på de unika namnen ser vi att det verkligen finns tio arbetare (3, 5, 9, 13, 17, 19, 21, 23, 27 och 31). Här är det ganska rimligt att fråga varför detta hände? När du inte förstår vad som händer, använd felsökaren.

Det här är vad vi ska göra. Låt oss gjutaexecutorServiceinvända mot en ForkJoinPool :


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Vi kommer att använda åtgärden Evaluate Expression för att undersöka detta objekt efter att ha anropat metoden invokeAll . För att göra detta, efter invokeAll -metoden, lägg till valfri sats, till exempel en tom sout, och ställ in en brytpunkt på den.

Vi kan se att poolen har 10 trådar, men storleken på uppsättningen av arbetartrådar är 32. Konstigt, men okej. Låt oss fortsätta gräva. När du skapar en pool, låt oss försöka ställa in parallellitetsnivån till mer än 32, säg 40.


ExecutorService executorService = Executors.newWorkStealingPool(40);

I debuggern, låt oss titta påforkJoinPool-objekt igen.

Nu är storleken på arrayen av arbetartrådar 128. Vi kan anta att detta är en av JVM:s interna optimeringar. Låt oss försöka hitta det i koden för JDK (openjdk-14):

Precis som vi misstänkte: storleken på arrayen av arbetartrådar beräknas genom att utföra bitvisa manipulationer på parallellitetsvärdet. Vi behöver inte försöka lista ut exakt vad som händer här. Det räcker med att helt enkelt veta att en sådan optimering finns.

En annan intressant aspekt av vårt exempel är användningen av metoden invokeAll . Det är värt att notera att metoden invokeAll kan returnera ett resultat, eller snarare en lista med resultat (i vårt fall en List<Future<Void>>) där vi kan hitta resultatet av var och en av uppgifterna.


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Denna speciella typ av tjänst och trådpool kan användas i uppgifter med en förutsägbar, eller åtminstone implicit, nivå av samtidighet.