CodeGym /課程 /JAVA 25 SELF /檔案系統的平行走訪:Files.walk + parallel() 與 ForkJoin

檔案系統的平行走訪:Files.walk + parallel() 與 ForkJoin

JAVA 25 SELF
等級 59 , 課堂 2
開放

1. 問題:如何高效處理目錄中的大量檔案

在現代應用程式中,常常需要處理某個資料夾及其子目錄中的大量檔案。例如:

  • 計算專案中所有 ".java" 檔案的總行數。
  • 找出過去一個月內被修改的所有檔案。
  • 依據特定條件複製或刪除檔案。

如果檔案數量不多,一般的迴圈就足夠。但當數量達到成千上萬,尤其每個檔案都要執行「重」操作(讀取、解析、分析)時,所需時間會顯著增加。

問題: 如何加速大量檔案的處理?
解答: 使用平行化——在多個執行緒中同時處理檔案。

2. 走訪檔案系統的工具

Files.walk()

自 Java 8 起,提供了方便的目錄樹走訪方法——Files.walk()(位於 java.nio.file 套件)。它會回傳 Stream<Path>,涵蓋從指定目錄開始的所有檔案與資料夾。

範例:

import java.nio.file.*;
import java.util.stream.Stream;

Path start = Paths.get("src");
try (Stream<Path> stream = Files.walk(start)) {
    stream.forEach(System.out::println);
}
  • Files.walk(start)——回傳所有檔案與資料夾(包含子目錄)的串流。
  • 可以指定最大走訪深度:Files.walk(start, 3)

Files.find()

若需要一開始就依條件過濾(例如只要 ".java" 檔),可使用 Files.find()

import java.nio.file.*;
import java.util.stream.Stream;

Path start = Paths.get("src");

try (Stream<Path> stream = Files.find(
        start,
        Integer.MAX_VALUE,
        (path, attr) -> path.toString().endsWith(".java"))) {
    stream.forEach(System.out::println);
}
  • Files.find() 接受一個過濾器(BiPredicate<Path, BasicFileAttributes>),可取得路徑與檔案屬性。

3. 平行處理:parallel() 與 ForkJoinPool

平行串流:.parallel()

任何 Stream 都有 parallel() 方法。呼叫後,元素處理會在多個執行緒中進行。

Files.walk(start)
    .parallel()
    .forEach(path -> processFile(path));

每個檔案都將(在可能時)被平行處理,對於「重」操作特別有效:讀取、解析、計算。

內部如何運作?ForkJoinPool

平行串流使用共用的執行緒池——ForkJoinPool.commonPool()。這是個「聰明」的池,會在執行緒間分配工作。

  • 預設執行緒數量 = 可用處理器數量:Runtime.getRuntime().availableProcessors()
  • 「fork/join」平行模型很適合彼此獨立的任務——例如處理個別檔案。

何時使用 .parallel()

  • 當每個檔案的處理彼此獨立時。
  • 當操作很「重」(大量耗用 CPU 或長時間等待 IO)時。
  • 當檔案很多(數百、數千)時。

不建議 使用平行串流的情況:

  • 檔案很少時(平行化的額外開銷可能大於收益)。
  • 需要嚴格的處理順序或元素之間存在相依時。

4. 替代方案與平行化調校

何時改用 ExecutorService 更好?

平行串流適合簡單情境。但若需要:

  • 精準控制執行緒數(對 IO-bound 任務,執行緒數通常可多於核心數)。
  • 管理佇列、取消、重試與錯誤處理。
  • 建立更複雜的工作管線。

此時請使用 ExecutorService

import java.nio.file.*;
import java.util.concurrent.*;

ExecutorService executor = Executors.newFixedThreadPool(8);
Files.walk(start)
    .filter(Files::isRegularFile)
    .forEach(path -> executor.submit(() -> processFile(path)));
executor.shutdown();

調校 ForkJoinPool

預設共有池使用與處理器數相同的執行緒數。可在第一次使用平行串流之前,透過系統屬性進行調整:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
  • 設定後,所有平行串流最多會使用到 16 個執行緒。

CPU-bound vs IO-bound 任務

  • CPU-bound:高度使用處理器(數學運算、解析、壓縮)。執行緒數 ≈ 核心數。
  • IO-bound:多為磁碟/網路等待。執行緒數常可多於核心數。

IO-bound 任務,平行串流不一定最佳——常常使用自訂、執行緒數較多的 ExecutorService 更有利。

5. 範例:平行搜尋與處理檔案

以平行走訪的方式,計算專案中所有 ".java" 檔案的總行數。

import java.nio.file.*;
import java.util.stream.*;
import java.io.IOException;

public class LineCounter {
    public static void main(String[] args) throws IOException {
        Path start = Paths.get("src");

        long totalLines = Files.walk(start)
            .parallel() // 平行處理!
            .filter(p -> p.toString().endsWith(".java"))
            .mapToLong(LineCounter::countLines)
            .sum();

        System.out.println("程式碼總行數: " + totalLines);
    }

    // 用於計算檔案行數的方法
    private static long countLines(Path path) {
        try (Stream<String> lines = Files.lines(path)) {
            return lines.count();
        } catch (IOException e) {
            System.err.println("讀取檔案時發生錯誤: " + path);
            return 0;
        }
    }
}

發生了什麼:

  • Files.walk(start)——走訪所有路徑。
  • parallel()——啟用平行處理。
  • filter(...)——只保留 ".java" 檔案。
  • mapToLong(...)——計算每個檔案的行數。
  • sum()——彙總結果。

優點: 能同時利用多個執行緒,且程式碼仍然精簡。

6. 重要細節與常見陷阱

  • 不是所有任務都會因平行化而加速。 對於小型資料集或很快的操作,額外開銷可能讓程式更慢。
  • 務必關閉資源。 處理檔案時使用 try-with-resources——避免描述元洩漏。例如在 Files.lines(path) 周圍使用 try(...)
  • 巢狀平行化。 在其他平行工作中再啟動平行串流(nested parallelism)通常效率不佳,且可能導致效能退化。
  • 副作用。 避免未同步地寫入共用結構/檔案。偏好對元素的「純」操作。

7. 圖示:平行走訪檔案如何運作

flowchart TD
    A["Files.walk(start)"] --> B["Stream<Path>"]
    B --> C{".parallel()?"}
    C -- 否 --> D[一般的 forEach]
    C -- 是 --> E["平行 forEach (ForkJoinPool)"]
    E --> F[在多個執行緒中處理檔案]

8. 檔案平行處理的常見錯誤

錯誤 1: 對於小任務使用平行串流——額外開銷大於收益。

錯誤 2: 期待平行串流對 IO-bound 的加速效果與 CPU-bound 一樣。對 IO 通常需要 ExecutorService 且執行緒池較大。

錯誤 3: 未處理 lambda 中的例外——若未處理 IOException,串流可能中斷且結果不完整。

錯誤 4: 寫入共用變數或檔案時的競爭——請同步化存取或避免副作用。

錯誤 5: 忘記關閉資源——對所有檔案操作使用 try-with-resources

錯誤 6: 在第一次使用後才嘗試變更 ForkJoinPool.commonPool()——透過 System.setProperty(...) 的設定必須事先完成。

錯誤 7: 在平行串流內再使用平行串流——往往導致效能退化。

留言
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION