Laten we eens kijken naar de nieuweWorkStealingPool- methode, die een ExecutorService voor ons voorbereidt.

Deze draadpool is speciaal. Zijn gedrag is gebaseerd op het idee van het "stelen" van werk.

Taken worden in een wachtrij geplaatst en verdeeld over processors. Maar als een processor bezig is, dan kan een andere vrije processor er een taak van stelen en uitvoeren. Dit formaat is in Java geïntroduceerd om conflicten in toepassingen met meerdere threads te verminderen. Het is gebouwd op het fork/join- framework.

vork / voeg toe

In het fork/join- framework worden taken recursief ontleed, dat wil zeggen dat ze worden opgesplitst in subtaken. Vervolgens worden de subtaken afzonderlijk uitgevoerd en worden de resultaten van de subtaken gecombineerd om het resultaat van de oorspronkelijke taak te vormen.

De fork- methode start een taak asynchroon op een thread en met de join- methode kunt u wachten tot deze taak is voltooid.

nieuwWorkStealingPool

De methode newWorkStealingPool heeft twee implementaties:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Vanaf het begin merken we op dat we onder de motorkap niet de ThreadPoolExecutor -constructor aanroepen . Hier werken we met de ForkJoinPool -entiteit. Net als ThreadPoolExecutor is het een implementatie van AbstractExecutorService .

We hebben 2 methoden om uit te kiezen. In het eerste geven we zelf aan welk niveau van parallellisme we willen zien. Als we deze waarde niet specificeren, is de parallelliteit van onze pool gelijk aan het aantal processorkernen dat beschikbaar is voor de virtuele Java-machine.

Het blijft om erachter te komen hoe het in de praktijk werkt:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

We maken 10 taken die hun eigen voltooiingsstatus weergeven. Daarna starten we alle taken met behulp van de invokeAll- methode.

Resultaten bij het uitvoeren van 10 taken op 10 threads in de pool:

Verwerkt gebruikersverzoek #9 op ForkJoinPool-1-worker-10 thread
Verwerkt gebruikersverzoek #4 op ForkJoinPool-1-worker-5 thread
Verwerkt gebruikersverzoek #7 op ForkJoinPool-1-worker-8 thread
Verwerkt gebruikersverzoek #1 op ForkJoinPool- 1-worker-2 thread
Verwerkt gebruikersverzoek #2 op ForkJoinPool-1-worker-3 thread
Verwerkt gebruikersverzoek #3 op ForkJoinPool-1-worker-4 thread
Verwerkt gebruikersverzoek #6 op ForkJoinPool-1-worker-7 thread
Verwerkte gebruiker verzoek #0 op ForkJoinPool-1-worker-1 thread
Verwerkt gebruikersverzoek #5 op ForkJoinPool-1-worker-6 thread
Verwerkt gebruikersverzoek #8 op ForkJoinPool-1-worker-9 thread

We zien dat nadat de wachtrij is gevormd, de threads taken overnemen voor uitvoering. U kunt ook controleren hoe 20 taken worden verdeeld in een pool van 10 threads.

Verwerkt gebruikersverzoek #3 op ForkJoinPool-1-worker-4 thread
Verwerkt gebruikersverzoek #7 op ForkJoinPool-1-worker-8 thread
Verwerkt gebruikersverzoek #2 op ForkJoinPool-1-worker-3 thread
Verwerkt gebruikersverzoek #4 op ForkJoinPool- 1-worker-5 thread
Verwerkt gebruikersverzoek #1 op ForkJoinPool-1-worker-2 thread
Verwerkt gebruikersverzoek #5 op ForkJoinPool-1-worker-6 thread
Verwerkt gebruikersverzoek #8 op ForkJoinPool-1-worker-9 thread
Verwerkte gebruiker verzoek #9 op ForkJoinPool-1-worker-10 thread
Verwerkt gebruikersverzoek #0 op ForkJoinPool-1-worker-1 thread
Verwerkt gebruikersverzoek #6 op ForkJoinPool-1-worker-7 thread
Verwerkt gebruikersverzoek #10 op ForkJoinPool-1- werker-9 draad
Verwerkt gebruikersverzoek #12 op ForkJoinPool-1-worker-1 thread
Verwerkt gebruikersverzoek #13 op ForkJoinPool-1-worker-8 thread
Verwerkt gebruikersverzoek #11 op ForkJoinPool-1-worker-6 thread
Verwerkt gebruikersverzoek #15 op ForkJoinPool- 1-worker-8 thread
Verwerkt gebruikersverzoek #14 op ForkJoinPool-1-worker-1 thread
Verwerkt gebruikersverzoek #17 op ForkJoinPool-1-worker-6 thread
Verwerkt gebruikersverzoek #16 op ForkJoinPool-1-worker-7 thread
Verwerkte gebruiker verzoek #19 op ForkJoinPool-1-worker-6 thread
Verwerkt gebruikersverzoek #18 op ForkJoinPool-1-worker-1 thread

Uit de uitvoer kunnen we zien dat sommige threads erin slagen om verschillende taken uit te voeren ( ForkJoinPool-1-worker-6 voltooide 4 taken), terwijl sommige er slechts één voltooien ( ForkJoinPool-1-worker-2 ). Als er een vertraging van 1 seconde wordt toegevoegd aan de implementatie van de oproepmethode , verandert het beeld.


Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Laten we, omwille van het experiment, dezelfde code op een andere machine uitvoeren. De resulterende uitvoer:

Verwerkt gebruikersverzoek #2 op ForkJoinPool-1-worker-23 thread
Verwerkt gebruikersverzoek #7 op ForkJoinPool-1-worker-31 thread
Verwerkt gebruikersverzoek #4 op ForkJoinPool-1-worker-27 thread
Verwerkt gebruikersverzoek #5 op ForkJoinPool- 1-worker-13 thread
Verwerkt gebruikersverzoek #0 op ForkJoinPool-1-worker-19 thread
Verwerkt gebruikersverzoek #8 op ForkJoinPool-1-worker-3 thread
Verwerkt gebruikersverzoek #9 op ForkJoinPool-1-worker-21 thread
Verwerkte gebruiker verzoek #6 op ForkJoinPool-1-worker-17 thread
Verwerkt gebruikersverzoek #3 op ForkJoinPool-1-worker-9 thread
Verwerkt gebruikersverzoek #1 op ForkJoinPool-1-worker-5 thread
Verwerkt gebruikersverzoek #12 op ForkJoinPool-1- werker-23 draad
Verwerkt gebruikersverzoek #15 op ForkJoinPool-1-worker-19 thread
Verwerkt gebruikersverzoek #14 op ForkJoinPool-1-worker-27 thread
Verwerkt gebruikersverzoek #11 op ForkJoinPool-1-worker-3 thread
Verwerkt gebruikersverzoek #13 op ForkJoinPool- 1-worker-13 thread
Verwerkt gebruikersverzoek #10 op ForkJoinPool-1-worker-31 thread
Verwerkt gebruikersverzoek #18 op ForkJoinPool-1-worker-5 thread
Verwerkt gebruikersverzoek #16 op ForkJoinPool-1-worker-9 thread
Verwerkte gebruiker verzoek #17 op ForkJoinPool-1-worker-21 thread
Verwerkt gebruikersverzoek #19 op ForkJoinPool-1-worker-17 thread

In deze uitvoer valt op dat we "vroegen" naar de threads in de pool. Bovendien gaan de namen van de werkthreads niet van één tot tien, maar zijn ze in plaats daarvan soms hoger dan tien. Als we naar de unieke namen kijken, zien we dat het er echt tien zijn (3, 5, 9, 13, 17, 19, 21, 23, 27 en 31). Hier is het heel redelijk om te vragen waarom dit is gebeurd? Gebruik de debugger wanneer u niet begrijpt wat er aan de hand is.

Dit is wat we gaan doen. Laten we deuitvoerderDienstbezwaar maken tegen een ForkJoinPool :


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

We zullen de actie Evaluate Expression gebruiken om dit object te onderzoeken nadat de methode invokeAll is aangeroepen . Om dit te doen, voegt u na de methode invokeAll een statement toe, zoals een lege sout, en stelt u er een breekpunt op in.

We kunnen zien dat de pool 10 threads heeft, maar de grootte van de reeks werkthreads is 32. Raar, maar oké. Laten we blijven graven. Laten we bij het maken van een pool proberen het parallelliteitsniveau in te stellen op meer dan 32, zeg 40.


ExecutorService executorService = Executors.newWorkStealingPool(40);

Laten we in de debugger kijken naar deforkJoinPool-object opnieuw.

Nu is de grootte van de reeks werkthreads 128. We kunnen aannemen dat dit een van de interne optimalisaties van de JVM is. Laten we proberen het te vinden in de code van de JDK (openjdk-14):

Precies zoals we al vermoedden: de grootte van de array van worker-threads wordt berekend door bitsgewijze manipulaties uit te voeren op de parallelliteitswaarde. We hoeven niet te proberen erachter te komen wat hier precies gebeurt. Het is voldoende om simpelweg te weten dat een dergelijke optimalisatie bestaat.

Een ander interessant aspect van ons voorbeeld is het gebruik van de methode invokeAll . Het is vermeldenswaard dat de methode invokeAll een resultaat kan retourneren, of liever een lijst met resultaten (in ons geval een List<Future<Void>>) , waar we het resultaat van elk van de taken kunnen vinden.


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Dit speciale soort service en threadpool kan worden gebruikt bij taken met een voorspelbaar of op zijn minst impliciet niveau van gelijktijdigheid.