CodeGym /Kurse /JAVA 25 SELF /ForkJoinPool und RecursiveTask: rekursive Aufgaben

ForkJoinPool und RecursiveTask: rekursive Aufgaben

JAVA 25 SELF
Level 54 , Lektion 3
Verfügbar

1. ForkJoinPool: Was ist das und wozu wird er benötigt

ForkJoinPool ist ein spezieller Thread-Pool, der den Ansatz „Teile und herrsche“ (divide and conquer) umsetzt. Seine Aufgabe ist es, die Arbeit möglichst effizient zu parallelisieren, wenn sich eine große Aufgabe in eine Reihe unabhängiger Teilaufgaben zerlegen lässt, die parallel ausgeführt und anschließend zusammengeführt werden.

  • Fork (teilen) – die Aufgabe wird in Teilaufgaben aufgeteilt.
  • Join (zusammenführen) – die Ergebnisse der Teilaufgaben werden zum Gesamtergebnis aggregiert.

ForkJoinPool ist das Herz der parallelen Streams in Java: Wenn Sie list.parallelStream() schreiben, wird intern genau er verwendet. Sie können ihn aber auch direkt einsetzen und so mehr Kontrolle erhalten.

Wann ist ForkJoinPool besonders nützlich

ForkJoinPool spielt seine Stärken bei Aufgaben aus, die sich leicht in unabhängige Teile zerlegen lassen: zum Beispiel die Verarbeitung sehr großer Arrays, bei der jedes Segment separat bearbeitet und die Ergebnisse anschließend zusammengeführt werden.

  • Die Aufgabe lässt sich leicht in unabhängige Teilaufgaben zerlegen: Sortieren, Suchen, Summieren.
  • Die Teilaufgaben sind ungefähr gleich groß und voneinander unabhängig.
  • Alle CPU-Kerne sollen für maximale Geschwindigkeit ausgenutzt werden.
+---------------------+
|    Große Aufgabe    |
+---------------------+
          |
          v
+---------+---------+
|  Teilaufgabe 1    |
|  Teilaufgabe 2    |
|  ...              |
+-------------------+
          |
          v
+---------+---------+
|  Ergebnisse       |
+-------------------+

Genau so funktioniert „Teile und herrsche“: zerlegen – parallel berechnen – zusammenführen.

2. RecursiveTask und RecursiveAction: zwei Seiten derselben Medaille

Im ForkJoinPool werden Aufgaben über spezielle Klassen formuliert, die sich in Teilaufgaben zerlegen und deren Ergebnisse zusammenführen können. RecursiveTask<T> liefert ein Ergebnis zurück, RecursiveAction nicht. In der Praxis wird RecursiveTask häufiger verwendet, um etwa Summe, Maximum oder Anzahl zurückzugeben.

Um eine solche Aufgabe zu erstellen, erben wir und implementieren die Methode compute(). Darin beschreiben wir die Logik: Ist die Aufgabe klein, lösen wir sie sofort; ist sie groß, teilen wir sie in Teilaufgaben, starten diese parallel über fork() und führen die Ergebnisse über join() zusammen. So entsteht natürlicher rekursiver Parallelismus.

3. Syntax und Beispiel: parallele Berechnung der Arraysumme

Angenommen, wir haben ein großes Zahlenarray und möchten die Summe aller Elemente schnell berechnen.

Schritt 1. Aufgabenklasse

import java.util.concurrent.RecursiveTask;

public class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // Schwellenwert zum Aufteilen der Aufgabe
    private final int[] array;
    private final int start, end;

    public ArraySumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Wenn die Aufgabe klein ist - direkt berechnen
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // Aufgabe in zwei Teilaufgaben aufteilen
            int mid = (start + end) / 2;
            ArraySumTask leftTask = new ArraySumTask(array, start, mid);
            ArraySumTask rightTask = new ArraySumTask(array, mid, end);

            // Teilaufgaben parallel starten
            leftTask.fork(); // Asynchron
            long rightResult = rightTask.compute(); // Synchron
            long leftResult = leftTask.join(); // Auf Abschluss der linken warten

            // Ergebnis zusammenführen
            return leftResult + rightResult;
        }
    }
}
  • Ist die Aufgabe klein (kleiner als der Schwellenwert THRESHOLD) – berechnen wir die Summe mit einer normalen Schleife.
  • Ist sie groß, teilen wir sie in zwei Teile, starten eine asynchron per fork(), berechnen die andere synchron per compute() und führen anschließend per join() zusammen.

Schritt 2. Aufgabe über ForkJoinPool starten

import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumDemo {
    public static void main(String[] args) {
        int[] numbers = new int[10_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = 1; // Der Einfachheit halber - die Summe sollte der Arraylänge entsprechen
        }

        ForkJoinPool pool = new ForkJoinPool(); // Standardmäßig - entsprechend der Kernanzahl

        ArraySumTask task = new ArraySumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task); // Aufgabe starten

        System.out.println("Summe der Arrayelemente: " + result);
    }
}

Wie funktioniert das?

  • ForkJoinPool entscheidet selbst, wie viele Threads verwendet werden (in der Regel entsprechend der Anzahl der Kerne).
  • Die Aufgabe wird automatisch in Teilaufgaben zerlegt, die jeweils auf einem eigenen Kern ausgeführt werden können.
  • Die Leistung ist in der Regel höher als bei sequentiellem Code (insbesondere bei großen Datenmengen und Mehrkernsystemen).

4. Wie der ForkJoinPool funktioniert: ein Blick „unter die Haube“

Work-Stealing (‚Arbeit stehlen‘)

ForkJoinPool implementiert „Work-Stealing“: Wenn einem Thread die Aufgaben ausgehen, „stiehlt“ er Arbeit von einem anderen Thread. Das sorgt für eine effiziente Lastverteilung und eine gute Auslastung aller Kerne.

Grundalgorithmus

  • Die Hauptaufgabe wird in Teilaufgaben zerlegt.
  • Die Teilaufgaben werden in spezialisierte Warteschlangen gelegt.
  • Threads entnehmen Aufgaben aus ihren eigenen Warteschlangen und „suchen“ bei Leerlauf Arbeit bei den Nachbarn.
  • Wenn alles erledigt ist, werden die Ergebnisse zusammengeführt.

Ablaufschema

flowchart TD
    A[Hauptaufgabe] --> B1[Teilaufgabe 1]
    A --> B2[Teilaufgabe 2]
    B1 --> C1[Kleine Aufgabe 1]
    B1 --> C2[Kleine Aufgabe 2]
    B2 --> C3[Kleine Aufgabe 3]
    B2 --> C4[Kleine Aufgabe 4]
    C1 --> D[Zusammenführung der Ergebnisse]
    C2 --> D
    C3 --> D
    C4 --> D

5. RecursiveAction – wenn kein Ergebnis zurückgegeben werden muss

Wenn etwas einfach parallel ausgeführt werden soll und kein Ergebnis zurückgegeben wird, verwenden Sie RecursiveAction. Typische Beispiele sind das parallele Befüllen eines Arrays, das Drucken oder In-Place-Sortieren usw.

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction {
    private static final int THRESHOLD = 100;
    private final int[] array;
    private final int start, end;

    public PrintTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.print(array[i] + " ");
            }
        } else {
            int mid = (start + end) / 2;
            invokeAll(
                new PrintTask(array, start, mid),
                new PrintTask(array, mid, end)
            );
        }
    }
}

6. Praxis: parallele Suche nach dem Maximum in einem Array

import java.util.concurrent.RecursiveTask;

public class MaxFindTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private final int[] array;
    private final int start, end;

    public MaxFindTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int max = array[start];
            for (int i = start + 1; i < end; i++) {
                if (array[i] > max) max = array[i];
            }
            return max;
        } else {
            int mid = (start + end) / 2;
            MaxFindTask left = new MaxFindTask(array, start, mid);
            MaxFindTask right = new MaxFindTask(array, mid, end);
            left.fork();
            int rightResult = right.compute();
            int leftResult = left.join();
            return Math.max(leftResult, rightResult);
        }
    }
}

Ausführung:

import java.util.concurrent.ForkJoinPool;

public class ForkJoinMaxDemo {
    public static void main(String[] args) {
        int[] array = new int[5_000_000];
        for (int i = 0; i < array.length; i++) {
            array[i] = (int)(Math.random() * 1_000_000);
        }

        ForkJoinPool pool = new ForkJoinPool();
        MaxFindTask task = new MaxFindTask(array, 0, array.length);

        int max = pool.invoke(task);

        System.out.println("Maximalwert: " + max);
    }
}

7. Vorteile und Einschränkungen des ForkJoinPool

Vorteile

  • Automatische Lastverteilung. Work-Stealing ermöglicht die effiziente Nutzung aller Kerne.
  • Bequemlichkeit. Es ist nicht nötig, Threads manuell zu erstellen und zu verwalten.
  • Hohe Leistung. Besonders bei großen Aufgaben und Mehrkernsystemen.
  • Flexibilität. Aufgaben lassen sich in so viele Teile zerlegen, wie nötig.

Einschränkungen

  • Starke Kopplung zwischen Teilaufgaben. Wenn Teilaufgaben häufig aufeinander warten, sinkt der Nutzen.
  • Zu feinkörnige Aufgaben. Overhead für Aufteilung/Synchronisation kann den Vorteil des Parallelismus „auffressen“.
  • Seiteneffekte. Gemeinsame veränderliche Variablen dürfen ohne Synchronisation nicht geändert werden – sonst drohen Race Conditions.
  • Anwendbarkeit. Geeignet für Aufgaben, die in unabhängige Teile zerlegt werden können.

8. Typische Fehler bei der Arbeit mit ForkJoinPool und RecursiveTask

Fehler Nr. 1: Zu feine Aufteilung der Aufgabe. Ist der Schwellenwert (THRESHOLD) zu klein, entstehen sehr viele winzige Aufgaben – der Aufwand für Erstellung und Synchronisation übersteigt den Gewinn durch Parallelität. Experimentieren Sie mit dem Schwellenwert: optimale Werte liegen oft im Bereich von Tausenden oder Zehntausenden Elementen.

Fehler Nr. 2: Verwendung gemeinsamer veränderlicher Variablen. Wenn Teilaufgaben in eine gemeinsame Variable ohne Synchronisation schreiben, entstehen Datenrennen (race condition). Geben Sie Ergebnisse über compute() zurück und führen Sie nur in join() zusammen.

Fehler Nr. 3: Falsche Nutzung von fork/join. Wird fork() oder join() vergessen, läuft die Teilaufgabe nicht parallel oder das Ergebnis „geht verloren“. Achten Sie sorgfältig auf die Aufrufreihenfolge.

Fehler Nr. 4: Start einer ForkJoinTask außerhalb eines ForkJoinPool. Wenn compute() direkt aufgerufen wird, läuft die Aufgabe im aktuellen Thread – ohne Parallelität. Für die echte Magie verwenden Sie pool.invoke() oder pool.submit().

Fehler Nr. 5: Ausnahmen ignorieren. Tritt in einer Aufgabe eine Ausnahme auf, zeigt sie sich beim Aufruf von join() oder invoke(). Vergessen Sie nicht, Fehler zu behandeln.

Fehler Nr. 6: Einsatz von ForkJoinPool für blockierende Aufgaben. ForkJoinPool eignet sich schlecht für Aufgaben, die häufig blockieren (I/O warten usw.). In solchen Fällen ist ExecutorService die bessere Wahl.

Kommentare
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION