Vamos descobrir o método newWorkStealingPool , que prepara um ExecutorService para nós.

Este pool de threads é especial. Seu comportamento é baseado na ideia de "roubar" o trabalho.

As tarefas são enfileiradas e distribuídas entre os processadores. Mas se um processador estiver ocupado, outro processador livre pode roubar uma tarefa dele e executá-la. Este formato foi introduzido em Java para reduzir conflitos em aplicações multi-threaded. Ele é construído na estrutura fork/join .

bifurcar/juntar

No framework fork/join , as tarefas são decompostas recursivamente, ou seja, são divididas em subtarefas. Em seguida, as subtarefas são executadas individualmente e os resultados das subtarefas são combinados para formar o resultado da tarefa original.

O método fork inicia uma tarefa de forma assíncrona em algum thread, e o método join permite que você aguarde a conclusão dessa tarefa.

newWorkStealingPool

O método newWorkStealingPool tem duas implementações:

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

Desde o início, notamos que não estamos chamando o construtor ThreadPoolExecutor . Aqui estamos trabalhando com a entidade ForkJoinPool . Como ThreadPoolExecutor , é uma implementação de AbstractExecutorService .

Temos 2 métodos para escolher. Na primeira, nós mesmos indicamos que nível de paralelismo queremos ver. Se não especificarmos esse valor, o paralelismo de nosso pool será igual ao número de núcleos de processador disponíveis para a máquina virtual Java.

Resta descobrir como funciona na prática:

Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);

        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

Criamos 10 tarefas que exibem seu próprio status de conclusão. Depois disso, lançamos todas as tarefas usando o método invokeAll .

Resultados ao executar 10 tarefas em 10 threads no pool:

Solicitação de usuário processada nº 9 no thread ForkJoinPool-1-worker-10
Solicitação de usuário processada nº 4 no thread ForkJoinPool-1-worker-5
Solicitação de usuário processada nº 7 no thread ForkJoinPool-1-worker-8 Solicitação de
usuário processada nº 1 em ForkJoinPool- 1-worker-2 thread
Solicitação de usuário processada nº 2 em ForkJoinPool-1-worker-3 thread
Solicitação de usuário processada nº 3 em ForkJoinPool-1-worker-4 thread
Solicitação de usuário processada nº 6 em ForkJoinPool-1-worker-7 thread
Usuário processado solicitação nº 0 no thread ForkJoinPool-1-worker-1
Solicitação de usuário processada nº 5 no thread ForkJoinPool-1-worker-6
Solicitação de usuário processada nº 8 no thread ForkJoinPool-1-worker-9

Vemos que após a formação da fila, as threads recebem as tarefas para execução. Você também pode verificar como 20 tarefas serão distribuídas em um pool de 10 threads.

Solicitação de usuário processada nº 3 no thread ForkJoinPool-1-worker-4
Solicitação de usuário processada nº 7 no thread ForkJoinPool-1-worker-8
Solicitação de usuário processada nº 2 no thread ForkJoinPool-1-worker-3
Solicitação de usuário processada nº 4 em ForkJoinPool- 1-worker-5 thread
Solicitação de usuário processada nº 1 no thread ForkJoinPool-1-worker-2
Solicitação de usuário processada nº 5 no thread ForkJoinPool-1-worker-6
Solicitação de usuário processada nº 8 no thread ForkJoinPool-1-worker-9
Usuário processado solicitação nº 9 no thread ForkJoinPool-1-worker-10
Solicitação de usuário processada nº 0 no thread ForkJoinPool-1-worker-1
Solicitação de usuário processada nº 6 no thread ForkJoinPool-1-worker-7
Solicitação de usuário processada nº 10 em ForkJoinPool-1- thread trabalhador-9
Solicitação de usuário processada nº 12 no thread ForkJoinPool-1-worker-1
Solicitação de usuário processada nº 13 no thread ForkJoinPool-1-worker-8
Solicitação de usuário processada nº 11 no thread ForkJoinPool-1-worker-6 Solicitação de
usuário processada nº 15 em ForkJoinPool- 1-worker-8 thread
Solicitação de usuário processada nº 14 em ForkJoinPool-1-worker-1 thread
Solicitação de usuário processada nº 17 em ForkJoinPool-1-worker-6 thread
Solicitação de usuário processada nº 16 em ForkJoinPool-1-worker-7 thread
Usuário processado solicitação nº 19 no thread ForkJoinPool-1-worker-6
Solicitação de usuário processada nº 18 no thread ForkJoinPool-1-worker-1

A partir da saída, podemos ver que alguns threads conseguem concluir várias tarefas ( ForkJoinPool-1-worker-6 completou 4 tarefas), enquanto alguns concluem apenas uma ( ForkJoinPool-1-worker-2 ). Se um atraso de 1 segundo for adicionado à implementação do método de chamada , a imagem muda.

Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

Para fins de experiência, vamos executar o mesmo código em outra máquina. A saída resultante:

Solicitação de usuário processada nº 2 no thread ForkJoinPool-1-worker-23
Solicitação de usuário processada nº 7 no thread ForkJoinPool-1-worker-31
Solicitação de usuário processada nº 4 no thread ForkJoinPool-1-worker-27 Solicitação de
usuário processada nº 5 em ForkJoinPool- 1-worker-13 thread
Solicitação de usuário processada nº 0 no thread ForkJoinPool-1-worker-19
Solicitação de usuário processada nº 8 no thread ForkJoinPool-1-worker-3
Solicitação de usuário processada nº 9 no thread ForkJoinPool-1-worker-21
Usuário processado solicitação nº 6 no thread ForkJoinPool-1-worker-17
Solicitação de usuário processada nº 3 no thread ForkJoinPool-1-worker-9
Solicitação de usuário processada nº 1 no thread ForkJoinPool-1-worker-5 Solicitação de
usuário processada nº 12 em ForkJoinPool-1- thread trabalhador-23
Solicitação de usuário processada nº 15 no thread ForkJoinPool-1-worker-19
Solicitação de usuário processada nº 14 no thread ForkJoinPool-1-worker-27
Solicitação de usuário processada nº 11 no thread ForkJoinPool-1-worker-3 Solicitação de
usuário processada nº 13 em ForkJoinPool- 1-worker-13 thread
Solicitação de usuário processada nº 10 no thread ForkJoinPool-1-worker-31
Solicitação de usuário processada nº 18 no thread ForkJoinPool-1-worker-5
Solicitação de usuário processada nº 16 no thread ForkJoinPool-1-worker-9
usuário processado solicitação nº 17 no thread ForkJoinPool-1-worker-21
Solicitação de usuário processada nº 19 no thread ForkJoinPool-1-worker-17

Nesta saída, é notável que "pedimos" os encadeamentos no pool. Além do mais, os nomes dos threads de trabalho não vão de um a dez, mas às vezes são maiores que dez. Olhando para os nomes únicos, vemos que realmente existem dez trabalhadores (3, 5, 9, 13, 17, 19, 21, 23, 27 e 31). Aqui é bastante razoável perguntar por que isso aconteceu? Sempre que você não entender o que está acontecendo, use o depurador.

Isso é o que faremos. Vamos lançar oexecutorServiceobjeto para um ForkJoinPool :

final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

Usaremos a ação Evaluate Expression para examinar esse objeto após chamar o método invokeAll . Para fazer isso, após o método invokeAll , adicione qualquer instrução, como um sout vazio, e defina um ponto de interrupção nele.

Podemos ver que o pool tem 10 threads, mas o tamanho da matriz de threads de trabalho é 32. Estranho, mas tudo bem. Vamos continuar cavando. Ao criar um pool, vamos tentar definir o nível de paralelismo para mais de 32, digamos 40.

ExecutorService executorService = Executors.newWorkStealingPool(40);

No depurador, vamos ver oobjeto forkJoinPool novamente.

Agora, o tamanho da matriz de threads de trabalho é 128. Podemos assumir que esta é uma das otimizações internas da JVM. Vamos tentar encontrá-lo no código do JDK (openjdk-14):

Exatamente como suspeitávamos: o tamanho do array de threads de trabalho é calculado realizando manipulações bit a bit no valor de paralelismo. Não precisamos tentar descobrir o que exatamente está acontecendo aqui. Basta simplesmente saber que tal otimização existe.

Outro aspecto interessante do nosso exemplo é o uso do método invokeAll . Vale ressaltar que o método invokeAll pode retornar um resultado, ou melhor, uma lista de resultados (no nosso caso, uma List<Future<Void>>) , onde podemos encontrar o resultado de cada uma das tarefas.

var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

Esse tipo especial de pool de threads e serviços pode ser usado em tarefas com um nível de simultaneidade previsível ou, pelo menos, implícito.