introduzione
Quindi, sappiamo che Java ha thread. Puoi leggerlo nella recensione intitolata Better together: Java and the Thread class. Parte I — Fili di esecuzione .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Come puoi vedere, il codice per avviare un'attività è piuttosto tipico, ma dobbiamo ripeterlo per una nuova attività. Una soluzione è inserirla in un metodo separato, ad es execute(Runnable runnable)
. Ma i creatori di Java hanno considerato la nostra situazione e hanno ideato l' Executor
interfaccia:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Questo codice è chiaramente più conciso: ora scriviamo semplicemente il codice per avviare il Runnable
thread. È fantastico, vero? Ma è solo l'inizio: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
interfaccia ha una ExecutorService
sottointerfaccia. Il Javadoc per questa interfaccia dice che an ExecutorService
descrive un particolare Executor
che fornisce metodi per chiudere il file Executor
. Consente inoltre di ottenere un java.util.concurrent.Future
per tracciare il processo di esecuzione. In precedenza, in Better together: Java e la classe Thread. Parte IV — Callable, Future e amici , abbiamo brevemente esaminato le funzionalità di Future
. Se l'hai dimenticato o non l'hai mai letto, ti suggerisco di rinfrescarti la memoria ;) Cos'altro dice il Javadoc? Ci dice che abbiamo una java.util.concurrent.Executors
fabbrica speciale che ci consente di creare implementazioni predefinite di ExecutorService
.
ExecutorService
Ripassiamo. DobbiamoExecutor
eseguire (cioè richiamare execute()
) un determinato task su un thread, e il codice che crea il thread ci è nascosto. Abbiamo ExecutorService
- uno specifico Executor
che ha diverse opzioni per controllare i progressi. E abbiamo la Executors
fabbrica che ci consente di creare un file ExecutorService
. Ora facciamolo da soli:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Puoi vedere che abbiamo specificato un pool di thread fisso la cui dimensione è 2. Quindi inviamo le attività al pool una per una. Ogni attività restituisce un String
contenente il nome del thread ( currentThread().GetName()
). È importante chiudere ExecutorService
alla fine, perché altrimenti il nostro programma non finirà. La Executors
fabbrica ha metodi di fabbrica aggiuntivi. Ad esempio, possiamo creare un pool costituito da un solo thread ( newSingleThreadExecutor
) o un pool che include una cache ( newCachedThreadPool
) da cui i thread vengono rimossi dopo che sono rimasti inattivi per 1 minuto. In realtà, questi ExecutorService
sono supportati da una coda di blocco , in cui vengono inseriti i compiti e da cui vengono eseguiti i compiti. Ulteriori informazioni sul blocco delle code sono disponibili in questo video . Puoi anche leggere questorecensione su BlockingQueue . E controlla la risposta alla domanda "Quando preferire LinkedBlockingQueue su ArrayBlockingQueue?" In termini più semplici, a BlockingQueue
blocca un thread in due casi:
- il thread tenta di ottenere elementi da una coda vuota
- il thread tenta di inserire gli elementi in una coda piena
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
O
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Come possiamo vedere, le implementazioni di ExecutorService
vengono create all'interno dei metodi factory. E per la maggior parte, stiamo parlando di ThreadPoolExecutor
. Vengono modificati solo i parametri che influenzano il lavoro. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Come abbiamo visto in precedenza,ThreadPoolExecutor
è ciò che di solito viene creato all'interno dei metodi factory. La funzionalità è influenzata dagli argomenti che passiamo come numero massimo e minimo di thread, nonché dal tipo di coda utilizzata. Ma qualsiasi implementazione dell'interfaccia java.util.concurrent.BlockingQueue
può essere utilizzata. A proposito di ThreadPoolExecutor
, dovremmo menzionare alcune caratteristiche interessanti. Ad esempio, non puoi inviare attività a ThreadPoolExecutor
se non c'è spazio disponibile:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Questo codice andrà in crash con un errore come questo:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
In altre parole, task
non può essere inviato, perché SynchronousQueue
è progettato in modo tale da essere costituito da un unico elemento e non permette di inserirci altro. queued tasks
Possiamo vedere che qui abbiamo zero ("attività in coda = 0"). Ma non c'è niente di strano in questo, perché questa è una caratteristica speciale di SynchronousQueue
, che in realtà è una coda di 1 elemento sempre vuota! Quando un thread inserisce un elemento nella coda, attende finché un altro thread non prende l'elemento dalla coda. Di conseguenza, possiamo sostituirlo con new LinkedBlockingQueue<>(1)
e l'errore cambierà in now show queued tasks = 1
. Poiché la coda è composta da un solo elemento, non possiamo aggiungere un secondo elemento. Ed è questo che fa fallire il programma. Continuando la nostra discussione sulla coda, vale la pena notare che il fileThreadPoolExecutor
class ha metodi aggiuntivi per servire la coda. Ad esempio, il threadPoolExecutor.purge()
metodo rimuoverà tutte le attività annullate dalla coda per liberare spazio nella coda. Un'altra interessante funzione relativa alla coda è il gestore delle attività rifiutate:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
In questo esempio, il nostro gestore visualizza semplicemente Rejected
ogni volta che un'attività in coda viene rifiutata. Comodo, no? Inoltre, ThreadPoolExecutor
ha un'interessante sottoclasse: ScheduledThreadPoolExecutor
, che è un ScheduledExecutorService
. Fornisce la possibilità di eseguire un'attività in base a un timer.
ScheduledExecutorService
ScheduledExecutorService
(che è un tipo di ExecutorService
) ci consente di eseguire attività in base a una pianificazione. Diamo un'occhiata a un esempio:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tutto è semplice qui. Le attività vengono inviate e quindi otteniamo un file java.util.concurrent.ScheduledFuture
. Un programma può anche essere utile nella seguente situazione:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Qui inviamo un'attività Runnable
per l'esecuzione a una frequenza fissa ("FixedRate") con un certo ritardo iniziale. In questo caso, dopo 1 secondo, l'attività inizierà ad essere eseguita ogni 2 secondi. C'è un'opzione simile:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Ma in questo caso, le attività vengono eseguite con un intervallo specifico TRA ogni esecuzione. Cioè, task
verrà eseguito dopo 1 secondo. Quindi, non appena sarà completato, passeranno 2 secondi e quindi verrà avviata una nuova attività. Ecco alcune risorse aggiuntive su questo argomento:
- Un'introduzione ai pool di thread in Java
- Introduzione ai pool di thread in Java
- Steeplechase multithreading Java: annullamento di attività negli esecutori
- Utilizzo di esecutori Java per attività in background

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
LavoroStealingPool
Oltre ai pool di thread di cui sopra, ce n'è uno in più. Possiamo onestamente dire che è un po' speciale. Si chiama piscina per rubare il lavoro. In breve, il furto di lavoro è un algoritmo in cui i thread inattivi iniziano a prendere attività da altri thread o attività da una coda condivisa. Diamo un'occhiata a un esempio:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Se eseguiamo questo codice, creeremo ExecutorService
5 thread per noi, perché ogni thread verrà inserito nella coda di attesa per l'oggetto lock. Abbiamo già individuato monitor e lock in Better together: Java e la classe Thread. Parte II — Sincronizzazione . Ora sostituiamo Executors.newCachedThreadPool()
con Executors.newWorkStealingPool()
. Cosa cambierà? Vedremo che le nostre attività vengono eseguite su meno di 5 thread. Ricordi che CachedThreadPool
crea un thread per ogni attività? Questo perché wait()
ha bloccato il thread, le attività successive vogliono essere completate e sono stati creati nuovi thread per loro nel pool. Con un pool rubato, i thread non rimangono inattivi per sempre. Cominciano a svolgere i compiti dei loro vicini. Cosa rende WorkStealingPool
così diverso da altri pool di thread? Il fatto che il magicoForkJoinPool
ci vive dentro:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
In realtà, c'è un'altra differenza. Per impostazione predefinita, i thread creati per a ForkJoinPool
sono thread daemon, a differenza dei thread creati tramite un onrdinary ThreadPool
. In generale, dovresti ricordare i thread daemon, perché, ad esempio, CompletableFuture
usa anche i thread daemon a meno che tu non specifichi il tuo ThreadFactory
che crea thread non daemon. Queste sono le sorprese che potrebbero nascondersi in luoghi inaspettati! :)
ForkJoinPool
In questa parte parleremo ancora diForkJoinPool
(chiamato anche framework fork/join), che vive "sotto il cofano" di WorkStealingPool
. In generale, il framework fork/join è apparso in Java 1.7. E anche se Java 11 è a portata di mano, vale comunque la pena ricordarlo. Questa non è l'implementazione più comune, ma è piuttosto interessante. C'è una buona recensione su questo sul web: Understanding Java Fork-Join Framework with Examples . Il ForkJoinPool
si basa su java.util.concurrent.RecursiveTask
. C'è anche java.util.concurrent.RecursiveAction
. RecursiveAction
non restituisce un risultato. Quindi, RecursiveTask
è simile a Callable
ed RecursiveAction
è simile a unnable
. Possiamo vedere che il nome include i nomi di due metodi importanti: fork
e join
. ILfork
Il metodo avvia alcune attività in modo asincrono su un thread separato. E il join
metodo ti consente di aspettare che il lavoro sia terminato. Per ottenere la migliore comprensione, dovresti leggere From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .
Riepilogo
Bene, questo conclude questa parte della recensione. Abbiamo appreso cheExecutor
è stato originariamente inventato per eseguire thread. Quindi i creatori di Java hanno deciso di continuare l'idea e hanno inventato ExecutorService
. ExecutorService
ci consente di inviare attività per l'esecuzione utilizzando submit()
e invoke()
e di arrestare anche il servizio. Poiché ExecutorService
necessita di implementazioni, hanno scritto una classe con metodi factory e l'hanno chiamata Executors
. Ti consente di creare pool di thread ( ThreadPoolExecutor
). Inoltre, ci sono pool di thread che ci consentono anche di specificare un programma di esecuzione. E si ForkJoinPool
nasconde dietro a WorkStealingPool
. Spero che tu abbia trovato quello che ho scritto sopra non solo interessante, ma anche comprensibile :) Sono sempre felice di sentire i tuoi suggerimenti e commenti. Meglio insieme: Java e la classe Thread. Parte I — Thread di esecuzione Meglio insieme: Java e la classe Thread. Parte II — Sincronizzazione Meglio insieme: Java e la classe Thread. Parte III — Interazione Meglio insieme: Java e la classe Thread. Parte IV — Callable, Future e friends Better together: Java e la classe Thread. Parte VI - Spara via!
GO TO FULL VERSION