Analicemos el método newWorkStealingPool , que prepara un ExecutorService para nosotros.

Este grupo de subprocesos es especial. Su comportamiento se basa en la idea de "robar" el trabajo.

Las tareas se ponen en cola y se distribuyen entre los procesadores. Pero si un procesador está ocupado, otro procesador libre puede robarle una tarea y ejecutarla. Este formato se introdujo en Java para reducir los conflictos en las aplicaciones de subprocesos múltiples. Está construido sobre el marco fork/join .

bifurcarse/unirse

En el marco fork/join , las tareas se descomponen recursivamente, es decir, se dividen en subtareas. Luego, las subtareas se ejecutan individualmente y los resultados de las subtareas se combinan para formar el resultado de la tarea original.

El método de bifurcación inicia una tarea de forma asíncrona en algún subproceso, y el método de unión le permite esperar a que finalice esta tarea.

método newWorkStealingPool

El método newWorkStealingPool tiene dos implementaciones:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Desde el principio, notamos que bajo el capó no estamos llamando al constructor ThreadPoolExecutor . Aquí estamos trabajando con la entidad ForkJoinPool . Al igual que ThreadPoolExecutor , es una implementación de AbstractExecutorService .

Tenemos 2 métodos para elegir. En el primero, nosotros mismos indicamos qué nivel de paralelismo queremos ver. Si no especificamos este valor, entonces el paralelismo de nuestro grupo será igual a la cantidad de núcleos de procesador disponibles para la máquina virtual Java.

Queda por descubrir cómo funciona en la práctica:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Creamos 10 tareas que muestran su propio estado de finalización. Después de eso, lanzamos todas las tareas usando el método invocarTodos .

Resultados al ejecutar 10 tareas en 10 subprocesos en el grupo:

Solicitud de usuario procesada #9 en el subproceso ForkJoinPool-1-worker-10
Solicitud de usuario procesada #4 en el subproceso ForkJoinPool-1-worker-5
Solicitud de usuario procesada #7 en el subproceso ForkJoinPool-1-worker-8
Solicitud de usuario procesada #1 en ForkJoinPool- 1-worker-2 subproceso
Solicitud de usuario procesada n.º 2 en ForkJoinPool-1-worker-3 subproceso
Solicitud de usuario procesada n.º 3 en ForkJoinPool-1-worker-4 subproceso
Solicitud de usuario procesada n.º 6 en ForkJoinPool-1-worker-7 subproceso
Usuario procesado solicitud #0 en el subproceso ForkJoinPool-1-worker-1
Solicitud de usuario procesada #5 en el subproceso ForkJoinPool-1-worker-6
Solicitud de usuario procesada #8 en el subproceso ForkJoinPool-1-worker-9

Vemos que después de que se forma la cola, los hilos toman tareas para su ejecución. También puede verificar cómo se distribuirán 20 tareas en un grupo de 10 hilos.

Solicitud de usuario procesada #3 en el subproceso ForkJoinPool-1-worker-4
Solicitud de usuario procesada #7 en el subproceso ForkJoinPool-1-worker-8
Solicitud de usuario procesada #2 en el subproceso ForkJoinPool-1-worker-3 Solicitud de
usuario procesada #4 en ForkJoinPool- 1-worker-5 subproceso
Solicitud de usuario procesada n.° 1 en ForkJoinPool-1-worker-2 subproceso
Solicitud de usuario procesada n.° 5 en ForkJoinPool-1-worker-6 subproceso
Solicitud de usuario procesada n.° 8 en ForkJoinPool-1-worker-9 subproceso
Usuario procesado solicitud n.° 9 en el subproceso ForkJoinPool-1-worker-10
Solicitud de usuario procesada n.° 0 en el subproceso ForkJoinPool-1-worker-1
Solicitud de usuario procesada n.° 6 en el subproceso ForkJoinPool-1-worker-7
Solicitud procesada del usuario n.° 10 en ForkJoinPool-1- subproceso trabajador-9
Solicitud de usuario procesada #12 en el subproceso ForkJoinPool-1-worker-1
Solicitud de usuario procesada #13 en el subproceso ForkJoinPool-1-worker-8
Solicitud de usuario procesada #11 en el subproceso ForkJoinPool-1-worker-6 Solicitud de
usuario procesada #15 en ForkJoinPool- 1-worker-8 subproceso
Solicitud de usuario procesada n.º 14 en ForkJoinPool-1-worker-1 subproceso
Solicitud de usuario procesada n.º 17 en ForkJoinPool-1-worker-6 subproceso
Solicitud de usuario procesada n.º 16 en ForkJoinPool-1-worker-7 subproceso
Usuario procesado solicitud n.º 19 en el subproceso ForkJoinPool-1-worker-6
Solicitud de usuario procesada n.º 18 en el subproceso ForkJoinPool-1-worker-1

A partir del resultado, podemos ver que algunos subprocesos logran completar varias tareas ( ForkJoinPool-1-worker-6 completó 4 tareas), mientras que algunos completan solo una ( ForkJoinPool-1-worker-2 ). Si se agrega un retraso de 1 segundo a la implementación del método de llamada , la imagen cambia.

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Por el bien del experimento, ejecutemos el mismo código en otra máquina. La salida resultante:

Solicitud de usuario procesada #2 en el subproceso ForkJoinPool-1-worker-23
Solicitud de usuario procesada #7 en el subproceso ForkJoinPool-1-worker-31
Solicitud de usuario procesada #4 en el subproceso ForkJoinPool-1-worker-27 Solicitud de
usuario procesada #5 en ForkJoinPool- 1-worker-13 subproceso
Solicitud de usuario procesada #0 en ForkJoinPool-1-worker-19 subproceso
Solicitud de usuario procesada #8 en ForkJoinPool-1-worker-3 subproceso
Solicitud de usuario procesada #9 en ForkJoinPool-1-worker-21 subproceso
Usuario procesado solicitud n.° 6 en el subproceso ForkJoinPool-1-worker-17
Solicitud de usuario procesada n.° 3 en el subproceso
ForkJoinPool-1-worker-9 Solicitud de usuario procesada n.° 1 en el subproceso ForkJoinPool-1-worker-5
Solicitud de usuario procesada n.° 12 en ForkJoinPool-1- subproceso trabajador-23
Solicitud de usuario procesada #15 en el subproceso ForkJoinPool-1-worker-19
Solicitud de usuario procesada #14 en el subproceso
ForkJoinPool-1-worker-27 Solicitud de usuario procesada #11 en el subproceso ForkJoinPool-1-worker-3
Solicitud de usuario procesada #13 en ForkJoinPool- 1-worker-13 subproceso
Solicitud de usuario procesada n.º 10 en ForkJoinPool-1-worker-31 subproceso
Solicitud de usuario procesada n.º 18 en ForkJoinPool-1-worker-5 subproceso
Solicitud de usuario procesada n.º 16 en ForkJoinPool-1-worker-9 subproceso
Usuario procesado solicitud n.° 17 en el subproceso ForkJoinPool-1-worker-21
Solicitud de usuario procesada n.° 19 en el subproceso ForkJoinPool-1-worker-17

En esta salida, es notable que "pedimos" los subprocesos en el grupo. Además, los nombres de los subprocesos de trabajo no van del uno al diez, sino que a veces son superiores a diez. Mirando los nombres únicos, vemos que realmente hay diez trabajadores (3, 5, 9, 13, 17, 19, 21, 23, 27 y 31). Aquí es bastante razonable preguntar por qué sucedió esto. Siempre que no entienda lo que está pasando, use el depurador.

Esto es lo que haremos. Echemos elalbaceaServicioobjeto a un ForkJoinPool :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Usaremos la acción Evaluar expresión para examinar este objeto después de llamar al método invocarTodo . Para hacer esto, después del método invoqueTodo , agregue cualquier declaración, como un sout vacío, y establezca un punto de interrupción en él.

Podemos ver que el grupo tiene 10 subprocesos, pero el tamaño de la matriz de subprocesos de trabajo es 32. Extraño, pero está bien. Sigamos cavando. Al crear un grupo, intentemos establecer el nivel de paralelismo en más de 32, digamos 40.

ExecutorService executorService = Executors.newWorkStealingPool(40);

En el depurador, veamos elobjeto forkJoinPool de nuevo.

Ahora, el tamaño de la matriz de subprocesos de trabajo es 128. Podemos suponer que esta es una de las optimizaciones internas de JVM. Intentemos encontrarlo en el código del JDK (openjdk-14):

Tal como sospechábamos: el tamaño de la matriz de subprocesos de trabajo se calcula realizando manipulaciones bit a bit en el valor de paralelismo. No necesitamos tratar de averiguar qué está sucediendo exactamente aquí. Es suficiente simplemente saber que tal optimización existe.

Otro aspecto interesante de nuestro ejemplo es el uso del método invocarTodos . Vale la pena señalar que el método invocarTodo puede devolver un resultado, o más bien una lista de resultados (en nuestro caso, un List<Future<Void>>) , donde podemos encontrar el resultado de cada una de las tareas.

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Este tipo especial de servicio y grupo de subprocesos se puede utilizar en tareas con un nivel de simultaneidad predecible, o al menos implícito.