Introduction
Donc, nous savons que Java a des threads. Vous pouvez lire à ce sujet dans la revue intitulée Mieux ensemble : Java et la classe Thread. Partie I — Threads d'exécution . Reprenons le code typique :
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Comme vous pouvez le voir, le code pour démarrer une tâche est assez typique, mais nous devons le répéter pour une nouvelle tâche. Une solution consiste à le mettre dans une méthode distincte, par exemple execute(Runnable runnable)
. Mais les créateurs de Java ont réfléchi à notre sort et ont proposé l' Executor
interface :
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Ce code est clairement plus concis : maintenant, nous écrivons simplement du code pour démarrer le Runnable
sur le thread. C'est super, n'est-ce pas ? Mais c'est seulement le début:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
interface a une ExecutorService
sous-interface. Le Javadoc de cette interface indique qu'un ExecutorService
décrit un particulier Executor
qui fournit des méthodes pour arrêter le Executor
. Il permet également d'obtenir un java.util.concurrent.Future
afin de suivre le processus d'exécution. Précédemment, dans Mieux ensemble : Java et la classe Thread. Partie IV — Callable, Future et friends , nous avons brièvement passé en revue les fonctionnalités de Future
. Si vous l'avez oublié ou ne l'avez jamais lu, je vous suggère de vous rafraîchir la mémoire ;) Que dit d'autre le Javadoc ? Il nous indique que nous avons une java.util.concurrent.Executors
usine spéciale qui nous permet de créer des implémentations par défaut de ExecutorService
.
ExécuteurService
Revoyons. Nous devonsExecutor
exécuter (c'est-à-dire appeler execute()
) une certaine tâche sur un thread, et le code qui crée le thread nous est caché. Nous avons ExecutorService
- un spécifique Executor
qui a plusieurs options pour contrôler les progrès. Et nous avons l' Executors
usine qui nous permet de créer un fichier ExecutorService
. Maintenant, faisons-le nous-mêmes :
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Vous pouvez voir que nous avons spécifié un pool de threads fixe dont la taille est de 2. Ensuite, nous soumettons les tâches au pool une par une. Chaque tâche renvoie un String
contenant le nom du thread ( currentThread().GetName()
). Il est important de fermer le ExecutorService
tout à la fin, sinon notre programme ne se terminera pas. L' Executors
usine a des méthodes d'usine supplémentaires. Par exemple, nous pouvons créer un pool composé d'un seul thread ( newSingleThreadExecutor
) ou un pool qui inclut un cache ( newCachedThreadPool
) à partir duquel les threads sont supprimés après avoir été inactifs pendant 1 minute. En réalité, ceux-ci ExecutorService
sont soutenus par une file d'attente de blocage , dans laquelle les tâches sont placées et à partir de laquelle les tâches sont exécutées. Vous trouverez plus d'informations sur le blocage des files d'attente dans cette vidéo . Vous pouvez aussi lire ceciavis sur BlockingQueue . Et consultez la réponse à la question "Quand préférer LinkedBlockingQueue à ArrayBlockingQueue ?" En termes simples, a BlockingQueue
bloque un thread dans deux cas :
- le thread tente d'obtenir des éléments d'une file d'attente vide
- le thread tente de placer des éléments dans une file d'attente complète
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ou
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Comme nous pouvons le voir, les implémentations de ExecutorService
sont créées à l'intérieur des méthodes d'usine. Et pour la plupart, nous parlons de ThreadPoolExecutor
. Seuls les paramètres affectant le travail sont modifiés.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Comme nous l'avons vu précédemment,ThreadPoolExecutor
c'est ce qui est généralement créé dans les méthodes d'usine. La fonctionnalité est affectée par les arguments que nous transmettons comme nombre maximum et minimum de threads, ainsi que par le type de file d'attente utilisé. Mais n'importe quelle implémentation de l' java.util.concurrent.BlockingQueue
interface peut être utilisée. En parlant de ThreadPoolExecutor
, nous devons mentionner quelques fonctionnalités intéressantes. Par exemple, vous ne pouvez pas soumettre de tâches à un ThreadPoolExecutor
s'il n'y a pas d'espace disponible :
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Ce code plantera avec une erreur comme celle-ci :
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
En d'autres termes, task
ne peut pas être soumis, car SynchronousQueue
il est conçu de telle sorte qu'il se compose en fait d'un seul élément et ne nous permet pas d'y ajouter quoi que ce soit de plus. Nous pouvons voir que nous avons zéro queued tasks
("tâches en file d'attente = 0") ici. Mais il n'y a rien d'étrange à cela, car c'est une particularité de SynchronousQueue
, qui est en fait une file d'attente à 1 élément toujours vide ! Lorsqu'un thread place un élément dans la file d'attente, il attendra qu'un autre thread prenne l'élément de la file d'attente. En conséquence, nous pouvons le remplacer par new LinkedBlockingQueue<>(1)
et l'erreur sera modifiée pour afficher maintenant queued tasks = 1
. Parce que la file d'attente n'est qu'un élément, nous ne pouvons pas ajouter un deuxième élément. Et c'est ce qui fait échouer le programme. Poursuivant notre discussion sur la file d'attente, il convient de noter que leThreadPoolExecutor
La classe a des méthodes supplémentaires pour gérer la file d'attente. Par exemple, la threadPoolExecutor.purge()
méthode supprimera toutes les tâches annulées de la file d'attente afin de libérer de l'espace dans la file d'attente. Une autre fonction intéressante liée à la file d'attente est le gestionnaire des tâches rejetées :
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Dans cet exemple, notre gestionnaire affiche simplement Rejected
chaque fois qu'une tâche dans la file d'attente est rejetée. Pratique, n'est-ce pas ? De plus, ThreadPoolExecutor
a une sous-classe intéressante : ScheduledThreadPoolExecutor
, qui est un ScheduledExecutorService
. Il offre la possibilité d'effectuer une tâche basée sur une minuterie.
ScheduledExecutorService
ScheduledExecutorService
(qui est un type de ExecutorService
) nous permet d'exécuter des tâches selon un calendrier. Regardons un exemple :
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Tout est simple ici. Les tâches sont soumises, puis nous obtenons un fichier java.util.concurrent.ScheduledFuture
. Un calendrier peut également être utile dans la situation suivante :
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ici, nous soumettons une Runnable
tâche pour exécution à une fréquence fixe ("FixedRate") avec un certain délai initial. Dans ce cas, après 1 seconde, la tâche commencera à être exécutée toutes les 2 secondes. Il existe une option similaire :
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mais dans ce cas, les tâches sont exécutées avec un intervalle spécifique ENTRE chaque exécution. C'est-à-dire que le task
sera exécuté après 1 seconde. Ensuite, dès qu'il est terminé, 2 secondes s'écouleront, puis une nouvelle tâche sera lancée. Voici quelques ressources supplémentaires sur ce sujet :
- Une introduction aux pools de threads en Java
- Introduction aux pools de threads en Java
- Java Multithreading Steeplechase : annulation de tâches dans les exécuteurs
- Utilisation des exécuteurs Java pour les tâches en arrière-plan
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
TravailVolerPiscine
En plus des pools de threads ci-dessus, il y en a un de plus. On peut honnêtement dire que c'est un peu spécial. C'est ce qu'on appelle un pool de voleurs de travail. En bref, le vol de travail est un algorithme dans lequel les threads inactifs commencent à prendre des tâches d'autres threads ou des tâches d'une file d'attente partagée. Regardons un exemple :
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Si nous exécutons ce code, le ExecutorService
créera 5 threads pour nous, car chaque thread sera placé dans la file d'attente de l'objet de verrouillage. Nous avons déjà compris les moniteurs et les verrous dans Better Together : Java et la classe Thread. Partie II — Synchronisation . Remplaçons maintenant Executors.newCachedThreadPool()
par Executors.newWorkStealingPool()
. Qu'est-ce qui va changer ? Nous verrons que nos tâches sont exécutées sur moins de 5 threads. N'oubliez pas que CachedThreadPool
crée un fil pour chaque tâche ? En effet wait()
, le thread a été bloqué, les tâches suivantes doivent être terminées et de nouveaux threads ont été créés pour eux dans le pool. Avec un pool de vol, les threads ne restent pas inactifs indéfiniment. Ils commencent à effectuer les tâches de leurs voisins. Qu'est-ce qui rend a WorkStealingPool
si différent des autres pools de threads ? Le fait que la magieForkJoinPool
vit à l'intérieur:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
En fait, il y a encore une différence. Par défaut, les threads créés pour a ForkJoinPool
sont des threads démons, contrairement aux threads créés via un onrdinary ThreadPool
. En général, vous devez vous souvenir des threads démons, car, par exemple, CompletableFuture
utilise également des threads démons à moins que vous ne spécifiiez le vôtre ThreadFactory
qui crée des threads non démons. Ce sont les surprises qui peuvent se cacher dans des endroits inattendus ! :)
ForkRejoindrePool
Dans cette partie, nous reparlerons deForkJoinPool
(également appelé framework fork/join), qui vit "sous le capot" de WorkStealingPool
. En général, le framework fork/join est apparu dans Java 1.7. Et même si Java 11 est à portée de main, il convient de s'en souvenir. Ce n'est pas l'implémentation la plus courante, mais c'est assez intéressant. Il y a une bonne critique à ce sujet sur le web : Understanding Java Fork-Join Framework with Examples . Le ForkJoinPool
s'appuie sur java.util.concurrent.RecursiveTask
. Il y a aussi java.util.concurrent.RecursiveAction
. RecursiveAction
ne renvoie pas de résultat. Ainsi, RecursiveTask
est similaire à Callable
, et RecursiveAction
est similaire à unnable
. Nous pouvons voir que le nom inclut les noms de deux méthodes importantes : fork
et join
. Lefork
La méthode démarre une tâche de manière asynchrone sur un thread séparé. Et la join
méthode vous permet d'attendre que le travail soit fait. Pour obtenir la meilleure compréhension, vous devriez lire From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .
Résumé
Eh bien, cela conclut cette partie de l'examen. Nous avons appris qu'ilExecutor
a été inventé à l'origine pour exécuter des threads. Ensuite, les créateurs de Java ont décidé de poursuivre l'idée et ont proposé ExecutorService
. ExecutorService
nous permet de soumettre des tâches pour exécution à l'aide submit()
de et invoke()
, et également d'arrêter le service. Parce qu'il ExecutorService
a besoin d'implémentations, ils ont écrit une classe avec des méthodes d'usine et l'ont appelée Executors
. Il vous permet de créer des pools de threads ( ThreadPoolExecutor
). De plus, il existe des pools de threads qui nous permettent également de spécifier un calendrier d'exécution. Et un ForkJoinPool
se cache derrière un WorkStealingPool
. J'espère que vous avez trouvé ce que j'ai écrit ci-dessus non seulement intéressant, mais aussi compréhensible :) Je suis toujours heureux d'entendre vos suggestions et commentaires. Mieux ensemble : Java et la classe Thread. Partie I — Threads d'exécution Mieux ensemble : Java et la classe Thread. Partie II — Synchronisation Mieux ensemble : Java et la classe Thread. Partie III — Interaction Mieux ensemble : Java et la classe Thread. Partie IV — Callable, Future et friends Mieux ensemble : Java et la classe Thread. Partie VI — Tirez !
GO TO FULL VERSION