1. Stream paralleli (parallelStream): semplice e comodo
Se avete già lavorato con l’Stream API, sapete quanto sia comodo filtrare, trasformare e raccogliere le collezioni. Un vantaggio importante: qualsiasi stream può diventare parallelo letteralmente con una riga — attivate parallel(), e gli elementi della collezione inizieranno a essere elaborati in concorrenza.
Questo è particolarmente utile per operazioni indipendenti su un insieme di file: contare le righe, cercare una sottostringa, copiare, comprimere ecc.
Esempio: conteggio parallelo delle righe in tutti i file di una directory
Variante 1: in sequenza
import java.nio.file.*;
import java.io.IOException;
import java.util.List;
public class LogLineCounter {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
long totalLines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
for (Path file : stream) {
long lines = Files.lines(file).count();
totalLines += lines;
}
}
System.out.println("Numero totale di righe in tutti i log: " + totalLines);
}
}
Commento: tutto viene eseguito in sequenza, un file dopo l’altro. Se i file sono molti e grandi, l’attesa sarà lunga.
Variante 2: in parallelo!
import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;
public class LogLineCounterParallel {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
try (Stream<Path> files = Files.list(logDir)) {
long totalLines = files
.filter(path -> path.toString().endsWith(".log"))
.parallel() // ecco tutta la magia!
.mapToLong(file -> {
try (Stream<String> lines = Files.lines(file)) {
return lines.count();
} catch (IOException e) {
e.printStackTrace();
return 0L;
}
})
.sum();
System.out.println("Numero totale di righe in tutti i log: " + totalLines);
}
}
}
Commento: la riga chiave — .parallel(). Su un processore multi-core, di norma, il programma funzionerà sensibilmente più velocemente.
Come funziona?
- parallel() trasforma uno stream normale in parallelo. Sotto il cofano si usa il pool comune ForkJoinPool (per impostazione predefinita il numero di thread è pari ai core).
- Ogni file viene elaborato indipendentemente, i risultati sono aggregati tramite operazioni terminali (ad esempio, sum()).
- Se i file sono pochi — potrebbe non esserci un’accelerazione; se sono centinaia — il guadagno è di solito evidente.
Importante!
- Gli stream paralleli non velocizzano le operazioni di I/O in sé; permettono di eseguirne diverse contemporaneamente. Su supporti veloci (SSD) questo aiuta, su quelli lenti (HDD) si può incorrere nel collo di bottiglia del disco.
2. ForkJoinPool: «divide et impera» in pratica
ForkJoin è un framework per il calcolo parallelo «divide et impera»: si scompone un compito grande in sottocompiti, li si esegue in parallelo e si combinano i risultati. A gestire il tutto c’è un pool speciale — ForkJoinPool. È proprio quello usato «dietro le quinte» dagli stream paralleli, ma lo si può controllare direttamente per una maggiore flessibilità.
Questo modello è particolarmente adatto per strutture ricorsive (alberi di directory), grandi array di dati e attività che si decompongono facilmente in parti indipendenti.
Esempio: ricerca ricorsiva nell’albero delle directory
Troviamo tutti i file ".txt" (incluse le sottocartelle) e contiamo il numero totale di righe.
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;
public class FolderLineCounter extends RecursiveTask<Long> {
private final Path dir;
public FolderLineCounter(Path dir) {
this.dir = dir;
}
@Override
protected Long compute() {
List<FolderLineCounter> subTasks = new ArrayList<>();
long lines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path entry : stream) {
if (Files.isDirectory(entry)) {
FolderLineCounter task = new FolderLineCounter(entry);
task.fork(); // Avviamo una sottoattività
subTasks.add(task);
} else if (entry.toString().endsWith(".txt")) {
try (Stream<String> fileLines = Files.lines(entry)) {
lines += fileLines.count();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// Raccogliamo i risultati dalle sottoattività
for (FolderLineCounter task : subTasks) {
lines += task.join();
}
return lines;
}
public static void main(String[] args) {
Path root = Paths.get("big_folder");
ForkJoinPool pool = new ForkJoinPool();
FolderLineCounter counter = new FolderLineCounter(root);
long totalLines = pool.invoke(counter);
System.out.println("Numero totale di righe in tutti i file .txt: " + totalLines);
}
}
Cosa succede qui:
- Per ogni cartella viene creata un’attività separata (FolderLineCounter), per le sottocartelle — le proprie sottoattività (fork()).
- I file vengono conteggiati sul posto, i risultati si sommano dopo il join() di tutte le sottoattività.
Quali sono i vantaggi di ForkJoin?
- Funziona in modo efficiente con grandi gerarchie (alberi di directory).
- Massimizza il carico sui core della CPU.
- Permette di controllare con precisione la parallelizzazione e i confini dei compiti.
3. Casi d’uso pratici
Elaborazione massiva di file
Per esempio, è necessario copiare migliaia di foto in una cartella di backup.
import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelFileCopier {
public static void main(String[] args) throws Exception {
Path sourceDir = Paths.get("photos");
Path destDir = Paths.get("photos_backup");
Files.createDirectories(destDir);
List<Path> files = Files.list(sourceDir)
.filter(Files::isRegularFile)
.collect(Collectors.toList());
files.parallelStream().forEach(file -> {
try {
Path destFile = destDir.resolve(file.getFileName());
Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println("Tutti i file sono stati copiati!");
}
}
Commento: ogni file viene copiato in un thread separato. Con un numero elevato di file, l’accelerazione è evidente.
Compressione/decompressione parallela
Analogamente è possibile parallelizzare la compressione, il calcolo degli hash, la conversione del formato delle immagini ecc. tramite parallelStream() o un proprio ForkJoinPool.
4. Avvertenze importanti e limitazioni
- Le operazioni di I/O non traggono sempre vantaggio dal parallelismo. Se il disco o la rete sono il collo di bottiglia, centinaia di attività parallele aumenteranno solo la competizione per la risorsa.
- Non avviate troppi thread. Per impostazione predefinita gli stream paralleli usano il pool comune ForkJoinPool.commonPool() con un livello di parallelismo ≈ al numero di core. Si può modificare tramite la proprietà "java.util.concurrent.ForkJoinPool.common.parallelism", ma fatelo con consapevolezza.
- Non dimenticate la sincronizzazione. Se più thread scrivono nello stesso file/oggetto — usate sincronizzazione e code; per file indipendenti — la sincronizzazione non è necessaria.
5. Introduzione rapida a FileChannel e all’accesso posizionale
Per scenari avanzati (ad esempio, lettura parallela di parti diverse di un unico file grande) usate java.nio.channels.FileChannel, che supporta lettura/scrittura posizionale.
Esempio: lettura di parti diverse di un file in thread diversi
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;
public class FileChunkReader implements Runnable {
private final Path path;
private final long position;
private final int size;
public FileChunkReader(Path path, long position, int size) {
this.path = path;
this.position = position;
this.size = size;
}
@Override
public void run() {
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(size);
channel.read(buffer, position);
// Elaborazione dei dati
System.out.println("Letti " + buffer.position() + " byte a partire dalla posizione " + position);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Commento: avviate più attività di questo tipo — ciascuna legge il proprio intervallo. Ma attenzione: non tutti i dischi e i filesystem gradiscono un carico parallelo elevato.
6. Errori tipici nell’elaborazione parallela dei file
Errore n. 1: scrittura parallela in un unico file senza sincronizzazione. I dati si mescolano e si corrompono. Usate code, buffering e sincronizzazione della scrittura.
Errore n. 2: troppo parallelismo. Gli stream paralleli su hardware modesto o con file piccoli introducono overhead e possono rallentare l’esecuzione.
Errore n. 3: ignorare gli errori di I/O. Negli stream paralleli è facile «perdere» le eccezioni — gestitele all’interno delle lambda, fate logging e tenete conto dei guasti.
Errore n. 4: risorse non chiuse. Usate sempre try-with-resources per stream/canali, altrimenti avrete leak ed errori strani.
Errore n. 5: aspettarsi la «magia» da parallel(). Il parallelismo accelera solo con un carico di lavoro sufficiente e risorse disponibili (CPU, disco veloce). La sola chiamata a parallel() non è una bacchetta magica.
GO TO FULL VERSION