1. Wie man mehrere Threads bei der Dateiverarbeitung koordiniert
Wenn Sie mit großen Dateien oder komplexer Datenverarbeitung arbeiten, entsteht oft die Aufgabe, die Arbeit auf mehrere Threads aufzuteilen. Zum Beispiel liest ein Thread Zeilen aus einer Datei, während andere diese Zeilen verarbeiten (Wörter suchen, Statistiken berechnen usw.).
Wo liegt das Problem?
- Wenn alle Threads mit derselben Ressource arbeiten (z. B. mit einer Datei oder einer gemeinsamen Collection), sind Fehler leicht möglich: Einige Threads können andere „überholen“, es kommt zu Race Conditions, Daten gehen verloren, der Speicher wird überlastet.
- Wenn es viele Threads gibt, aber keine Koordination – dann stehen die einen herum, während die anderen überlastet sind.
Es braucht eine Lösung, die:
- einen sicheren Datenaustausch zwischen Threads ermöglicht,
- kein Speicherüberlaufen zulässt (wenn die Verarbeitung langsamer ist als das Einlesen),
- und die Arbeit aller Threads leicht beendet, sobald die Daten zu Ende sind.
2. Pattern „Producer–Consumer“ (Erzeuger–Verbraucher)
Producer–Consumer ist ein klassisches Pattern, das Threads hilft, reibungslos zusammenzuarbeiten, ohne sich gegenseitig zu stören.
Es gibt zwei Rollen: Der Producer (Erzeuger) erzeugt Daten – beispielsweise liest er Zeilen aus einer Datei oder erhält Nachrichten aus dem Netzwerk – und legt sie in eine gemeinsame Warteschlange. Der Consumer (Verbraucher) nimmt die Daten aus dieser Warteschlange und verarbeitet sie: zählt Wörter, speichert sie in einer Datenbank oder schreibt sie in eine andere Datei.
Die Hauptidee ist, dass diese beiden Thread-Typen unabhängig arbeiten. Der Producer kann schneller lesen, als der Consumer verarbeiten kann, oder umgekehrt – und trotzdem blockiert niemand dauerhaft. Dazwischen befindet sich ein Puffer – eine Warteschlange, die das Arbeitstempo ausgleicht.
Visuelle Darstellung
[Datei] --(liest)--> [Producer] --(legt in die Queue)--> [BlockingQueue] --(nimmt)--> [Consumer] --(verarbeitet)
3. Implementierung mit BlockingQueue
In Java eignet sich für einen solchen Austausch ideal das Interface BlockingQueue (zum Beispiel die Implementierung ArrayBlockingQueue).
Was ist eine BlockingQueue?
Eine BlockingQueue ist eine Thread-sichere Warteschlange mit begrenzter Größe, die sich selbst um die Synchronisation kümmert. Wenn der Producer versucht, ein Element hinzuzufügen, die Warteschlange aber bereits voll ist, wird der Thread automatisch blockiert und wartet, bis wieder Platz vorhanden ist. Analog dazu wartet der Consumer, wenn er ein Element entnehmen will und die Warteschlange leer ist, bis jemand etwas hineinlegt.
Dieser Mechanismus löst automatisch das klassische Problem von Race Conditions und Speicherüberlauf: Producer fluten die Warteschlange nicht mit überflüssigen Elementen, und Consumer versuchen nicht, mit „Leere“ zu arbeiten. Alle Threads erledigen ruhig und abgestimmt ihre Arbeit.
Beispiel zum Erstellen einer Warteschlange
import java.util.concurrent.*;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // Puffer für 100 Elemente
100 ist die maximale Anzahl von Elementen in der Warteschlange. Wenn der Producer schneller ist, wartet er, bis der Consumer einen Teil der Daten verarbeitet hat.
4. Thread-Koordination: Backpressure und Beendigung
Begrenzung der Warteschlangengröße (Backpressure)
Backpressure ist ein Mechanismus, der verhindert, dass der Producer den gesamten Speicher „flutet“, wenn der Consumer mit der Verarbeitung nicht hinterherkommt.
- Wenn die Warteschlange voll ist, „bremst“ der Producer automatisch (die Methode put() blockiert).
- Wenn die Warteschlange leer ist, wartet der Consumer (die Methode take() blockiert).
So kann das System auch bei unterschiedlichen Thread-Geschwindigkeiten stabil arbeiten.
Beendigung: „poison pill“ (vergiftete Pille)
Wenn der Producer das Einlesen der Datei abgeschlossen hat, muss er den Consumern mitteilen, dass keine Daten mehr kommen und sie beenden sollen.
Lösung:
- In die Warteschlange wird ein spezielles Objekt gelegt – eine „poison pill“ (z. B. die Zeichenkette "__END__" oder ein anderer spezieller Wert, aber nicht null).
- Wenn der Consumer die „poison pill“ erhält, weiß er, dass er beenden soll.
Wenn es mehrere Consumer gibt, muss so viele „poison pills“ eingelegt werden, wie es Consumer gibt!
5. Beispiel-Pipeline: Datei lesen und Zeilen verarbeiten
Lassen Sie uns eine einfache Pipeline implementieren:
- Ein Thread liest Zeilen aus einer Datei und legt sie in die Warteschlange.
- Mehrere Threads nehmen Zeilen aus der Warteschlange, zählen die Anzahl der Wörter und geben das Ergebnis aus.
Schritt 1. Producer – Dateileser
import java.io.*;
import java.util.concurrent.*;
public class FileProducer implements Runnable {
private final BlockingQueue<String> queue;
private final File file;
private final int consumerCount;
private final String POISON_PILL = "__END__";
public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
this.queue = queue;
this.file = file;
this.consumerCount = consumerCount;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
while ((line = reader.readLine()) != null) {
queue.put(line); // wenn die Warteschlange voll ist — warten
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
// Poison Pill für jeden Consumer einfügen
try {
for (int i = 0; i < consumerCount; i++) {
queue.put(POISON_PILL);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Schritt 2. Consumer – Zeilenverarbeiter
import java.util.concurrent.BlockingQueue;
public class LineConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final String POISON_PILL = "__END__";
public LineConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String line = queue.take(); // wenn die Warteschlange leer ist — warten
if (POISON_PILL.equals(line)) {
break; // beenden
}
int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
System.out.println(Thread.currentThread().getName() + ": " + wordCount + " Wörter in der Zeile: " + line);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Schritt 3. Pipeline über Executors starten
import java.io.File;
import java.util.concurrent.*;
public class PipelineDemo {
public static void main(String[] args) throws Exception {
int consumerCount = 3;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
File file = new File("input.txt"); // Ihre Datei
// Producer starten
Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
producer.start();
// Consumers über ExecutorService starten
ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
for (int i = 0; i < consumerCount; i++) {
consumers.submit(new LineConsumer(queue));
}
// Auf das Ende des Producers warten
producer.join();
// Auf das Ende der Consumers warten
consumers.shutdown();
consumers.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("Verarbeitung abgeschlossen!");
}
}
6. Visualisierung: Schema der Pipeline
flowchart LR
A[Producer: liest Datei] -- put() --> Q[BlockingQueue]
Q -- take() --> C1[Consumer 1: zählt Wörter]
Q -- take() --> C2[Consumer 2: zählt Wörter]
Q -- take() --> C3[Consumer 3: zählt Wörter]
A -.->|poison pill| Q
Q -.->|poison pill| C1
Q -.->|poison pill| C2
Q -.->|poison pill| C3
7. Nützliche Feinheiten und Best Practices
- Begrenzen Sie die Größe der Warteschlange – das schützt vor Speicherüberlauf.
- Verwenden Sie eine „poison pill“ zum Beenden – sonst können Consumer für immer hängen bleiben.
- Verwenden Sie null nicht als „poison pill“, wenn in der Warteschlange echte null-Werte vorkommen können. Besser eine spezielle Zeichenkette oder ein Objekt.
- Behandeln Sie InterruptedException – das ist wichtig für das saubere Beenden von Threads.
- Verwenden Sie ExecutorService zur Verwaltung des Thread-Pools – das ist einfacher und sicherer, als Threads manuell zu erstellen.
8. Typische Fehler bei der Implementierung einer Pipeline
Fehler Nr. 1: Unbegrenzte Warteschlange → OutOfMemoryError. Wenn man eine LinkedBlockingQueue ohne Größenbegrenzung verwendet, kann der Producer den gesamten Speicher „fluten“, wenn der Consumer nicht hinterherkommt.
Fehler Nr. 2: Keine „poison pill“ eingelegt → Consumer hängen. Wenn man die „vergifteten Pillen“ vergisst, warten die Consumer-Threads ewig auf neue Daten.
Fehler Nr. 3: Nur eine „poison pill“ eingelegt, aber es gibt mehrere Consumer. Jeder Consumer-Thread braucht seine eigene „Pille“ – sonst beenden sich nicht alle.
Fehler Nr. 4: InterruptedException nicht behandelt. Wenn ein Thread unterbrochen wird, sollte er sauber beendet werden (Unterbrechungsflag wiederherstellen), sonst sind „hängende“ Threads möglich.
Fehler Nr. 5: Gemeinsame Variable zwischen Threads ohne Synchronisation. Versuchen Sie nicht, das Rad neu zu erfinden – verwenden Sie BlockingQueue statt normaler Listen oder Arrays.
GO TO FULL VERSION