CodeGym /Cours /JAVA 25 SELF /ForkJoinPool et RecursiveTask : tâches récursives

ForkJoinPool et RecursiveTask : tâches récursives

JAVA 25 SELF
Niveau 54 , Leçon 3
Disponible

1. ForkJoinPool : qu’est-ce que c’est et pourquoi l’utiliser

ForkJoinPool est un pool de threads spécial qui met en œuvre l’approche « diviser pour régner » (divide and conquer). Son objectif est de paralléliser le travail aussi efficacement que possible lorsqu’une grande tâche peut être divisée en un ensemble de sous-tâches indépendantes, exécutées en parallèle, puis de fusionner les résultats.

  • Fork (diviser) — la tâche est scindée en sous-tâches.
  • Join (fusionner) — les résultats des sous-tâches sont agrégés en un résultat final.

ForkJoinPool est le cœur des streams parallèles en Java : lorsque vous écrivez list.parallelStream(), c’est lui qui est utilisé sous le capot. Mais vous pouvez aussi l’utiliser directement pour avoir davantage de contrôle.

Quand ForkJoinPool est particulièrement utile

ForkJoinPool brille sur les tâches faciles à découper en parties indépendantes : par exemple, le traitement de très grands tableaux où chaque fragment est traité séparément, puis les résultats sont fusionnés.

  • La tâche se découpe facilement en sous-tâches indépendantes : tri, recherche, sommation.
  • Les sous-tâches sont d’un volume comparable et ne dépendent pas les unes des autres.
  • Vous voulez exploiter tous les cœurs du processeur pour une vitesse maximale.
+---------------------+
|   Grande tâche      |
+---------------------+
          |
          v
+---------+---------+
|  Sous-tâche 1     |
|  Sous-tâche 2     |
|  ...              |
+-------------------+
          |
          v
+---------+---------+
|  Résultats        |
+-------------------+

C’est exactement ainsi que fonctionne « diviser pour régner » : on divise — on calcule en parallèle — on fusionne.

2. RecursiveTask et RecursiveAction : les deux faces d’une même médaille

Dans ForkJoinPool, les tâches sont définies via des classes spéciales capables de se découper en sous-tâches et de fusionner les résultats. RecursiveTask<T> retourne un résultat, tandis que RecursiveAction n’en retourne pas. En pratique, on utilise plus souvent RecursiveTask, par exemple pour retourner une somme, un maximum ou un compte.

Pour créer une telle tâche, on hérite et on implémente la méthode compute(). On y décrit la logique : si la tâche est petite — on la résout immédiatement ; si elle est grande — on la divise en sous-tâches, on les lance en parallèle via fork() et on fusionne les résultats à l’aide de join(). C’est ainsi que naît un parallélisme récursif naturel.

3. Syntaxe et exemple : calcul parallèle de la somme d’un tableau

Supposons que nous ayons un grand tableau de nombres et que nous voulions calculer rapidement la somme de tous les éléments.

Étape 1. Classe de la tâche

import java.util.concurrent.RecursiveTask;

public class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // Seuil de découpage de la tâche
    private final int[] array;
    private final int start, end;

    public ArraySumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Si la tâche est petite — on calcule directement
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // On divise la tâche en deux sous-tâches
            int mid = (start + end) / 2;
            ArraySumTask leftTask = new ArraySumTask(array, start, mid);
            ArraySumTask rightTask = new ArraySumTask(array, mid, end);

            // On lance les sous-tâches en parallèle
            leftTask.fork(); // Asynchrone
            long rightResult = rightTask.compute(); // Synchrone
            long leftResult = leftTask.join(); // On attend la fin de celle de gauche

            // On fusionne le résultat
            return leftResult + rightResult;
        }
    }
}
  • Si la tâche est petite (inférieure au seuil THRESHOLD) — on calcule la somme avec une boucle classique.
  • Si elle est grande — on la divise en deux, on en lance une de façon asynchrone via fork(), on calcule l’autre de façon synchrone via compute(), puis on fusionne via join().

Étape 2. Lancement de la tâche via ForkJoinPool

import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumDemo {
    public static void main(String[] args) {
        int[] numbers = new int[10_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = 1; // Pour simplifier — la somme doit être égale à la longueur du tableau
        }

        ForkJoinPool pool = new ForkJoinPool(); // Par défaut — selon le nombre de cœurs

        ArraySumTask task = new ArraySumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task); // Lancement de la tâche

        System.out.println("Somme des éléments du tableau : " + result);
    }
}

Comment cela fonctionne-t-il ?

  • ForkJoinPool décide lui-même combien de threads utiliser (généralement — selon le nombre de cœurs).
  • La tâche est automatiquement divisée en sous-tâches, chacune pouvant s’exécuter sur un cœur distinct.
  • Les performances sont généralement supérieures à celles d’un code séquentiel (surtout sur de gros volumes de données et des systèmes multicoeurs).

4. Comment fonctionne ForkJoinPool : un peu « sous le capot »

Work-stealing (vol de travail)

ForkJoinPool met en œuvre le « vol de travail » : si un thread a terminé ses tâches, il « vole » du travail à un autre thread. Cela assure un équilibrage efficace de la charge et l’utilisation de tous les cœurs.

Algorithme de base

  • La tâche principale est divisée en sous-tâches.
  • Les sous-tâches sont placées dans des files spécialisées.
  • Les threads prennent les tâches dans leurs propres files, et lorsqu’elles sont vides — ils « cherchent » du travail chez les voisins.
  • Une fois tout exécuté, les résultats sont fusionnés.

Schéma de fonctionnement

flowchart TD
    A[Tâche principale] --> B1[Sous-tâche 1]
    A --> B2[Sous-tâche 2]
    B1 --> C1[Petite tâche 1]
    B1 --> C2[Petite tâche 2]
    B2 --> C3[Petite tâche 3]
    B2 --> C4[Petite tâche 4]
    C1 --> D[Fusion des résultats]
    C2 --> D
    C3 --> D
    C4 --> D

5. RecursiveAction — si aucun résultat n’est à retourner

Si vous devez simplement exécuter quelque chose en parallèle sans retourner de résultat, utilisez RecursiveAction. Exemples typiques — paralléliser le remplissage d’un tableau, l’impression, le tri « en place », etc.

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 100;
    private final int[] array;
    private final int start, end;

    public PrintTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.print(array[i] + " ");
            }
        } else {
            int mid = (start + end) / 2;
            invokeAll(
                new PrintTask(array, start, mid),
                new PrintTask(array, mid, end)
            );
        }
    }
}

6. Pratique : recherche parallèle de la valeur maximale dans un tableau

import java.util.concurrent.RecursiveTask;

public class MaxFindTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final int[] array;
    private final int start, end;

    public MaxFindTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int max = array[start];
            for (int i = start + 1; i < end; i++) {
                if (array[i] > max) max = array[i];
            }
            return max;
        } else {
            int mid = (start + end) / 2;
            MaxFindTask left = new MaxFindTask(array, start, mid);
            MaxFindTask right = new MaxFindTask(array, mid, end);
            left.fork();
            int rightResult = right.compute();
            int leftResult = left.join();
            return Math.max(leftResult, rightResult);
        }
    }
}

Exécution :

import java.util.concurrent.ForkJoinPool;

public class ForkJoinMaxDemo {
    public static void main(String[] args) {
        int[] array = new int[5_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = (int)(Math.random() * 1_000_000);
        }

        ForkJoinPool pool = new ForkJoinPool();
        MaxFindTask task = new MaxFindTask(array, 0, array.length);

        int max = pool.invoke(task);

        System.out.println("Valeur maximale : " + max);
    }
}

7. Avantages et limites de ForkJoinPool

Avantages

  • Équilibrage de charge automatique. Le work-stealing permet d’utiliser efficacement tous les cœurs.
  • Simplicité. Pas besoin de créer et de gérer les threads manuellement.
  • Haute performance. Surtout sur de grandes tâches et des systèmes multicoeurs.
  • Flexibilité. Vous pouvez découper les tâches en autant de parties que nécessaire.

Limitations

  • Forte dépendance entre sous-tâches. Si les sous-tâches s’attendent souvent, le gain diminue.
  • Tâches trop fines. Les surcoûts de découpage/synchronisation peuvent « manger » l’avantage du parallélisme.
  • Effets de bord. Il ne faut pas modifier des variables partagées sans synchronisation — vous obtiendrez des conditions de course.
  • Applicabilité. Adapté aux tâches divisibles en parties indépendantes.

8. Erreurs courantes avec ForkJoinPool et RecursiveTask

Erreur n° 1 : Découpage trop fin de la tâche. Si le seuil (THRESHOLD) est trop petit, vous obtiendrez une myriade de micro-tâches — les coûts de création et de synchronisation dépasseront le bénéfice du parallélisme. Expérimentez avec le seuil : les valeurs optimales se situent souvent à des milliers ou des dizaines de milliers d’éléments.

Erreur n° 2 : Utilisation de variables partagées mutables. Si les sous-tâches écrivent dans une variable partagée sans synchronisation — vous obtiendrez des conditions de course (race condition). Retournez le résultat via compute() et fusionnez uniquement avec join().

Erreur n° 3 : Mauvaise utilisation de fork/join. Vous avez oublié d’appeler fork() ou join() — et la sous-tâche ne s’exécutera pas en parallèle ou le résultat sera « perdu ». Surveillez attentivement l’ordre des appels.

Erreur n° 4 : Démarrage d’une ForkJoinTask en dehors d’un ForkJoinPool. Si vous appelez simplement compute() sur la tâche, elle s’exécutera dans le thread courant, sans parallélisme. Pour la vraie magie, utilisez pool.invoke() ou pool.submit().

Erreur n° 5 : Ignorer les exceptions. Si une exception se produit dans la tâche, elle se manifestera lors de l’appel de join() ou invoke(). N’oubliez pas de gérer les erreurs.

Erreur n° 6 : Utiliser ForkJoinPool pour des tâches bloquantes. ForkJoinPool est peu adapté aux tâches qui se bloquent souvent (attente d’I/O, etc.). Dans ces cas, il vaut mieux utiliser un ExecutorService.

Commentaires
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION