La oss finne ut den nyeWorkStealingPool -metoden, som forbereder en ExecutorService for oss.

Denne trådpoolen er spesiell. Dens oppførsel er basert på ideen om å "stjele" arbeid.

Oppgaver settes i kø og fordeles mellom prosessorer. Men hvis en prosessor er opptatt, kan en annen ledig prosessor stjele en oppgave fra den og utføre den. Dette formatet ble introdusert i Java for å redusere konflikter i flertrådede applikasjoner. Den er bygget på gaffel/skjøt rammeverket.

gaffel/join

I fork/join -rammeverket dekomponeres oppgaver rekursivt, det vil si at de brytes ned i deloppgaver. Deretter utføres deloppgavene individuelt, og resultatene av deloppgavene kombineres for å danne resultatet av den opprinnelige oppgaven.

Fork - metoden starter en oppgave asynkront på en eller annen tråd, og join -metoden lar deg vente på at denne oppgaven er ferdig.

newWorkStealingPool

NewWorkStealingPool - metoden har to implementeringer:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Fra begynnelsen merker vi at vi under panseret ikke kaller ThreadPoolExecutor -konstruktøren. Her jobber vi med ForkJoinPool -enheten. I likhet med ThreadPoolExecutor er det en implementering av AbstractExecutorService .

Vi har 2 metoder å velge mellom. I den første angir vi selv hvilket nivå av parallellitet vi ønsker å se. Hvis vi ikke spesifiserer denne verdien, vil parallelliteten til bassenget vårt være lik antall prosessorkjerner som er tilgjengelig for den virtuelle Java-maskinen.

Det gjenstår å finne ut hvordan det fungerer i praksis:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Vi lager 10 oppgaver som viser sin egen fullføringsstatus. Etter det starter vi alle oppgavene ved å bruke invokeAll- metoden.

Resultater når du utfører 10 oppgaver på 10 tråder i bassenget:

Behandlet brukerforespørsel #9 på ForkJoinPool-1-worker-10 tråd
Behandlet brukerforespørsel #4 på ForkJoinPool-1-worker-5 tråd
Behandlet brukerforespørsel #7 på ForkJoinPool-1-worker-8 tråd
Behandlet brukerforespørsel #1 på ForkJoinPool- 1-arbeider-2 tråd
Behandlet brukerforespørsel #2 på ForkJoinPool-1-worker-3 tråd
Behandlet brukerforespørsel #3 på ForkJoinPool-1-worker-4 tråd
Behandlet brukerforespørsel #6 på ForkJoinPool-1-worker-7 tråd
Behandlet bruker forespørsel #0 på ForkJoinPool-1-worker-1 tråd
Behandlet brukerforespørsel #5 på ForkJoinPool-1-worker-6 tråd
Behandlet brukerforespørsel #8 på ForkJoinPool-1-worker-9 tråd

Vi ser at etter at køen er dannet, tar trådene oppgaver for utførelse. Du kan også sjekke hvordan 20 oppgaver vil bli fordelt i en pool med 10 tråder.

Behandlet brukerforespørsel #3 på ForkJoinPool-1-worker-4 tråd
Behandlet brukerforespørsel #7 på ForkJoinPool-1-worker-8 tråd
Behandlet brukerforespørsel #2 på ForkJoinPool-1-worker-3 tråd
Behandlet brukerforespørsel #4 på ForkJoinPool- 1-worker-5 thread
Behandlet brukerforespørsel #1 på ForkJoinPool-1-worker-2 thread
Behandlet brukerforespørsel #5 på ForkJoinPool-1-worker-6 thread
Behandlet brukerforespørsel #8 på ForkJoinPool-1-worker-9 thread
Behandlet bruker forespørsel #9 på ForkJoinPool-1-worker-10 tråd
Behandlet brukerforespørsel #0 på ForkJoinPool-1-worker-1 tråd
Behandlet brukerforespørsel #6 på ForkJoinPool-1-worker-7 tråd
Behandlet brukerforespørsel #10 på ForkJoinPool-1- arbeider-9 tråd
Behandlet brukerforespørsel #12 på ForkJoinPool-1-worker-1 tråd
Behandlet brukerforespørsel #13 på ForkJoinPool-1-worker-8 tråd
Behandlet brukerforespørsel #11 på ForkJoinPool-1-worker-6 tråd
Behandlet brukerforespørsel #15 på ForkJoinPool- 1-worker-8 thread
Behandlet brukerforespørsel #14 på ForkJoinPool-1-worker-1 thread
Behandlet brukerforespørsel #17 på ForkJoinPool-1-worker-6 thread
Behandlet brukerforespørsel #16 på ForkJoinPool-1-worker-7 thread
Behandlet bruker forespørsel #19 på ForkJoinPool-1-worker-6 thread
Behandlet brukerforespørsel #18 på ForkJoinPool-1-worker-1 thread

Fra utdataene kan vi se at noen tråder klarer å fullføre flere oppgaver ( ForkJoinPool-1-worker-6 fullførte 4 oppgaver), mens noen fullfører kun én ( ForkJoinPool-1-worker-2 ). Hvis en 1-sekunds forsinkelse legges til implementeringen av anropsmetoden , endres bildet.


Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

For eksperimentets skyld, la oss kjøre den samme koden på en annen maskin. Den resulterende utgangen:

Behandlet brukerforespørsel #2 på ForkJoinPool-1-worker-23 tråd
Behandlet brukerforespørsel #7 på ForkJoinPool-1-worker-31 tråd
Behandlet brukerforespørsel #4 på ForkJoinPool-1-worker-27 tråd
Behandlet brukerforespørsel #5 på ForkJoinPool- 1-arbeider-13 tråd
Behandlet brukerforespørsel #0 på ForkJoinPool-1-worker-19 tråd
Behandlet brukerforespørsel #8 på ForkJoinPool-1-worker-3 tråd
Behandlet brukerforespørsel #9 på ForkJoinPool-1-worker-21 tråd
Behandlet bruker forespørsel #6 på ForkJoinPool-1-worker-17 tråd
Behandlet brukerforespørsel #3 på ForkJoinPool-1-worker-9 tråd
Behandlet brukerforespørsel #1 på ForkJoinPool-1-worker-5 tråd
Behandlet brukerforespørsel #12 på ForkJoinPool-1- arbeider-23 tråd
Behandlet brukerforespørsel #15 på ForkJoinPool-1-worker-19 tråd
Behandlet brukerforespørsel #14 på ForkJoinPool-1-worker-27 tråd
Behandlet brukerforespørsel #11 på ForkJoinPool-1-worker-3 tråd
Behandlet brukerforespørsel #13 på ForkJoinPool- 1-arbeider-13 tråd
Behandlet brukerforespørsel #10 på ForkJoinPool-1-worker-31 tråd
Behandlet brukerforespørsel #18 på ForkJoinPool-1-worker-5 tråd
Behandlet brukerforespørsel #16 på ForkJoinPool-1-worker-9 tråd
Behandlet bruker forespørsel #17 på ForkJoinPool-1-worker-21 tråd
Behandlet brukerforespørsel #19 på ForkJoinPool-1-worker-17 tråd

I denne utgangen er det bemerkelsesverdig at vi "spurte om" trådene i bassenget. Dessuten går ikke navnene på arbeidertrådene fra én til ti, men er i stedet noen ganger høyere enn ti. Ser vi på de unike navnene, ser vi at det virkelig er ti arbeidere (3, 5, 9, 13, 17, 19, 21, 23, 27 og 31). Her er det ganske rimelig å spørre hvorfor dette skjedde? Når du ikke forstår hva som skjer, bruk feilsøkeren.

Dette er hva vi skal gjøre. La oss kasteexecutorServiceprotestere mot en ForkJoinPool :


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Vi vil bruke Evaluate Expression-handlingen for å undersøke dette objektet etter å ha kalt invokeAll -metoden. For å gjøre dette, etter invokeAll- metoden, legg til en setning, for eksempel en tom sout, og sett et bruddpunkt på den.

Vi kan se at bassenget har 10 tråder, men størrelsen på utvalget av arbeidertråder er 32. Rart, men greit. La oss fortsette å grave. Når du oppretter et basseng, la oss prøve å sette parallellitetsnivået til mer enn 32, for eksempel 40.


ExecutorService executorService = Executors.newWorkStealingPool(40);

I debuggeren, la oss se påforkJoinPool-objektet igjen.

Nå er størrelsen på utvalget av arbeidertråder 128. Vi kan anta at dette er en av JVMs interne optimaliseringer. La oss prøve å finne den i koden til JDK (openjdk-14):

Akkurat som vi mistenkte: størrelsen på utvalget av arbeidertråder beregnes ved å utføre bitvise manipulasjoner på parallellitetsverdien. Vi trenger ikke prøve å finne ut hva som skjer her. Det er nok å bare vite at en slik optimalisering eksisterer.

Et annet interessant aspekt ved vårt eksempel er bruken av invokeAll- metoden. Det er verdt å merke seg at invokeAll -metoden kan returnere et resultat, eller snarere en liste med resultater (i vårt tilfelle en List<Future<Void>>) , der vi kan finne resultatet av hver av oppgavene.


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Denne spesielle typen tjeneste og trådpool kan brukes i oppgaver med et forutsigbart, eller i det minste implisitt, samtidighetsnivå.