CodeGym /Blogue Java /Random-PT /Melhor juntos: Java e a classe Thread. Parte V — Executor...
John Squirrels
Nível 41
San Francisco

Melhor juntos: Java e a classe Thread. Parte V — Executor, ThreadPool, Fork/Join

Publicado no grupo Random-PT

Introdução

Então, sabemos que Java tem threads. Você pode ler sobre isso na revisão intitulada Better together: Java and the Thread class. Parte I — Threads de execução . Melhor juntos: Java e a classe Thread.  Parte V — Executor, ThreadPool, Fork/Join - 1Vamos dar outra olhada no código típico:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Como você pode ver, o código para iniciar uma tarefa é bastante comum, mas temos que repeti-lo para uma nova tarefa. Uma solução é colocá-lo em um método separado, por exemplo execute(Runnable runnable). Mas os criadores do Java consideraram nossa situação e criaram a Executorinterface:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Este código é claramente mais conciso: agora simplesmente escrevemos o código para iniciar o Runnablena thread. Isso é ótimo, não é? Mas isso é só o começo: Melhor juntos: Java e a classe Thread.  Parte V — Executor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Como você pode ver, a Executorinterface tem uma ExecutorServicesubinterface. O Javadoc para esta interface diz que um ExecutorServicedescreve um particular Executorque fornece métodos para desligar o Executor. Também possibilita a obtenção de um java.util.concurrent.Futurea fim de acompanhar o processo de execução. Anteriormente, em Better Together: Java e a classe Thread. Parte IV — Callable, Future e friends , revisamos brevemente os recursos do Future. Se você esqueceu ou nunca leu, sugiro que refresque a memória ;) O que mais o Javadoc diz? Ele nos diz que temos uma java.util.concurrent.Executorsfábrica especial que nos permite criar implementações padrão de ExecutorService.

ExecutorService

Vamos revisar. Temos Executorque executar (ou seja, chamar execute()) uma determinada tarefa em um thread, e o código que cria o thread está oculto para nós. Temos ExecutorService— um específico Executorque tem várias opções para controlar o progresso. E temos a Executorsfábrica que nos permite criar um arquivo ExecutorService. Agora vamos fazer nós mesmos:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Você pode ver que especificamos um pool de encadeamento fixo cujo tamanho é 2. Em seguida, enviamos as tarefas para o pool, uma a uma. Cada tarefa retorna um Stringcontendo o nome do thread ( currentThread().GetName()). É importante fechar o logo ExecutorServiceno final, senão nosso programa não vai acabar. A Executorsfábrica tem métodos de fábrica adicionais. Por exemplo, podemos criar um pool composto por apenas um thread ( newSingleThreadExecutor) ou um pool que inclua um cache ( newCachedThreadPool) do qual os threads são removidos após ficarem ociosos por 1 minuto. Na realidade, eles ExecutorServicesão apoiados por uma fila de bloqueio , na qual as tarefas são colocadas e a partir da qual as tarefas são executadas. Mais informações sobre como bloquear filas podem ser encontradas neste vídeo . Você também pode ler issorevisão sobre BlockingQueue . E confira a resposta para a pergunta "Quando preferir LinkedBlockingQueue em vez de ArrayBlockingQueue?" Em termos simples, a BlockingQueuebloqueia uma thread em dois casos:
  • o thread tenta obter itens de uma fila vazia
  • o thread tenta colocar itens em uma fila cheia
Se olharmos para a implementação dos métodos de fábrica, podemos ver como eles funcionam. Por exemplo:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
ou

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Como podemos ver, as implementações de ExecutorServicesão criadas dentro dos métodos de fábrica. E, na maioria das vezes, estamos falando de ThreadPoolExecutor. Apenas os parâmetros que afetam o trabalho são alterados. Melhor juntos: Java e a classe Thread.  Parte V — Executor, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Como vimos anteriormente, ThreadPoolExecutoré o que geralmente é criado dentro dos métodos de fábrica. A funcionalidade é afetada pelos argumentos que passamos como o número máximo e mínimo de threads, bem como o tipo de fila que está sendo usado. Mas qualquer implementação da java.util.concurrent.BlockingQueueinterface pode ser usada. Falando nisso ThreadPoolExecutor, devemos mencionar alguns recursos interessantes. Por exemplo, você não pode enviar tarefas para um ThreadPoolExecutorse não houver espaço disponível:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Este código irá falhar com um erro como este:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Em outras palavras, tasknão pode ser submetido, porque SynchronousQueueé projetado para que seja realmente composto por um único elemento e não nos permite colocar mais nada nele. Podemos ver que temos zero queued tasks("tarefas na fila = 0") aqui. Mas não há nada de estranho nisso, porque essa é uma característica especial de SynchronousQueue, que na verdade é uma fila de 1 elemento que está sempre vazia! Quando uma thread coloca um elemento na fila, ela espera até que outra thread pegue o elemento da fila. Assim, podemos substituí-lo por new LinkedBlockingQueue<>(1)e o erro será alterado para agora mostrar queued tasks = 1. Como a fila tem apenas 1 elemento, não podemos adicionar um segundo elemento. E é isso que faz com que o programa falhe. Continuando nossa discussão sobre fila, vale a pena notar que oThreadPoolExecutorA classe tem métodos adicionais para atender a fila. Por exemplo, o threadPoolExecutor.purge()método removerá todas as tarefas canceladas da fila para liberar espaço na fila. Outra função interessante relacionada à fila é o manipulador de tarefas rejeitadas:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Neste exemplo, nosso manipulador simplesmente exibe Rejectedcada vez que uma tarefa na fila é rejeitada. Conveniente, não é? Além disso, ThreadPoolExecutorpossui uma subclasse interessante: ScheduledThreadPoolExecutor, que é um ScheduledExecutorService. Ele fornece a capacidade de executar uma tarefa com base em um cronômetro.

ScheduledExecutorService

ScheduledExecutorService(que é um tipo de ExecutorService) nos permite executar tarefas em um cronograma. Vejamos um exemplo:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tudo é simples aqui. As tarefas são enviadas e, em seguida, obtemos um arquivo java.util.concurrent.ScheduledFuture. Um cronograma também pode ser útil na seguinte situação:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Aqui enviamos uma Runnabletarefa para execução em uma frequência fixa ("FixedRate") com um certo atraso inicial. Neste caso, após 1 segundo, a tarefa passará a ser executada a cada 2 segundos. Existe uma opção semelhante:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mas neste caso, as tarefas são executadas com um intervalo específico ENTRE cada execução. Ou seja, o taskserá executado após 1 segundo. Então, assim que for concluído, 2 segundos se passarão e uma nova tarefa será iniciada. Aqui estão alguns recursos adicionais sobre este tópico: Melhor juntos: Java e a classe Thread.  Parte V — Executor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Além dos pools de threads acima, há mais um. Podemos dizer honestamente que é um pouco especial. É chamado de pool de roubo de trabalho. Resumindo, work-stealing é um algoritmo no qual threads ociosas começam a receber tarefas de outras threads ou tarefas de uma fila compartilhada. Vejamos um exemplo:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Se executarmos esse código, ele ExecutorServicecriará 5 threads para nós, porque cada thread será colocado na fila de espera para o objeto de bloqueio. Já descobrimos monitores e bloqueios em Better juntos: Java e a classe Thread. Parte II — Sincronização . Agora vamos substituir Executors.newCachedThreadPool()por Executors.newWorkStealingPool(). O que vai mudar? Veremos que nossas tarefas são executadas em menos de 5 threads. Lembra que CachedThreadPoolcria um thread para cada tarefa? Isso ocorre porque wait()o encadeamento foi bloqueado, as tarefas subsequentes desejam ser concluídas e novos encadeamentos foram criados para eles no pool. Com um pool roubado, os threads não ficam ociosos para sempre. Eles começam a realizar as tarefas de seus vizinhos. O que torna um WorkStealingPoolpool de threads tão diferente de outros? O fato de que o mágicoForkJoinPoolvive dentro dela:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Na verdade, há mais uma diferença. Por padrão, os encadeamentos criados para a ForkJoinPoolsão encadeamentos daemon, ao contrário dos encadeamentos criados por meio de um onrdinary ThreadPool. Em geral, você deve se lembrar de encadeamentos daemon, porque, por exemplo, CompletableFuturetambém usa encadeamentos daemon, a menos que você especifique o seu próprio ThreadFactoryque cria encadeamentos não daemon. Estas são as surpresas que podem estar à espreita em lugares inesperados! :)

ForkJoinPoolName

Nesta parte, falaremos novamente sobre ForkJoinPool(também chamado de estrutura fork/join), que vive "sob o capô" do WorkStealingPool. Em geral, a estrutura fork/join apareceu no Java 1.7. E mesmo que o Java 11 esteja próximo, ainda vale a pena lembrar. Esta não é a implementação mais comum, mas é bastante interessante. Há uma boa revisão sobre isso na web: Understanding Java Fork-Join Framework with Example . O ForkJoinPooldepende de java.util.concurrent.RecursiveTask. Há também java.util.concurrent.RecursiveAction. RecursiveActionnão retorna um resultado. Assim, RecursiveTaské semelhante a Callablee RecursiveActioné semelhante a unnable. Podemos ver que o nome inclui os nomes de dois métodos importantes: forke join. OforkO método inicia alguma tarefa de forma assíncrona em um thread separado. E o joinmétodo permite que você espere o trabalho ser feito. Para obter o melhor entendimento, você deve ler From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Resumo

Bem, isso encerra esta parte da revisão. Aprendemos que Executorfoi originalmente inventado para executar threads. Então, os criadores do Java decidiram continuar a ideia e criaram o ExecutorService. ExecutorServicenos permite enviar tarefas para execução usando submit()e invoke(), e também desligar o serviço. Como ExecutorServiceprecisa de implementações, eles escreveram uma classe com métodos de fábrica e a chamaram de Executors. Permite criar pools de threads ( ThreadPoolExecutor). Além disso, existem pools de threads que também nos permitem especificar um cronograma de execução. E um ForkJoinPoolse esconde atrás de um WorkStealingPool. Espero que você tenha achado o que escrevi acima não apenas interessante, mas também compreensível :) Fico sempre feliz em ouvir suas sugestões e comentários. Melhor juntos: Java e a classe Thread. Parte I — Threads de execução Melhor juntos: Java e a classe Thread. Parte II — Sincronização melhor juntos: Java e a classe Thread. Parte III — Interação melhor juntos: Java e a classe Thread. Parte IV — Callable, Future e amigos Melhor juntos: Java e a classe Thread. Parte VI — Atire!
Comentários
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION