Lad os finde ud af den nyeWorkStealingPool- metode, som forbereder en ExecutorService for os.

Denne trådpulje er speciel. Dens adfærd er baseret på ideen om at "stjæle" arbejde.

Opgaver sættes i kø og fordeles mellem processorer. Men hvis en processor er optaget, så kan en anden ledig processor stjæle en opgave fra den og udføre den. Dette format blev introduceret i Java for at reducere konflikter i flertrådede applikationer. Den er bygget på gaffel/samlingsramme .

gaffel/join

I fork/join -rammen dekomponeres opgaver rekursivt, det vil sige, at de er opdelt i delopgaver. Derefter udføres delopgaverne individuelt, og resultaterne af delopgaverne kombineres for at danne resultatet af den oprindelige opgave.

Fork - metoden starter en opgave asynkront på en eller anden tråd, og join -metoden lader dig vente på, at denne opgave er færdig.

newWorkStealingPool

NewWorkStealingPool - metoden har to implementeringer:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Fra starten bemærker vi, at vi under motorhjelmen ikke kalder ThreadPoolExecutor -konstruktøren. Her arbejder vi med ForkJoinPool -enheden. Ligesom ThreadPoolExecutor er det en implementering af AbstractExecutorService .

Vi har 2 metoder at vælge imellem. I den første angiver vi selv, hvilket niveau af parallelitet vi ønsker at se. Hvis vi ikke angiver denne værdi, vil paralleliteten af ​​vores pool være lig med antallet af processorkerner, der er tilgængelige for den virtuelle Java-maskine.

Det er tilbage at finde ud af, hvordan det fungerer i praksis:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Vi opretter 10 opgaver, der viser deres egen afslutningsstatus. Derefter starter vi alle opgaverne ved hjælp af invokeAll- metoden.

Resultater ved udførelse af 10 opgaver på 10 tråde i puljen:

Behandlet brugeranmodning #9 på ForkJoinPool-1-worker-10 tråd
Behandlet brugeranmodning #4 på ForkJoinPool-1-worker-5 tråd
Behandlet brugeranmodning #7 på ForkJoinPool-1-worker-8 tråd
Behandlet brugeranmodning #1 på ForkJoinPool- 1-arbejder-2 tråd
Behandlet brugeranmodning #2 på ForkJoinPool-1-worker-3 tråd
Behandlet brugeranmodning #3 på ForkJoinPool-1-worker-4 tråd
Behandlet brugeranmodning #6 på ForkJoinPool-1-worker-7 tråd
Behandlet bruger anmodning #0 på ForkJoinPool-1-worker-1 tråd
Behandlet brugeranmodning #5 på ForkJoinPool-1-worker-6 tråd
Behandlet brugeranmodning #8 på ForkJoinPool-1-worker-9 tråd

Vi ser, at efter køen er dannet, tager trådene opgaver til udførelse. Du kan også tjekke, hvordan 20 opgaver vil blive fordelt i en pulje med 10 tråde.

Behandlet brugeranmodning #3 på ForkJoinPool-1-worker-4 tråd
Behandlet brugeranmodning #7 på ForkJoinPool-1-worker-8 tråd
Behandlet brugeranmodning #2 på ForkJoinPool-1-worker-3 tråd
Behandlet brugeranmodning #4 på ForkJoinPool- 1-arbejder-5 tråd
Behandlet brugeranmodning #1 på ForkJoinPool-1-worker-2 tråd
Behandlet brugeranmodning #5 på ForkJoinPool-1-worker-6 tråd
Behandlet brugeranmodning #8 på ForkJoinPool-1-worker-9 tråd
Behandlet bruger anmodning #9 på ForkJoinPool-1-worker-10 tråd
Behandlet brugeranmodning #0 på ForkJoinPool-1-worker-1 tråd
Behandlet brugeranmodning #6 på ForkJoinPool-1-worker-7 tråd
Behandlet brugeranmodning #10 på ForkJoinPool-1- arbejder-9 tråd
Behandlet brugeranmodning #12 på ForkJoinPool-1-worker-1 tråd
Behandlet brugeranmodning #13 på ForkJoinPool-1-worker-8 tråd
Behandlet brugeranmodning #11 på ForkJoinPool-1-worker-6 tråd
Behandlet brugeranmodning #15 på ForkJoinPool- 1-worker-8 thread
Behandlet brugeranmodning #14 på ForkJoinPool-1-worker-1 thread
Behandlet brugeranmodning #17 på ForkJoinPool-1-worker-6 thread
Behandlet brugeranmodning #16 på ForkJoinPool-1-worker-7 thread
Behandlet bruger anmodning #19 på ForkJoinPool-1-worker-6 tråd
Behandlet brugeranmodning #18 på ForkJoinPool-1-worker-1 tråd

Fra outputtet kan vi se, at nogle tråde formår at udføre flere opgaver ( ForkJoinPool-1-worker-6 fuldførte 4 opgaver), mens nogle kun udfører én ( ForkJoinPool-1-worker-2 ). Hvis en 1-sekunds forsinkelse tilføjes til implementeringen af ​​opkaldsmetoden , ændres billedet.

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

For eksperimentets skyld, lad os køre den samme kode på en anden maskine. Det resulterende output:

Behandlet brugeranmodning #2 på ForkJoinPool-1-worker-23 tråd
Behandlet brugeranmodning #7 på ForkJoinPool-1-worker-31 tråd
Behandlet brugeranmodning #4 på ForkJoinPool-1-worker-27 tråd
Behandlet brugeranmodning #5 på ForkJoinPool- 1-worker-13 tråd
Behandlet brugeranmodning #0 på ForkJoinPool-1-worker-19 tråd
Behandlet brugeranmodning #8 på ForkJoinPool-1-worker-3 thread
Behandlet brugeranmodning #9 på ForkJoinPool-1-worker-21 tråd
Behandlet bruger anmodning #6 på ForkJoinPool-1-worker-17 tråd
Behandlet brugeranmodning #3 på ForkJoinPool-1-worker-9 tråd
Behandlet brugeranmodning #1 på ForkJoinPool-1-worker-5 tråd
Behandlet brugeranmodning #12 på ForkJoinPool-1- arbejder-23 tråd
Behandlet brugeranmodning #15 på ForkJoinPool-1-worker-19 tråd
Behandlet brugeranmodning #14 på ForkJoinPool-1-worker-27 tråd
Behandlet brugeranmodning #11 på ForkJoinPool-1-worker-3 tråd
Behandlet brugeranmodning #13 på ForkJoinPool- 1-arbejder-13 tråd
Behandlet brugeranmodning #10 på ForkJoinPool-1-worker-31 tråd
Behandlet brugeranmodning #18 på ForkJoinPool-1-worker-5 tråd
Behandlet brugeranmodning #16 på ForkJoinPool-1-worker-9 tråd
Behandlet bruger anmodning #17 på ForkJoinPool-1-worker-21 tråd
Behandlet brugeranmodning #19 på ForkJoinPool-1-worker-17 tråd

I dette output er det bemærkelsesværdigt, at vi "bad om" trådene i poolen. Hvad mere er, går navnene på arbejdertrådene ikke fra et til ti, men er i stedet nogle gange højere end ti. Ser vi på de unikke navne, ser vi, at der virkelig er ti arbejdere (3, 5, 9, 13, 17, 19, 21, 23, 27 og 31). Her er det ganske rimeligt at spørge, hvorfor det skete? Når du ikke forstår, hvad der foregår, skal du bruge debuggeren.

Det er, hvad vi vil gøre. Lad os kaste deneksekutorServicegøre indsigelse mod en ForkJoinPool :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Vi vil bruge Evaluate Expression-handlingen til at undersøge dette objekt efter at have kaldt invokeAll -metoden. For at gøre dette skal du efter invokeAll- metoden tilføje ethvert udsagn, såsom en tom sout, og sætte et brudpunkt på det.

Vi kan se, at poolen har 10 tråde, men størrelsen på rækken af ​​arbejdertråde er 32. Underligt, men okay. Lad os blive ved med at grave. Når du opretter en pulje, lad os prøve at indstille parallelitetsniveauet til mere end 32, f.eks. 40.

ExecutorService executorService = Executors.newWorkStealingPool(40);

I debuggeren, lad os se påforkJoinPool-objekt igen.

Nu er størrelsen af ​​rækken af ​​arbejdertråde 128. Vi kan antage, at dette er en af ​​JVM's interne optimeringer. Lad os prøve at finde det i koden til JDK (openjdk-14):

Ligesom vi havde mistanke om: størrelsen af ​​rækken af ​​arbejdertråde beregnes ved at udføre bitvise manipulationer på parallelitetsværdien. Vi behøver ikke at forsøge at finde ud af, hvad der præcist sker her. Det er nok blot at vide, at en sådan optimering findes.

Et andet interessant aspekt af vores eksempel er brugen af ​​metoden invokeAll . Det er værd at bemærke, at metoden invokeAll kan returnere et resultat, eller rettere en liste over resultater (i vores tilfælde en List<Future<Void>>) , hvor vi kan finde resultatet af hver af opgaverne.

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Denne særlige form for service og trådpulje kan bruges i opgaver med et forudsigeligt, eller i det mindste implicit, samtidighedsniveau.