CodeGym /Corsi /JAVA 25 SELF /Elaborazione parallela dei file: ForkJoin, Parallel Strea...

Elaborazione parallela dei file: ForkJoin, Parallel Streams

JAVA 25 SELF
Livello 59 , Lezione 1
Disponibile

1. Stream paralleli (parallelStream): semplice e comodo

Se avete già lavorato con l’Stream API, sapete quanto sia comodo filtrare, trasformare e raccogliere le collezioni. Un vantaggio importante: qualsiasi stream può diventare parallelo letteralmente con una riga — attivate parallel(), e gli elementi della collezione inizieranno a essere elaborati in concorrenza.

Questo è particolarmente utile per operazioni indipendenti su un insieme di file: contare le righe, cercare una sottostringa, copiare, comprimere ecc.

Esempio: conteggio parallelo delle righe in tutti i file di una directory

Variante 1: in sequenza

import java.nio.file.*;
import java.io.IOException;
import java.util.List;

public class LogLineCounter {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        long totalLines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
            for (Path file : stream) {
                long lines = Files.lines(file).count();
                totalLines += lines;
            }
        }
        System.out.println("Numero totale di righe in tutti i log: " + totalLines);
    }
}

Commento: tutto viene eseguito in sequenza, un file dopo l’altro. Se i file sono molti e grandi, l’attesa sarà lunga.

Variante 2: in parallelo!

import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;

public class LogLineCounterParallel {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        try (Stream<Path> files = Files.list(logDir)) {
            long totalLines = files
                .filter(path -> path.toString().endsWith(".log"))
                .parallel() // ecco tutta la magia!
                .mapToLong(file -> {
                    try (Stream<String> lines = Files.lines(file)) {
                        return lines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                })
                .sum();
            System.out.println("Numero totale di righe in tutti i log: " + totalLines);
        }
    }
}

Commento: la riga chiave — .parallel(). Su un processore multi-core, di norma, il programma funzionerà sensibilmente più velocemente.

Come funziona?

  • parallel() trasforma uno stream normale in parallelo. Sotto il cofano si usa il pool comune ForkJoinPool (per impostazione predefinita il numero di thread è pari ai core).
  • Ogni file viene elaborato indipendentemente, i risultati sono aggregati tramite operazioni terminali (ad esempio, sum()).
  • Se i file sono pochi — potrebbe non esserci un’accelerazione; se sono centinaia — il guadagno è di solito evidente.

Importante!

  • Gli stream paralleli non velocizzano le operazioni di I/O in sé; permettono di eseguirne diverse contemporaneamente. Su supporti veloci (SSD) questo aiuta, su quelli lenti (HDD) si può incorrere nel collo di bottiglia del disco.

2. ForkJoinPool: «divide et impera» in pratica

ForkJoin è un framework per il calcolo parallelo «divide et impera»: si scompone un compito grande in sottocompiti, li si esegue in parallelo e si combinano i risultati. A gestire il tutto c’è un pool speciale — ForkJoinPool. È proprio quello usato «dietro le quinte» dagli stream paralleli, ma lo si può controllare direttamente per una maggiore flessibilità.

Questo modello è particolarmente adatto per strutture ricorsive (alberi di directory), grandi array di dati e attività che si decompongono facilmente in parti indipendenti.

Esempio: ricerca ricorsiva nell’albero delle directory

Troviamo tutti i file ".txt" (incluse le sottocartelle) e contiamo il numero totale di righe.

import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;

public class FolderLineCounter extends RecursiveTask<Long> {
    private final Path dir;

    public FolderLineCounter(Path dir) {
        this.dir = dir;
    }

    @Override
    protected Long compute() {
        List<FolderLineCounter> subTasks = new ArrayList<>();
        long lines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    FolderLineCounter task = new FolderLineCounter(entry);
                    task.fork(); // Avviamo una sottoattività
                    subTasks.add(task);
                } else if (entry.toString().endsWith(".txt")) {
                    try (Stream<String> fileLines = Files.lines(entry)) {
                        lines += fileLines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Raccogliamo i risultati dalle sottoattività
        for (FolderLineCounter task : subTasks) {
            lines += task.join();
        }
        return lines;
    }

    public static void main(String[] args) {
        Path root = Paths.get("big_folder");
        ForkJoinPool pool = new ForkJoinPool();
        FolderLineCounter counter = new FolderLineCounter(root);
        long totalLines = pool.invoke(counter);
        System.out.println("Numero totale di righe in tutti i file .txt: " + totalLines);
    }
}

Cosa succede qui:

  • Per ogni cartella viene creata un’attività separata (FolderLineCounter), per le sottocartelle — le proprie sottoattività (fork()).
  • I file vengono conteggiati sul posto, i risultati si sommano dopo il join() di tutte le sottoattività.

Quali sono i vantaggi di ForkJoin?

  • Funziona in modo efficiente con grandi gerarchie (alberi di directory).
  • Massimizza il carico sui core della CPU.
  • Permette di controllare con precisione la parallelizzazione e i confini dei compiti.

3. Casi d’uso pratici

Elaborazione massiva di file

Per esempio, è necessario copiare migliaia di foto in una cartella di backup.

import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelFileCopier {
    public static void main(String[] args) throws Exception {
        Path sourceDir = Paths.get("photos");
        Path destDir = Paths.get("photos_backup");
        Files.createDirectories(destDir);

        List<Path> files = Files.list(sourceDir)
                .filter(Files::isRegularFile)
                .collect(Collectors.toList());

        files.parallelStream().forEach(file -> {
            try {
                Path destFile = destDir.resolve(file.getFileName());
                Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        System.out.println("Tutti i file sono stati copiati!");
    }
}

Commento: ogni file viene copiato in un thread separato. Con un numero elevato di file, l’accelerazione è evidente.

Compressione/decompressione parallela

Analogamente è possibile parallelizzare la compressione, il calcolo degli hash, la conversione del formato delle immagini ecc. tramite parallelStream() o un proprio ForkJoinPool.

4. Avvertenze importanti e limitazioni

  • Le operazioni di I/O non traggono sempre vantaggio dal parallelismo. Se il disco o la rete sono il collo di bottiglia, centinaia di attività parallele aumenteranno solo la competizione per la risorsa.
  • Non avviate troppi thread. Per impostazione predefinita gli stream paralleli usano il pool comune ForkJoinPool.commonPool() con un livello di parallelismo ≈ al numero di core. Si può modificare tramite la proprietà "java.util.concurrent.ForkJoinPool.common.parallelism", ma fatelo con consapevolezza.
  • Non dimenticate la sincronizzazione. Se più thread scrivono nello stesso file/oggetto — usate sincronizzazione e code; per file indipendenti — la sincronizzazione non è necessaria.

5. Introduzione rapida a FileChannel e all’accesso posizionale

Per scenari avanzati (ad esempio, lettura parallela di parti diverse di un unico file grande) usate java.nio.channels.FileChannel, che supporta lettura/scrittura posizionale.

Esempio: lettura di parti diverse di un file in thread diversi

import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;

public class FileChunkReader implements Runnable {
    private final Path path;
    private final long position;
    private final int size;

    public FileChunkReader(Path path, long position, int size) {
        this.path = path;
        this.position = position;
        this.size = size;
    }

    @Override
    public void run() {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            channel.read(buffer, position);
            // Elaborazione dei dati
            System.out.println("Letti " + buffer.position() + " byte a partire dalla posizione " + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Commento: avviate più attività di questo tipo — ciascuna legge il proprio intervallo. Ma attenzione: non tutti i dischi e i filesystem gradiscono un carico parallelo elevato.

6. Errori tipici nell’elaborazione parallela dei file

Errore n. 1: scrittura parallela in un unico file senza sincronizzazione. I dati si mescolano e si corrompono. Usate code, buffering e sincronizzazione della scrittura.

Errore n. 2: troppo parallelismo. Gli stream paralleli su hardware modesto o con file piccoli introducono overhead e possono rallentare l’esecuzione.

Errore n. 3: ignorare gli errori di I/O. Negli stream paralleli è facile «perdere» le eccezioni — gestitele all’interno delle lambda, fate logging e tenete conto dei guasti.

Errore n. 4: risorse non chiuse. Usate sempre try-with-resources per stream/canali, altrimenti avrete leak ed errori strani.

Errore n. 5: aspettarsi la «magia» da parallel(). Il parallelismo accelera solo con un carico di lavoro sufficiente e risorse disponibili (CPU, disco veloce). La sola chiamata a parallel() non è una bacchetta magica.

Commenti
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION