CodeGym /課程 /JAVA 25 SELF /平行串流:語法與應用

平行串流:語法與應用

JAVA 25 SELF
等級 54 , 課堂 2
開放

1. 回顧 Stream API

你已經熟悉 Stream API——這是一種處理集合的便利方式,可讓你撰寫精簡且易讀的資料處理程式碼:過濾、排序、計數等。

經典範例如下:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

int sum = numbers.stream()
    .filter(n -> n % 2 == 0)
    .mapToInt(n -> n)
    .sum();

System.out.println(sum); // 6(2 + 4)

在這個範例中,集合被轉成串流(stream()),從中只保留偶數,接著將其轉為 int,最後呼叫 sum() 進行加總。

Stream API 讓程式碼更短且更具表達力:與其逐步描述如何處理,不如直接說明你想要的結果。此外,必要時只要一行就能切換到平行處理。

2. 平行串流:語法與運作原理

如何把串流變成平行?

很簡單:使用 parallelStream() 取代 stream()。或對既有串流呼叫 .parallel()

List<Integer> numbers = ...;

int sum = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .mapToInt(n -> n)
    .sum();

或如下:

numbers.stream()
    .parallel() // 轉為平行串流
    .filter(...)
    .map(...)
    .sum();

底層發生了什麼?

  • 集合會自動被拆分成多個區塊。
  • 每個區塊在不同的執行緒中處理(使用 ForkJoinPool——專用的執行緒池)。
  • 結果再合併為最終值。

也就是說,如果你的處理器有多核心,處理就會真正並行進行——例如,過濾與加總可同時在多個核心上執行。

何時特別有用?

  • 處理大型集合(數萬個元素以上)。
  • 每個元素都需進行複雜計算。
  • 不需要嚴格保留處理順序。

範例:順序串流 vs 平行串流

來看看處理大型陣列的簡單範例。

import java.util.*;
import java.util.stream.*;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
                                         .boxed()
                                         .collect(Collectors.toList());

        // 順序串流
        long time1 = System.currentTimeMillis();
        long count1 = numbers.stream()
            .filter(n -> n % 2 == 0)
            .count();
        long time2 = System.currentTimeMillis();
        System.out.println("順序: " + (time2 - time1) + " 毫秒,偶數:" + count1);

        // 平行串流
        long time3 = System.currentTimeMillis();
        long count2 = numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .count();
        long time4 = System.currentTimeMillis();
        System.out.println("平行: " + (time4 - time3) + " 毫秒,偶數:" + count2);
    }
}

請在自己的電腦上試試——在多核心處理器上,平行串流通常會更快。不過也不一定!細節請見下文。

3. 運作方式:ForkJoinPool 與自動分割

平行串流在底層使用 ForkJoinPool.commonPool(),它會自動管理執行緒數量(通常等於可用處理器核心數)。

示意如下:

+-----------------------------+
|          您的集合           |
+-----------------------------+
| 1  | 2  | 3  | ... | 1000 萬 |
+----+----+----+-----+--------+
   |    |    |           |
   v    v    v           v
[執行緒1][執行緒2]...[執行緒N]
   |    |    |           |
   +----+----+-----------+
        |
      [結果合併]

每個執行緒各自處理一塊資料,最後再把結果彙總。

4. 限制與坑點

平行串流不是能把一切「加速」的魔法按鈕。 有時甚至會變慢!

什麼情況下不划算:

  • 集合很小(少於約 ~1000 個元素)。
  • 對每個元素的操作非常快(例如只做 n * 2)。
  • 你非常在意處理順序(例如要按順序寫入檔案)。

為什麼? 建立與同步執行緒也需要時間。如果任務本身「很小」,平行化的額外開銷可能超過它帶來的收益。

副作用是平行化的天敵

若在串流內的操作會修改外部變數,務必小心!

錯誤範例:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] sum = {0};

numbers.parallelStream().forEach(n -> sum[0] += n);

System.out.println(sum[0]); // ???(你預期是 15,實際可能是任何數)

為何如此?因為多個執行緒同時修改同一個變數——會發生競態條件(race condition)。最終值可能不正確。

正確作法——使用會回傳結果的串流方法:

int sum = numbers.parallelStream().mapToInt(n -> n).sum();

不是所有集合都同樣適合「平行化」

有些集合(例如一般的 ArrayList)很容易被切分;但 LinkedList 或具有無限元素的串流(例如 Stream.generate(...))就不太適合。

5. 實作:效能比較

範例:尋找最大值

import java.util.*;
import java.util.stream.*;

public class ParallelMaxDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 30_000_000)
                                         .boxed()
                                         .collect(Collectors.toList());

        // 順序
        long t1 = System.currentTimeMillis();
        int max1 = numbers.stream().max(Integer::compareTo).get();
        long t2 = System.currentTimeMillis();
        System.out.println("順序: " + (t2 - t1) + " 毫秒,最大值 = " + max1);

        // 平行
        long t3 = System.currentTimeMillis();
        int max2 = numbers.parallelStream().max(Integer::compareTo).get();
        long t4 = System.currentTimeMillis();
        System.out.println("平行: " + (t4 - t3) + " 毫秒,最大值 = " + max2);
    }
}

會看到什麼? 在現代多核心處理器上,平行串流通常更快。但如果把 30_000_000 改成 1000,差異就不明顯——有時平行甚至更慢!

6. 使用範例:過濾、彙總、排序

過濾與計數

List<String> names = Arrays.asList("Anna", "Boris", "Vova", "Grisha", "Dora", "Egor", "Zoya");

long count = names.parallelStream()
    .filter(name -> name.length() == 4)
    .count();

System.out.println("長度為 4 的名字: " + count);

分組

List<String> words = Arrays.asList("貓", "鯨", "貓", "狗", "鯨", "貓");

Map<String, Long> freq = words.parallelStream()
    .collect(Collectors.groupingBy(
        w -> w,
        Collectors.counting()
    ));

System.out.println(freq); // {狗=1, 鯨=2, 貓=3}

排序(但平行不一定有加速!)

List<Integer> bigList = IntStream.rangeClosed(1, 5_000_000)
                                 .boxed()
                                 .collect(Collectors.toList());

long t1 = System.currentTimeMillis();
List<Integer> sorted = bigList.parallelStream()
    .sorted()
    .collect(Collectors.toList());
long t2 = System.currentTimeMillis();

System.out.println("平行排序: " + (t2 - t1) + " 毫秒");

7. 重要細節與建議

何時適合使用 parallelStream()

  • 集合很大(數萬個元素以上)。
  • 對元素的操作「很重」(複雜計算、檔案/網路 I/O)。
  • 沒有順序相依。
  • 沒有副作用(不會修改外部變數)。

何時不要使用 parallelStream()

  • 集合很小。
  • 操作很快。
  • 必須嚴格保序。
  • 需要存取共用變數(請考慮 thread-safe 集合或其他做法)。

如何知道使用了多少執行緒?

預設為處理器核心數:Runtime.getRuntime().availableProcessors()。可透過系統屬性修改:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

只有在了解其後果時才這麼做——否則可能把處理器塞滿,反而變慢。

8. 使用平行串流的常見錯誤

錯誤 1:在 forEach 內產生副作用
許多人會想:「我就用平行把列表填滿!」

List<Integer> result = new ArrayList<>();
IntStream.range(0, 1_000)
    .parallel()
    .forEach(result::add); // 危險!
System.out.println(result.size()); // 結果是不確定的!

為什麼不好? ArrayList 不是執行緒安全的,從多個執行緒同時新增元素,結果不可預期:可能遺漏、重複,或丟出例外。

解法: 使用串流的收集方法(collect),它們會自行妥善處理,或使用特殊的集合。

List<Integer> result = IntStream.range(0, 1_000)
    .parallel()
    .boxed()
    .collect(Collectors.toList());

錯誤 2:在小任務上期待加速
平行不是免費的!如果集合很小,平行串流可能因排程與同步的額外開銷而更慢。

錯誤 3:打亂順序
若你重視元素的順序(例如寫入檔案),請不要使用平行串流——順序不保證(或會變慢)。

錯誤 4:使用「不適合」的集合
有些集合(例如 LinkedList、非標準資料結構)不易切分,平行化效率會降低。

錯誤 5:忽略收集結果時的執行緒安全
如果你手動彙集結果(例如自行加入到列表),請使用具執行緒安全的集合(CopyOnWriteArrayListConcurrentLinkedQueue)或使用串流的收集方法。

留言
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION