CodeGym /コース /JAVA 25 SELF /ファイルシステムの並列走査: Files.walk + parallel() と ForkJoin

ファイルシステムの並列走査: Files.walk + parallel() と ForkJoin

JAVA 25 SELF
レベル 59 , レッスン 2
使用可能

1. 課題: ディレクトリ内の多数のファイルを効率的に処理するには

現代のアプリケーションでは、フォルダとそのサブディレクトリ内の大量のファイルを処理する必要がよくあります。例えば:

  • プロジェクト内のすべての ".java"ファイルの総行数を数える。
  • 過去1か月に変更されたすべてのファイルを見つける。
  • 特定の条件に基づいてファイルをコピーまたは削除する。

ファイルが少ない場合は通常のループで十分です。しかし数千、数万となり、各ファイルで「重い」処理(読み取り、パース、分析)を行う場合、実行時間は大きく増加します。

質問: 大量のファイル処理をどう高速化するか?
回答: 並列化を使い、ファイルを複数スレッドで同時に処理する。

2. ファイルシステムを走査するためのツール

Files.walk()

Java 8以降、ディレクトリツリーを走査する便利な方法 — メソッド Files.walk()(パッケージ java.nio.file)が追加されました。これは Stream<Path> を返し、指定したディレクトリを起点にすべてのファイルとフォルダを列挙します。

例:

import java.nio.file.*;
import java.util.stream.Stream;

Path start = Paths.get("src");
try (Stream<Path> stream = Files.walk(start)) {
    stream.forEach(System.out::println);
}
  • Files.walk(start) — サブディレクトリを含む、すべてのファイルとフォルダのストリームを返します。
  • 最大の走査深度を指定できます: Files.walk(start, 3)

Files.find()

あらかじめ条件でフィルタしたい場合(例えば、".java"ファイルのみ)、Files.find() を使います:

import java.nio.file.*;
import java.util.stream.Stream;

Path start = Paths.get("src");

try (Stream<Path> stream = Files.find(
        start,
        Integer.MAX_VALUE,
        (path, attr) -> path.toString().endsWith(".java"))) {
    stream.forEach(System.out::println);
}
  • Files.find() はフィルタ(BiPredicate<Path, BasicFileAttributes>)を受け取り、パスとファイル属性を引数に取ります.

3. 並列処理: parallel() と ForkJoinPool

並列ストリーム: .parallel()

任意の Stream には parallel() メソッドがあります。これを呼び出すと、要素の処理は複数スレッドで行われます。

Files.walk(start)
    .parallel()
    .forEach(path -> processFile(path));

各ファイルは可能な限り並列に処理されます。特に「重い」処理(読み取り、パース、計算)で効果的です。

内部ではどう動くのか? ForkJoinPool

並列ストリームは共通スレッドプール — ForkJoinPool.commonPool() を使用します。これはタスクをスレッド間で分配する「賢い」プールです。

  • デフォルトのスレッド数は利用可能なプロセッサ数と同じ: Runtime.getRuntime().availableProcessors()
  • 「fork/join」モデルは、個々のファイル処理のような独立したタスクに適しています。

いつ .parallel() を使うべきか?

  • 各ファイルの処理が互いに独立しているとき。
  • 処理が「重い」(CPU を強く使う、または I/O 待ちが長い)とき。
  • ファイル数が多い(数百〜数千)とき。

次のような場合は 並列ストリームを使わないほうがよい:

  • ファイルが少ない場合(並列化のオーバーヘッドがメリットを上回る可能性)。
  • 厳密な順序や要素間の依存が必要な場合。

4. 代替手段と並列度のチューニング

いつ ExecutorService を使うべきか?

並列ストリームはシンプルなケースには有効です。しかし次のような場合は:

  • スレッド数を厳密に制御したい(IO-bound タスクでは、コア数より多いスレッドが有利なことが多い)。
  • キュー・キャンセル・リトライ・エラー処理を柔軟に管理したい。
  • より複雑なタスクパイプラインを構築したい。

その場合は ExecutorService を使います:

import java.nio.file.*;
import java.util.concurrent.*;

ExecutorService executor = Executors.newFixedThreadPool(8);
Files.walk(start)
    .filter(Files::isRegularFile)
    .forEach(path -> executor.submit(() -> processFile(path)));
executor.shutdown();

ForkJoinPool のチューニング

共通プールのデフォルトのスレッド数はプロセッサ数と同じです。これはシステムプロパティで変更できます(並列ストリームを最初に使用する前に設定する必要があります):

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
  • 設定後、すべての並列ストリームは最大 16 スレッドを使用します。

CPU-bound vs IO-bound タスク

  • CPU-bound: CPU を強く消費する(数値計算、パース、圧縮)。スレッド数はコア数と同程度が目安。
  • IO-bound: ディスク/ネットワーク待ちが多い。コア数より多いスレッド数が有利なことが多い。

並列ストリームは IO-bound タスクに最適とは限りません。より大きなプールを持つ独自の ExecutorService のほうが有利なことが多いです。

5. 例: ファイルの並列検索と処理

並列走査を使って、プロジェクト内のすべての ".java"ファイルの総行数を数えます。

import java.nio.file.*;
import java.util.stream.*;
import java.io.IOException;

public class LineCounter {
    public static void main(String[] args) throws IOException {
        Path start = Paths.get("src");

        long totalLines = Files.walk(start)
            .parallel() // 並列処理!
            .filter(p -> p.toString().endsWith(".java"))
            .mapToLong(LineCounter::countLines)
            .sum();

        System.out.println("コードの総行数: " + totalLines);
    }

    // ファイル内の行数を数えるメソッド
    private static long countLines(Path path) {
        try (Stream<String> lines = Files.lines(path)) {
            return lines.count();
        } catch (IOException e) {
            System.err.println("ファイル読み取りエラー: " + path);
            return 0;
        }
    }
}

ここで行っていること:

  • Files.walk(start) — すべてのパスを走査。
  • parallel() — 並列処理を有効化。
  • filter(...)".java"ファイルだけを残す。
  • mapToLong(...) — 各ファイルの行数を数える。
  • sum() — 結果を合計。

利点: 複数スレッドを活用しつつ、コードは簡潔なままです.

6. 重要な注意点とよくあるミス

  • すべてのタスクが並列化で速くなるわけではありません。 小規模なファイル集合や高速な処理では、オーバーヘッドによりかえって遅くなることがあります。
  • リソースを必ず閉じる。 ファイル操作では try-with-resources を使い、ディスクリプタがリークしないようにしましょう。例えば、Files.lines(path)try(...) で使う。
  • ネストした並列化。 他の並列タスクの内部で並列ストリーム(nested parallelism)を実行するのは効果が薄く、性能劣化を招くことがあります。
  • 副作用。 共有のデータ構造やファイルへの書き込みを同期なしで行うのは避ける。要素ごとの「純粋」な処理を心掛ける。

7. 図解: ファイルの並列走査がどのように動くか

flowchart TD
    A["Files.walk(start)"] --> B["Stream<Path>"]
    B --> C{".parallel()?"}
    C -- いいえ --> D[通常の forEach]
    C -- はい --> E["並列 forEach (ForkJoinPool)"]
    E --> F[複数スレッドでファイルを処理]

8. ファイルの並列処理でありがちなミス

誤り No.1: 小さなタスクに並列ストリームを使う — オーバーヘッドが利点を上回る。

誤り No.2: 並列ストリームが IO-boundCPU-bound と同じように高速化すると期待する。I/O では大きめのプールを持つ ExecutorService のほうが有利なことが多い。

誤り No.3: ラムダ内の例外を未処理のままにする — IOException を適切に扱わないとストリームが途中で途切れ、結果が不完全になることがあります。

誤り No.4: 共有変数やファイルへの書き込みでのレースコンディション — アクセスを同期するか、副作用を避ける。

誤り No.5: リソースを閉じ忘れる — すべてのファイル操作で try-with-resources を使う。

誤り No.6: 最初の使用後に ForkJoinPool.commonPool() を変更しようとする — System.setProperty(...) による設定は事前に行う必要があります。

誤り No.7: 並列ストリームの中でさらに並列ストリームを使う — 多くの場合、性能劣化を招きます。

コメント
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION