Vamos descobrir o método newWorkStealingPool , que prepara um ExecutorService para nós.

Este pool de threads é especial. Seu comportamento é baseado na ideia de "roubar" o trabalho.
As tarefas são enfileiradas e distribuídas entre os processadores. Mas se um processador estiver ocupado, outro processador livre pode roubar uma tarefa dele e executá-la. Este formato foi introduzido em Java para reduzir conflitos em aplicações multi-threaded. Ele é construído na estrutura fork/join .
bifurcar/juntar

No framework fork/join , as tarefas são decompostas recursivamente, ou seja, são divididas em subtarefas. Em seguida, as subtarefas são executadas individualmente e os resultados das subtarefas são combinados para formar o resultado da tarefa original.
O método fork inicia uma tarefa de forma assíncrona em algum thread, e o método join permite que você aguarde a conclusão dessa tarefa.
newWorkStealingPool
O método newWorkStealingPool tem duas implementações:
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Desde o início, notamos que não estamos chamando o construtor ThreadPoolExecutor . Aqui estamos trabalhando com a entidade ForkJoinPool . Como ThreadPoolExecutor , é uma implementação de AbstractExecutorService .
Temos 2 métodos para escolher. Na primeira, nós mesmos indicamos que nível de paralelismo queremos ver. Se não especificarmos esse valor, o paralelismo de nosso pool será igual ao número de núcleos de processador disponíveis para a máquina virtual Java.
Resta descobrir como funciona na prática:
Collection<Callable<Void>> tasks = new ArrayList<>();
ExecutorService executorService = Executors.newWorkStealingPool(10);
for (int i = 0; i < 10; i++) {
int taskNumber = i;
Callable<Void> callable = () -> {
System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
return null;
};
tasks.add(callable);
}
executorService.invokeAll(tasks);
Criamos 10 tarefas que exibem seu próprio status de conclusão. Depois disso, lançamos todas as tarefas usando o método invokeAll .
Resultados ao executar 10 tarefas em 10 threads no pool:
Solicitação de usuário processada nº 4 no thread ForkJoinPool-1-worker-5
Solicitação de usuário processada nº 7 no thread ForkJoinPool-1-worker-8 Solicitação de
usuário processada nº 1 em ForkJoinPool- 1-worker-2 thread
Solicitação de usuário processada nº 2 em ForkJoinPool-1-worker-3 thread
Solicitação de usuário processada nº 3 em ForkJoinPool-1-worker-4 thread
Solicitação de usuário processada nº 6 em ForkJoinPool-1-worker-7 thread
Usuário processado solicitação nº 0 no thread ForkJoinPool-1-worker-1
Solicitação de usuário processada nº 5 no thread ForkJoinPool-1-worker-6
Solicitação de usuário processada nº 8 no thread ForkJoinPool-1-worker-9
Vemos que após a formação da fila, as threads recebem as tarefas para execução. Você também pode verificar como 20 tarefas serão distribuídas em um pool de 10 threads.
Solicitação de usuário processada nº 7 no thread ForkJoinPool-1-worker-8
Solicitação de usuário processada nº 2 no thread ForkJoinPool-1-worker-3
Solicitação de usuário processada nº 4 em ForkJoinPool- 1-worker-5 thread
Solicitação de usuário processada nº 1 no thread ForkJoinPool-1-worker-2
Solicitação de usuário processada nº 5 no thread ForkJoinPool-1-worker-6
Solicitação de usuário processada nº 8 no thread ForkJoinPool-1-worker-9
Usuário processado solicitação nº 9 no thread ForkJoinPool-1-worker-10
Solicitação de usuário processada nº 0 no thread ForkJoinPool-1-worker-1
Solicitação de usuário processada nº 6 no thread ForkJoinPool-1-worker-7
Solicitação de usuário processada nº 10 em ForkJoinPool-1- thread trabalhador-9
Solicitação de usuário processada nº 12 no thread ForkJoinPool-1-worker-1
Solicitação de usuário processada nº 13 no thread ForkJoinPool-1-worker-8
Solicitação de usuário processada nº 11 no thread ForkJoinPool-1-worker-6 Solicitação de
usuário processada nº 15 em ForkJoinPool- 1-worker-8 thread
Solicitação de usuário processada nº 14 em ForkJoinPool-1-worker-1 thread
Solicitação de usuário processada nº 17 em ForkJoinPool-1-worker-6 thread
Solicitação de usuário processada nº 16 em ForkJoinPool-1-worker-7 thread
Usuário processado solicitação nº 19 no thread ForkJoinPool-1-worker-6
Solicitação de usuário processada nº 18 no thread ForkJoinPool-1-worker-1
A partir da saída, podemos ver que alguns threads conseguem concluir várias tarefas ( ForkJoinPool-1-worker-6 completou 4 tarefas), enquanto alguns concluem apenas uma ( ForkJoinPool-1-worker-2 ). Se um atraso de 1 segundo for adicionado à implementação do método de chamada , a imagem muda.
Callable<Void> callable = () -> {
System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(1);
return null;
};
Para fins de experiência, vamos executar o mesmo código em outra máquina. A saída resultante:
Solicitação de usuário processada nº 7 no thread ForkJoinPool-1-worker-31
Solicitação de usuário processada nº 4 no thread ForkJoinPool-1-worker-27 Solicitação de
usuário processada nº 5 em ForkJoinPool- 1-worker-13 thread
Solicitação de usuário processada nº 0 no thread ForkJoinPool-1-worker-19
Solicitação de usuário processada nº 8 no thread ForkJoinPool-1-worker-3
Solicitação de usuário processada nº 9 no thread ForkJoinPool-1-worker-21
Usuário processado solicitação nº 6 no thread ForkJoinPool-1-worker-17
Solicitação de usuário processada nº 3 no thread ForkJoinPool-1-worker-9
Solicitação de usuário processada nº 1 no thread ForkJoinPool-1-worker-5 Solicitação de
usuário processada nº 12 em ForkJoinPool-1- thread trabalhador-23
Solicitação de usuário processada nº 15 no thread ForkJoinPool-1-worker-19
Solicitação de usuário processada nº 14 no thread ForkJoinPool-1-worker-27
Solicitação de usuário processada nº 11 no thread ForkJoinPool-1-worker-3 Solicitação de
usuário processada nº 13 em ForkJoinPool- 1-worker-13 thread
Solicitação de usuário processada nº 10 no thread ForkJoinPool-1-worker-31
Solicitação de usuário processada nº 18 no thread ForkJoinPool-1-worker-5
Solicitação de usuário processada nº 16 no thread ForkJoinPool-1-worker-9
usuário processado solicitação nº 17 no thread ForkJoinPool-1-worker-21
Solicitação de usuário processada nº 19 no thread ForkJoinPool-1-worker-17
Nesta saída, é notável que "pedimos" os encadeamentos no pool. Além do mais, os nomes dos threads de trabalho não vão de um a dez, mas às vezes são maiores que dez. Olhando para os nomes únicos, vemos que realmente existem dez trabalhadores (3, 5, 9, 13, 17, 19, 21, 23, 27 e 31). Aqui é bastante razoável perguntar por que isso aconteceu? Sempre que você não entender o que está acontecendo, use o depurador.
Isso é o que faremos. Vamos lançar oexecutorServiceobjeto para um ForkJoinPool :final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;
Usaremos a ação Evaluate Expression para examinar esse objeto após chamar o método invokeAll . Para fazer isso, após o método invokeAll , adicione qualquer instrução, como um sout vazio, e defina um ponto de interrupção nele.

Podemos ver que o pool tem 10 threads, mas o tamanho da matriz de threads de trabalho é 32. Estranho, mas tudo bem. Vamos continuar cavando. Ao criar um pool, vamos tentar definir o nível de paralelismo para mais de 32, digamos 40.
ExecutorService executorService = Executors.newWorkStealingPool(40);
No depurador, vamos ver oobjeto forkJoinPool novamente.

Agora, o tamanho da matriz de threads de trabalho é 128. Podemos assumir que esta é uma das otimizações internas da JVM. Vamos tentar encontrá-lo no código do JDK (openjdk-14):

Exatamente como suspeitávamos: o tamanho do array de threads de trabalho é calculado realizando manipulações bit a bit no valor de paralelismo. Não precisamos tentar descobrir o que exatamente está acontecendo aqui. Basta simplesmente saber que tal otimização existe.
Outro aspecto interessante do nosso exemplo é o uso do método invokeAll . Vale ressaltar que o método invokeAll pode retornar um resultado, ou melhor, uma lista de resultados (no nosso caso, uma List<Future<Void>>) , onde podemos encontrar o resultado de cada uma das tarefas.
var results = executorService.invokeAll(tasks);
for (Future<Void> result : results) {
// Process the task's result
}
Esse tipo especial de pool de threads e serviços pode ser usado em tarefas com um nível de simultaneidade previsível ou, pelo menos, implícito.
GO TO FULL VERSION