CodeGym /행동 /JAVA 25 SELF /파일 병렬 처리: ForkJoin, 병렬 스트림

파일 병렬 처리: ForkJoin, 병렬 스트림

JAVA 25 SELF
레벨 59 , 레슨 1
사용 가능

1. 병렬 스트림 (parallelStream): 쉽고 편리하게

이미 Stream API를 사용해 보았다면, 컬렉션을 필터링하고 변환하고 수집하는 일이 얼마나 편한지 알고 있을 것입니다. 중요한 보너스: 어떤 스트림이든 단 한 줄로 병렬로 만들 수 있습니다 — parallel()을 켜면 컬렉션의 요소가 동시에 처리됩니다.

이는 파일 모음에 대해 서로 독립적인 작업(줄 수 세기, 부분 문자열 찾기, 복사, 압축 등)에 특히 유용합니다.

예시: 디렉터리 내 모든 파일의 줄 수를 병렬로 세기

방법 1: 순차 처리

import java.nio.file.*;
import java.io.IOException;
import java.util.List;

public class LogLineCounter {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        long totalLines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
            for (Path file : stream) {
                long lines = Files.lines(file).count();
                totalLines += lines;
            }
        }
        System.out.println("모든 로그의 총 줄 수: " + totalLines);
    }
}

코멘트: 모든 작업이 차례로, 파일을 하나씩 처리됩니다. 파일이 많고 크다면 오래 기다려야 합니다.

방법 2: 병렬 처리!

import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;

public class LogLineCounterParallel {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        try (Stream<Path> files = Files.list(logDir)) {
            long totalLines = files
                .filter(path -> path.toString().endsWith(".log"))
                .parallel() // 바로 이 한 줄이 핵심!
                .mapToLong(file -> {
                    try (Stream<String> lines = Files.lines(file)) {
                        return lines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                })
                .sum();
            System.out.println("모든 로그의 총 줄 수: " + totalLines);
        }
    }
}

코멘트: 핵심 한 줄은 — .parallel(). 다중 코어 프로세서에서는 일반적으로 눈에 띄게 빨라집니다.

어떻게 동작하나요?

  • parallel()은 일반 스트림을 병렬 스트림으로 바꿉니다. 내부적으로 공용 ForkJoinPool을 사용합니다(기본 스레드 수는 보통 코어 수와 같습니다).
  • 각 파일은 독립적으로 처리되며, 결과는 종단 연산(예: sum())으로 집계됩니다.
  • 파일이 적으면 가속 효과가 없을 수 있고, 수백 개라면 대개 이득이 분명합니다.

중요!

  • 병렬 스트림은 I/O 연산 자체를 빠르게 하지는 않습니다. 여러 작업을 동시에 진행할 수 있게 해 줄 뿐입니다. 빠른 매체(SSD)에서는 도움이 되지만, 느린 매체(HDD)에서는 디스크 병목에 걸릴 수 있습니다.

2. ForkJoinPool: 분할 정복의 실전 적용

ForkJoin은 분할 정복 방식의 병렬 계산 프레임워크입니다: 큰 작업을 하위 작업으로 나누어 병렬로 실행하고 결과를 합칩니다. 이를 관리하는 전용 풀이 바로 ForkJoinPool입니다. 병렬 스트림의 “무대 뒤”에서도 사용되지만, 더 큰 유연성이 필요하면 직접 제어할 수도 있습니다.

이 모델은 특히 재귀적 구조(디렉터리 트리), 대규모 데이터 배열, 독립적인 부분으로 쉽게 분해할 수 있는 작업에 적합합니다.

예시: 디렉터리 트리를 재귀적으로 탐색

모든 ".txt"-파일(하위 폴더 포함)을 찾아 전체 줄 수를 계산해 봅니다.

import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;

public class FolderLineCounter extends RecursiveTask<Long> {
    private final Path dir;

    public FolderLineCounter(Path dir) {
        this.dir = dir;
    }

    @Override
    protected Long compute() {
        List<FolderLineCounter> subTasks = new ArrayList<>();
        long lines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    FolderLineCounter task = new FolderLineCounter(entry);
                    task.fork(); // 하위 작업을 시작합니다
                    subTasks.add(task);
                } else if (entry.toString().endsWith(".txt")) {
                    try (Stream<String> fileLines = Files.lines(entry)) {
                        lines += fileLines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 하위 작업들의 결과를 합칩니다
        for (FolderLineCounter task : subTasks) {
            lines += task.join();
        }
        return lines;
    }

    public static void main(String[] args) {
        Path root = Paths.get("big_folder");
        ForkJoinPool pool = new ForkJoinPool();
        FolderLineCounter counter = new FolderLineCounter(root);
        long totalLines = pool.invoke(counter);
        System.out.println("모든 .txt 파일의 총 줄 수: " + totalLines);
    }
}

여기서 하는 일:

  • 각 폴더마다 별도의 작업(FolderLineCounter)을 만들고, 하위 디렉터리는 자체 하위 작업(fork())을 생성합니다.
  • 파일은 그 자리에서 계산하고, 모든 하위 작업의 join() 이후 결과를 합산합니다.

ForkJoin의 장점은?

  • 큰 계층 구조(디렉터리 트리)에 효과적입니다.
  • CPU 코어를 최대한 활용합니다.
  • 병렬화 수준과 작업 경계를 세밀하게 제어할 수 있습니다.

3. 실전 활용 시나리오

대량 파일 처리

예를 들어, 수천 장의 사진을 백업 폴더로 복사해야 할 때.

import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelFileCopier {
    public static void main(String[] args) throws Exception {
        Path sourceDir = Paths.get("photos");
        Path destDir = Paths.get("photos_backup");
        Files.createDirectories(destDir);

        List<Path> files = Files.list(sourceDir)
                .filter(Files::isRegularFile)
                .collect(Collectors.toList());

        files.parallelStream().forEach(file -> {
            try {
                Path destFile = destDir.resolve(file.getFileName());
                Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        System.out.println("모든 파일을 복사했습니다!");
    }
}

코멘트: 각 파일이 별도의 스레드에서 복사됩니다. 파일 수가 많을수록 가속이 눈에 띕니다.

병렬 압축/해제

마찬가지로 parallelStream()이나 자체 ForkJoinPool로 압축, 해시 재계산, 이미지 포맷 변환 등을 병렬화할 수 있습니다.

4. 중요한 주의사항 및 제한

  • I/O 연산은 항상 병렬 처리의 이점을 얻지 못합니다. 디스크나 네트워크가 병목이면, 수백 개의 병렬 작업은 자원 경쟁만 늘립니다.
  • 스레드를 과도하게 생성하지 마세요. 기본적으로 병렬 스트림은 공용 ForkJoinPool.commonPool()을 사용하며 병렬성 수준은 대략 코어 수와 같습니다. 이는 속성 "java.util.concurrent.ForkJoinPool.common.parallelism"으로 변경할 수 있지만, 충분히 이해한 뒤에만 조정하세요.
  • 동기화를 잊지 마세요. 여러 스레드가 동일한 파일/객체에 쓰는 경우 동기화와 큐를 사용하세요. 서로 독립적인 파일에는 동기화가 필요 없습니다.

5. FileChannel과 위치 기반 접근 간단 소개

고급 시나리오(예: 하나의 큰 파일을 여러 부분으로 병렬 읽기)에서는 위치 기반 읽기/쓰기를 지원하는 java.nio.channels.FileChannel을 사용하세요.

예시: 다른 스레드에서 파일의 서로 다른 부분 읽기

import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;

public class FileChunkReader implements Runnable {
    private final Path path;
    private final long position;
    private final int size;

    public FileChunkReader(Path path, long position, int size) {
        this.path = path;
        this.position = position;
        this.size = size;
    }

    @Override
    public void run() {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            channel.read(buffer, position);
            // 데이터 처리
            System.out.println("읽은 바이트 수: " + buffer.position() + ", 시작 위치: " + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

코멘트: 이런 작업을 여러 개 실행하면 각 작업이 자신의 범위를 읽습니다. 다만 주의하세요 — 모든 디스크와 파일 시스템이 강한 병렬 부하를 좋아하는 것은 아닙니다.

6. 파일 병렬 처리에서 흔한 실수

실수 №1: 동기화 없이 하나의 파일에 병렬로 쓰기. 데이터가 뒤섞여 손상됩니다. 큐, 버퍼링, 쓰기 동기화를 사용하세요.

실수 №2: 과도한 병렬성. 성능이 약한 하드웨어/작은 파일에서는 병렬 스트림의 오버헤드로 인해 오히려 느려질 수 있습니다.

실수 №3: I/O 오류를 무시함. 병렬 스트림에서는 예외가 “사라지기” 쉽습니다 — 람다 내부에서 처리하고, 로깅하며, 실패를 고려하세요.

실수 №4: 자원을 닫지 않음. 스트림/채널에는 항상 try-with-resources를 사용하세요. 그렇지 않으면 누수와 이상한 오류를 겪게 됩니다.

실수 №5: parallel()에 ‘마법’을 기대함. 병렬성은 충분한 작업량과 가용 자원(CPU, 빠른 디스크)이 있을 때만 가속합니다. parallel() 호출 자체는 은총알이 아닙니다.

코멘트
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION