CodeGym /Cursos /JAVA 25 SELF /Procesamiento paralelo de archivos: ForkJoin, Parallel St...

Procesamiento paralelo de archivos: ForkJoin, Parallel Streams

JAVA 25 SELF
Nivel 59 , Lección 1
Disponible

1. Streams paralelos (parallelStream): simple y cómodo

Si ya has trabajado con la API de Stream, sabes lo cómodo que es filtrar, transformar y recopilar colecciones. Una ventaja importante: cualquier flujo se puede hacer paralelo con una sola línea — activar parallel(), y los elementos de la colección empezarán a procesarse de forma concurrente.

Esto es especialmente útil para operaciones independientes sobre un conjunto de archivos: contar líneas, buscar una subcadena, copiar, comprimir, etc.

Ejemplo: recuento paralelo de líneas en todos los archivos del directorio

Opción 1: secuencial

import java.nio.file.*;
import java.io.IOException;
import java.util.List;

public class LogLineCounter {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        long totalLines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
            for (Path file : stream) {
                long lines = Files.lines(file).count();
                totalLines += lines;
            }
        }
        System.out.println("Total de líneas en todos los logs: " + totalLines);
    }
}

Comentario: todo se hace en orden, un archivo tras otro. Si hay muchos archivos y son grandes, habrá que esperar bastante.

Opción 2: ¡en paralelo!

import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;

public class LogLineCounterParallel {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        try (Stream<Path> files = Files.list(logDir)) {
            long totalLines = files
                .filter(path -> path.toString().endsWith(".log"))
                .parallel() // ¡ahí está toda la magia!
                .mapToLong(file -> {
                    try (Stream<String> lines = Files.lines(file)) {
                        return lines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                })
                .sum();
            System.out.println("Total de líneas en todos los logs: " + totalLines);
        }
    }
}

Comentario: la línea clave — .parallel(). En un procesador multinúcleo, por lo general, el programa funcionará notablemente más rápido.

¿Cómo funciona?

  • parallel() convierte un stream normal en paralelo. Bajo el capó se utiliza el pool común ForkJoinPool (por defecto hay tantos hilos como núcleos).
  • Cada archivo se procesa de forma independiente; los resultados se agregan mediante operaciones terminales (por ejemplo, sum()).
  • Si hay pocos archivos, puede que no haya aceleración; si hay cientos, la mejora suele ser notable.

¡Importante!

  • Los streams paralelos no aceleran en sí las operaciones de E/S; permiten realizar varias operaciones a la vez. En unidades rápidas (SSD) ayuda, en lentas (HDD) puedes topar con el cuello de botella del disco.

2. ForkJoinPool: «divide y vencerás» en la práctica

ForkJoin es un framework para cálculos paralelos de «divide y vencerás»: dividimos una tarea grande en subtareas, las ejecutamos en paralelo y combinamos los resultados. Esto lo gestiona un pool especial — ForkJoinPool. Es el que se utiliza «entre bambalinas» en los streams paralelos, pero también se puede gestionar directamente para mayor flexibilidad.

Este modelo es especialmente bueno para estructuras recursivas (árboles de directorios), grandes arreglos de datos y tareas que se descomponen fácilmente en partes independientes.

Ejemplo: búsqueda recursiva en un árbol de directorios

Buscaremos todos los archivos ".txt" (incluidas las carpetas anidadas) y contaremos la cantidad total de líneas.

import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;

public class FolderLineCounter extends RecursiveTask<Long> {
    private final Path dir;

    public FolderLineCounter(Path dir) {
        this.dir = dir;
    }

    @Override
    protected Long compute() {
        List<FolderLineCounter> subTasks = new ArrayList<>();
        long lines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    FolderLineCounter task = new FolderLineCounter(entry);
                    task.fork(); // Lanzamos la subtarea
                    subTasks.add(task);
                } else if (entry.toString().endsWith(".txt")) {
                    try (Stream<String> fileLines = Files.lines(entry)) {
                        lines += fileLines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Recogemos los resultados de las subtareas
        for (FolderLineCounter task : subTasks) {
            lines += task.join();
        }
        return lines;
    }

    public static void main(String[] args) {
        Path root = Paths.get("big_folder");
        ForkJoinPool pool = new ForkJoinPool();
        FolderLineCounter counter = new FolderLineCounter(root);
        long totalLines = pool.invoke(counter);
        System.out.println("Total de líneas en todos los archivos .txt: " + totalLines);
    }
}

Qué sucede aquí:

  • Para cada carpeta se crea una tarea independiente (FolderLineCounter); para los subdirectorios — sus propias subtareas (fork()).
  • Los archivos se cuentan in situ; los resultados se suman tras join() de todas las subtareas.

¿Cuál es la ventaja de ForkJoin?

  • Funciona de manera eficiente con jerarquías grandes (árboles de directorios).
  • Aprovecha al máximo los núcleos del procesador.
  • Permite controlar con precisión el paralelismo y los límites de las tareas.

3. Escenarios prácticos de uso

Procesamiento masivo de archivos

Por ejemplo, hay que copiar miles de fotos a una carpeta de respaldo.

import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelFileCopier {
    public static void main(String[] args) throws Exception {
        Path sourceDir = Paths.get("photos");
        Path destDir = Paths.get("photos_backup");
        Files.createDirectories(destDir);

        List<Path> files = Files.list(sourceDir)
                .filter(Files::isRegularFile)
                .collect(Collectors.toList());

        files.parallelStream().forEach(file -> {
            try {
                Path destFile = destDir.resolve(file.getFileName());
                Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        System.out.println("¡Todos los archivos se han copiado!");
    }
}

Comentario: cada archivo se copia en un hilo separado. Con un gran número de archivos, la aceleración es notable.

Compresión/descompresión en paralelo

De forma análoga, se pueden paralelizar la compresión, el recálculo de hashes, la conversión de formato de imágenes, etc., mediante parallelStream() o con un ForkJoinPool propio.

4. Notas importantes y limitaciones

  • Las operaciones de E/S no siempre se benefician del paralelismo. Si el disco o la red son el cuello de botella, un centenar de tareas paralelas solo aumentará la competencia por el recurso.
  • No lances demasiados hilos. Por defecto, los streams paralelos usan el pool común ForkJoinPool.commonPool() con un nivel de paralelismo ≈ al número de núcleos. Se puede cambiar mediante la propiedad "java.util.concurrent.ForkJoinPool.common.parallelism", pero hazlo con conocimiento de causa.
  • No te olvides de la sincronización. Si varios hilos escriben en el mismo archivo/objeto, usa sincronización y colas; para archivos independientes, no hace falta sincronizar.

5. Breve introducción a FileChannel y al acceso posicional

Para escenarios avanzados (por ejemplo, lectura paralela de distintas partes de un archivo grande) utiliza java.nio.channels.FileChannel, que soporta lectura/escritura posicional.

Ejemplo: lectura de diferentes partes del archivo en distintos hilos

import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;

public class FileChunkReader implements Runnable {
    private final Path path;
    private final long position;
    private final int size;

    public FileChunkReader(Path path, long position, int size) {
        this.path = path;
        this.position = position;
        this.size = size;
    }

    @Override
    public void run() {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            channel.read(buffer, position);
            // Procesar los datos
            System.out.println("Leídos " + buffer.position() + " bytes desde la posición " + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Comentario: lanza varias de estas tareas — cada una lee su propio rango. Pero cuidado: no todos los discos ni sistemas de archivos soportan bien una carga altamente paralela.

6. Errores típicos en el procesamiento paralelo de archivos

Error n.º 1: escritura paralela en un mismo archivo sin sincronización. Los datos se mezclan y se corrompen. Utiliza colas, bufferización y sincronización de la escritura.

Error n.º 2: demasiado paralelismo. Los streams paralelos en hardware modesto o con archivos pequeños añaden sobrecarga y pueden ralentizar la ejecución.

Error n.º 3: ignorar los errores de E/S. En los streams paralelos es fácil «perder» excepciones: trátalas dentro de las lambdas, haz logging y ten en cuenta los fallos.

Error n.º 4: recursos sin cerrar. Usa siempre try-with-resources para flujos/canales; de lo contrario tendrás fugas y errores extraños.

Error n.º 5: esperar «magia» de parallel(). El paralelismo acelera solo cuando hay suficiente carga de trabajo y recursos disponibles (CPU, disco rápido). La llamada a parallel() por sí sola no es una bala de plata.

Comentarios
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION