1. 병렬 스트림 (parallelStream): 쉽고 편리하게
이미 Stream API를 사용해 보았다면, 컬렉션을 필터링하고 변환하고 수집하는 일이 얼마나 편한지 알고 있을 것입니다. 중요한 보너스: 어떤 스트림이든 단 한 줄로 병렬로 만들 수 있습니다 — parallel()을 켜면 컬렉션의 요소가 동시에 처리됩니다.
이는 파일 모음에 대해 서로 독립적인 작업(줄 수 세기, 부분 문자열 찾기, 복사, 압축 등)에 특히 유용합니다.
예시: 디렉터리 내 모든 파일의 줄 수를 병렬로 세기
방법 1: 순차 처리
import java.nio.file.*;
import java.io.IOException;
import java.util.List;
public class LogLineCounter {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
long totalLines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
for (Path file : stream) {
long lines = Files.lines(file).count();
totalLines += lines;
}
}
System.out.println("모든 로그의 총 줄 수: " + totalLines);
}
}
코멘트: 모든 작업이 차례로, 파일을 하나씩 처리됩니다. 파일이 많고 크다면 오래 기다려야 합니다.
방법 2: 병렬 처리!
import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;
public class LogLineCounterParallel {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
try (Stream<Path> files = Files.list(logDir)) {
long totalLines = files
.filter(path -> path.toString().endsWith(".log"))
.parallel() // 바로 이 한 줄이 핵심!
.mapToLong(file -> {
try (Stream<String> lines = Files.lines(file)) {
return lines.count();
} catch (IOException e) {
e.printStackTrace();
return 0L;
}
})
.sum();
System.out.println("모든 로그의 총 줄 수: " + totalLines);
}
}
}
코멘트: 핵심 한 줄은 — .parallel(). 다중 코어 프로세서에서는 일반적으로 눈에 띄게 빨라집니다.
어떻게 동작하나요?
- parallel()은 일반 스트림을 병렬 스트림으로 바꿉니다. 내부적으로 공용 ForkJoinPool을 사용합니다(기본 스레드 수는 보통 코어 수와 같습니다).
- 각 파일은 독립적으로 처리되며, 결과는 종단 연산(예: sum())으로 집계됩니다.
- 파일이 적으면 가속 효과가 없을 수 있고, 수백 개라면 대개 이득이 분명합니다.
중요!
- 병렬 스트림은 I/O 연산 자체를 빠르게 하지는 않습니다. 여러 작업을 동시에 진행할 수 있게 해 줄 뿐입니다. 빠른 매체(SSD)에서는 도움이 되지만, 느린 매체(HDD)에서는 디스크 병목에 걸릴 수 있습니다.
2. ForkJoinPool: 분할 정복의 실전 적용
ForkJoin은 분할 정복 방식의 병렬 계산 프레임워크입니다: 큰 작업을 하위 작업으로 나누어 병렬로 실행하고 결과를 합칩니다. 이를 관리하는 전용 풀이 바로 ForkJoinPool입니다. 병렬 스트림의 “무대 뒤”에서도 사용되지만, 더 큰 유연성이 필요하면 직접 제어할 수도 있습니다.
이 모델은 특히 재귀적 구조(디렉터리 트리), 대규모 데이터 배열, 독립적인 부분으로 쉽게 분해할 수 있는 작업에 적합합니다.
예시: 디렉터리 트리를 재귀적으로 탐색
모든 ".txt"-파일(하위 폴더 포함)을 찾아 전체 줄 수를 계산해 봅니다.
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;
public class FolderLineCounter extends RecursiveTask<Long> {
private final Path dir;
public FolderLineCounter(Path dir) {
this.dir = dir;
}
@Override
protected Long compute() {
List<FolderLineCounter> subTasks = new ArrayList<>();
long lines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path entry : stream) {
if (Files.isDirectory(entry)) {
FolderLineCounter task = new FolderLineCounter(entry);
task.fork(); // 하위 작업을 시작합니다
subTasks.add(task);
} else if (entry.toString().endsWith(".txt")) {
try (Stream<String> fileLines = Files.lines(entry)) {
lines += fileLines.count();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// 하위 작업들의 결과를 합칩니다
for (FolderLineCounter task : subTasks) {
lines += task.join();
}
return lines;
}
public static void main(String[] args) {
Path root = Paths.get("big_folder");
ForkJoinPool pool = new ForkJoinPool();
FolderLineCounter counter = new FolderLineCounter(root);
long totalLines = pool.invoke(counter);
System.out.println("모든 .txt 파일의 총 줄 수: " + totalLines);
}
}
여기서 하는 일:
- 각 폴더마다 별도의 작업(FolderLineCounter)을 만들고, 하위 디렉터리는 자체 하위 작업(fork())을 생성합니다.
- 파일은 그 자리에서 계산하고, 모든 하위 작업의 join() 이후 결과를 합산합니다.
ForkJoin의 장점은?
- 큰 계층 구조(디렉터리 트리)에 효과적입니다.
- CPU 코어를 최대한 활용합니다.
- 병렬화 수준과 작업 경계를 세밀하게 제어할 수 있습니다.
3. 실전 활용 시나리오
대량 파일 처리
예를 들어, 수천 장의 사진을 백업 폴더로 복사해야 할 때.
import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelFileCopier {
public static void main(String[] args) throws Exception {
Path sourceDir = Paths.get("photos");
Path destDir = Paths.get("photos_backup");
Files.createDirectories(destDir);
List<Path> files = Files.list(sourceDir)
.filter(Files::isRegularFile)
.collect(Collectors.toList());
files.parallelStream().forEach(file -> {
try {
Path destFile = destDir.resolve(file.getFileName());
Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println("모든 파일을 복사했습니다!");
}
}
코멘트: 각 파일이 별도의 스레드에서 복사됩니다. 파일 수가 많을수록 가속이 눈에 띕니다.
병렬 압축/해제
마찬가지로 parallelStream()이나 자체 ForkJoinPool로 압축, 해시 재계산, 이미지 포맷 변환 등을 병렬화할 수 있습니다.
4. 중요한 주의사항 및 제한
- I/O 연산은 항상 병렬 처리의 이점을 얻지 못합니다. 디스크나 네트워크가 병목이면, 수백 개의 병렬 작업은 자원 경쟁만 늘립니다.
- 스레드를 과도하게 생성하지 마세요. 기본적으로 병렬 스트림은 공용 ForkJoinPool.commonPool()을 사용하며 병렬성 수준은 대략 코어 수와 같습니다. 이는 속성 "java.util.concurrent.ForkJoinPool.common.parallelism"으로 변경할 수 있지만, 충분히 이해한 뒤에만 조정하세요.
- 동기화를 잊지 마세요. 여러 스레드가 동일한 파일/객체에 쓰는 경우 동기화와 큐를 사용하세요. 서로 독립적인 파일에는 동기화가 필요 없습니다.
5. FileChannel과 위치 기반 접근 간단 소개
고급 시나리오(예: 하나의 큰 파일을 여러 부분으로 병렬 읽기)에서는 위치 기반 읽기/쓰기를 지원하는 java.nio.channels.FileChannel을 사용하세요.
예시: 다른 스레드에서 파일의 서로 다른 부분 읽기
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;
public class FileChunkReader implements Runnable {
private final Path path;
private final long position;
private final int size;
public FileChunkReader(Path path, long position, int size) {
this.path = path;
this.position = position;
this.size = size;
}
@Override
public void run() {
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(size);
channel.read(buffer, position);
// 데이터 처리
System.out.println("읽은 바이트 수: " + buffer.position() + ", 시작 위치: " + position);
} catch (Exception e) {
e.printStackTrace();
}
}
}
코멘트: 이런 작업을 여러 개 실행하면 각 작업이 자신의 범위를 읽습니다. 다만 주의하세요 — 모든 디스크와 파일 시스템이 강한 병렬 부하를 좋아하는 것은 아닙니다.
6. 파일 병렬 처리에서 흔한 실수
실수 №1: 동기화 없이 하나의 파일에 병렬로 쓰기. 데이터가 뒤섞여 손상됩니다. 큐, 버퍼링, 쓰기 동기화를 사용하세요.
실수 №2: 과도한 병렬성. 성능이 약한 하드웨어/작은 파일에서는 병렬 스트림의 오버헤드로 인해 오히려 느려질 수 있습니다.
실수 №3: I/O 오류를 무시함. 병렬 스트림에서는 예외가 “사라지기” 쉽습니다 — 람다 내부에서 처리하고, 로깅하며, 실패를 고려하세요.
실수 №4: 자원을 닫지 않음. 스트림/채널에는 항상 try-with-resources를 사용하세요. 그렇지 않으면 누수와 이상한 오류를 겪게 됩니다.
실수 №5: parallel()에 ‘마법’을 기대함. 병렬성은 충분한 작업량과 가용 자원(CPU, 빠른 디스크)이 있을 때만 가속합니다. parallel() 호출 자체는 은총알이 아닙니다.
GO TO FULL VERSION