CodeGym /Blog Java /Random-FR /Mieux ensemble : Java et la classe Thread. Partie V — Exe...
John Squirrels
Niveau 41
San Francisco

Mieux ensemble : Java et la classe Thread. Partie V — Executor, ThreadPool, Fork/Join

Publié dans le groupe Random-FR

Introduction

Donc, nous savons que Java a des threads. Vous pouvez lire à ce sujet dans la revue intitulée Mieux ensemble : Java et la classe Thread. Partie I — Threads d'exécution . Mieux ensemble : Java et la classe Thread.  Partie V — Executor, ThreadPool, Fork/Join - 1Reprenons le code typique :

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Comme vous pouvez le voir, le code pour démarrer une tâche est assez typique, mais nous devons le répéter pour une nouvelle tâche. Une solution consiste à le mettre dans une méthode distincte, par exemple execute(Runnable runnable). Mais les créateurs de Java ont réfléchi à notre sort et ont proposé l' Executorinterface :

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Ce code est clairement plus concis : maintenant, nous écrivons simplement du code pour démarrer le Runnablesur le thread. C'est super, n'est-ce pas ? Mais c'est seulement le début: Mieux ensemble : Java et la classe Thread.  Partie V — Executor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Comme vous pouvez le voir, l' Executorinterface a une ExecutorServicesous-interface. Le Javadoc de cette interface indique qu'un ExecutorServicedécrit un particulier Executorqui fournit des méthodes pour arrêter le Executor. Il permet également d'obtenir un java.util.concurrent.Futureafin de suivre le processus d'exécution. Précédemment, dans Mieux ensemble : Java et la classe Thread. Partie IV — Callable, Future et friends , nous avons brièvement passé en revue les fonctionnalités de Future. Si vous l'avez oublié ou ne l'avez jamais lu, je vous suggère de vous rafraîchir la mémoire ;) Que dit d'autre le Javadoc ? Il nous indique que nous avons une java.util.concurrent.Executorsusine spéciale qui nous permet de créer des implémentations par défaut de ExecutorService.

ExécuteurService

Revoyons. Nous devons Executorexécuter (c'est-à-dire appeler execute()) une certaine tâche sur un thread, et le code qui crée le thread nous est caché. Nous avons ExecutorService- un spécifique Executorqui a plusieurs options pour contrôler les progrès. Et nous avons l' Executorsusine qui nous permet de créer un fichier ExecutorService. Maintenant, faisons-le nous-mêmes :

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Vous pouvez voir que nous avons spécifié un pool de threads fixe dont la taille est de 2. Ensuite, nous soumettons les tâches au pool une par une. Chaque tâche renvoie un Stringcontenant le nom du thread ( currentThread().GetName()). Il est important de fermer le ExecutorServicetout à la fin, sinon notre programme ne se terminera pas. L' Executorsusine a des méthodes d'usine supplémentaires. Par exemple, nous pouvons créer un pool composé d'un seul thread ( newSingleThreadExecutor) ou un pool qui inclut un cache ( newCachedThreadPool) à partir duquel les threads sont supprimés après avoir été inactifs pendant 1 minute. En réalité, ceux-ci ExecutorServicesont soutenus par une file d'attente de blocage , dans laquelle les tâches sont placées et à partir de laquelle les tâches sont exécutées. Vous trouverez plus d'informations sur le blocage des files d'attente dans cette vidéo . Vous pouvez aussi lire ceciavis sur BlockingQueue . Et consultez la réponse à la question "Quand préférer LinkedBlockingQueue à ArrayBlockingQueue ?" En termes simples, a BlockingQueuebloque un thread dans deux cas :
  • le thread tente d'obtenir des éléments d'une file d'attente vide
  • le thread tente de placer des éléments dans une file d'attente complète
Si nous regardons la mise en œuvre des méthodes d'usine, nous pouvons voir comment elles fonctionnent. Par exemple:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
ou

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Comme nous pouvons le voir, les implémentations de ExecutorServicesont créées à l'intérieur des méthodes d'usine. Et pour la plupart, nous parlons de ThreadPoolExecutor. Seuls les paramètres affectant le travail sont modifiés. Mieux ensemble : Java et la classe Thread.  Partie V — Executor, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Comme nous l'avons vu précédemment, ThreadPoolExecutorc'est ce qui est généralement créé dans les méthodes d'usine. La fonctionnalité est affectée par les arguments que nous transmettons comme nombre maximum et minimum de threads, ainsi que par le type de file d'attente utilisé. Mais n'importe quelle implémentation de l' java.util.concurrent.BlockingQueueinterface peut être utilisée. En parlant de ThreadPoolExecutor, nous devons mentionner quelques fonctionnalités intéressantes. Par exemple, vous ne pouvez pas soumettre de tâches à un ThreadPoolExecutors'il n'y a pas d'espace disponible :

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Ce code plantera avec une erreur comme celle-ci :

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
En d'autres termes, taskne peut pas être soumis, car SynchronousQueueil est conçu de telle sorte qu'il se compose en fait d'un seul élément et ne nous permet pas d'y ajouter quoi que ce soit de plus. Nous pouvons voir que nous avons zéro queued tasks("tâches en file d'attente = 0") ici. Mais il n'y a rien d'étrange à cela, car c'est une particularité de SynchronousQueue, qui est en fait une file d'attente à 1 élément toujours vide ! Lorsqu'un thread place un élément dans la file d'attente, il attendra qu'un autre thread prenne l'élément de la file d'attente. En conséquence, nous pouvons le remplacer par new LinkedBlockingQueue<>(1)et l'erreur sera modifiée pour afficher maintenant queued tasks = 1. Parce que la file d'attente n'est qu'un élément, nous ne pouvons pas ajouter un deuxième élément. Et c'est ce qui fait échouer le programme. Poursuivant notre discussion sur la file d'attente, il convient de noter que leThreadPoolExecutorLa classe a des méthodes supplémentaires pour gérer la file d'attente. Par exemple, la threadPoolExecutor.purge()méthode supprimera toutes les tâches annulées de la file d'attente afin de libérer de l'espace dans la file d'attente. Une autre fonction intéressante liée à la file d'attente est le gestionnaire des tâches rejetées :

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Dans cet exemple, notre gestionnaire affiche simplement Rejectedchaque fois qu'une tâche dans la file d'attente est rejetée. Pratique, n'est-ce pas ? De plus, ThreadPoolExecutora une sous-classe intéressante : ScheduledThreadPoolExecutor, qui est un ScheduledExecutorService. Il offre la possibilité d'effectuer une tâche basée sur une minuterie.

ScheduledExecutorService

ScheduledExecutorService(qui est un type de ExecutorService) nous permet d'exécuter des tâches selon un calendrier. Regardons un exemple :

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Tout est simple ici. Les tâches sont soumises, puis nous obtenons un fichier java.util.concurrent.ScheduledFuture. Un calendrier peut également être utile dans la situation suivante :

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Ici, nous soumettons une Runnabletâche pour exécution à une fréquence fixe ("FixedRate") avec un certain délai initial. Dans ce cas, après 1 seconde, la tâche commencera à être exécutée toutes les 2 secondes. Il existe une option similaire :

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Mais dans ce cas, les tâches sont exécutées avec un intervalle spécifique ENTRE chaque exécution. C'est-à-dire que le tasksera exécuté après 1 seconde. Ensuite, dès qu'il est terminé, 2 secondes s'écouleront, puis une nouvelle tâche sera lancée. Voici quelques ressources supplémentaires sur ce sujet : Mieux ensemble : Java et la classe Thread.  Partie V — Executor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

TravailVolerPiscine

En plus des pools de threads ci-dessus, il y en a un de plus. On peut honnêtement dire que c'est un peu spécial. C'est ce qu'on appelle un pool de voleurs de travail. En bref, le vol de travail est un algorithme dans lequel les threads inactifs commencent à prendre des tâches d'autres threads ou des tâches d'une file d'attente partagée. Regardons un exemple :

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Si nous exécutons ce code, le ExecutorServicecréera 5 threads pour nous, car chaque thread sera placé dans la file d'attente de l'objet de verrouillage. Nous avons déjà compris les moniteurs et les verrous dans Better Together : Java et la classe Thread. Partie II — Synchronisation . Remplaçons maintenant Executors.newCachedThreadPool()par Executors.newWorkStealingPool(). Qu'est-ce qui va changer ? Nous verrons que nos tâches sont exécutées sur moins de 5 threads. N'oubliez pas que CachedThreadPoolcrée un fil pour chaque tâche ? En effet wait(), le thread a été bloqué, les tâches suivantes doivent être terminées et de nouveaux threads ont été créés pour eux dans le pool. Avec un pool de vol, les threads ne restent pas inactifs indéfiniment. Ils commencent à effectuer les tâches de leurs voisins. Qu'est-ce qui rend a WorkStealingPoolsi différent des autres pools de threads ? Le fait que la magieForkJoinPoolvit à l'intérieur:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
En fait, il y a encore une différence. Par défaut, les threads créés pour a ForkJoinPoolsont des threads démons, contrairement aux threads créés via un onrdinary ThreadPool. En général, vous devez vous souvenir des threads démons, car, par exemple, CompletableFutureutilise également des threads démons à moins que vous ne spécifiiez le vôtre ThreadFactoryqui crée des threads non démons. Ce sont les surprises qui peuvent se cacher dans des endroits inattendus ! :)

ForkRejoindrePool

Dans cette partie, nous reparlerons de ForkJoinPool(également appelé framework fork/join), qui vit "sous le capot" de WorkStealingPool. En général, le framework fork/join est apparu dans Java 1.7. Et même si Java 11 est à portée de main, il convient de s'en souvenir. Ce n'est pas l'implémentation la plus courante, mais c'est assez intéressant. Il y a une bonne critique à ce sujet sur le web : Understanding Java Fork-Join Framework with Examples . Le ForkJoinPools'appuie sur java.util.concurrent.RecursiveTask. Il y a aussi java.util.concurrent.RecursiveAction. RecursiveActionne renvoie pas de résultat. Ainsi, RecursiveTaskest similaire à Callable, et RecursiveActionest similaire à unnable. Nous pouvons voir que le nom inclut les noms de deux méthodes importantes : forket join. LeforkLa méthode démarre une tâche de manière asynchrone sur un thread séparé. Et la joinméthode vous permet d'attendre que le travail soit fait. Pour obtenir la meilleure compréhension, vous devriez lire From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Résumé

Eh bien, cela conclut cette partie de l'examen. Nous avons appris qu'il Executora été inventé à l'origine pour exécuter des threads. Ensuite, les créateurs de Java ont décidé de poursuivre l'idée et ont proposé ExecutorService. ExecutorServicenous permet de soumettre des tâches pour exécution à l'aide submit()de et invoke(), et également d'arrêter le service. Parce qu'il ExecutorServicea besoin d'implémentations, ils ont écrit une classe avec des méthodes d'usine et l'ont appelée Executors. Il vous permet de créer des pools de threads ( ThreadPoolExecutor). De plus, il existe des pools de threads qui nous permettent également de spécifier un calendrier d'exécution. Et un ForkJoinPoolse cache derrière un WorkStealingPool. J'espère que vous avez trouvé ce que j'ai écrit ci-dessus non seulement intéressant, mais aussi compréhensible :) Je suis toujours heureux d'entendre vos suggestions et commentaires. Mieux ensemble : Java et la classe Thread. Partie I — Threads d'exécution Mieux ensemble : Java et la classe Thread. Partie II — Synchronisation Mieux ensemble : Java et la classe Thread. Partie III — Interaction Mieux ensemble : Java et la classe Thread. Partie IV — Callable, Future et friends Mieux ensemble : Java et la classe Thread. Partie VI — Tirez !
Commentaires
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION