1. 回顧 Stream API
你已經熟悉 Stream API——這是一種處理集合的便利方式,可讓你撰寫精簡且易讀的資料處理程式碼:過濾、排序、計數等。
經典範例如下:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.stream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
System.out.println(sum); // 6(2 + 4)
在這個範例中,集合被轉成串流(stream()),從中只保留偶數,接著將其轉為 int,最後呼叫 sum() 進行加總。
Stream API 讓程式碼更短且更具表達力:與其逐步描述如何處理,不如直接說明你想要的結果。此外,必要時只要一行就能切換到平行處理。
2. 平行串流:語法與運作原理
如何把串流變成平行?
很簡單:使用 parallelStream() 取代 stream()。或對既有串流呼叫 .parallel()。
List<Integer> numbers = ...;
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
或如下:
numbers.stream()
.parallel() // 轉為平行串流
.filter(...)
.map(...)
.sum();
底層發生了什麼?
- 集合會自動被拆分成多個區塊。
- 每個區塊在不同的執行緒中處理(使用 ForkJoinPool——專用的執行緒池)。
- 結果再合併為最終值。
也就是說,如果你的處理器有多核心,處理就會真正並行進行——例如,過濾與加總可同時在多個核心上執行。
何時特別有用?
- 處理大型集合(數萬個元素以上)。
- 每個元素都需進行複雜計算。
- 不需要嚴格保留處理順序。
範例:順序串流 vs 平行串流
來看看處理大型陣列的簡單範例。
import java.util.*;
import java.util.stream.*;
public class ParallelStreamDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());
// 順序串流
long time1 = System.currentTimeMillis();
long count1 = numbers.stream()
.filter(n -> n % 2 == 0)
.count();
long time2 = System.currentTimeMillis();
System.out.println("順序: " + (time2 - time1) + " 毫秒,偶數:" + count1);
// 平行串流
long time3 = System.currentTimeMillis();
long count2 = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.count();
long time4 = System.currentTimeMillis();
System.out.println("平行: " + (time4 - time3) + " 毫秒,偶數:" + count2);
}
}
請在自己的電腦上試試——在多核心處理器上,平行串流通常會更快。不過也不一定!細節請見下文。
3. 運作方式:ForkJoinPool 與自動分割
平行串流在底層使用 ForkJoinPool.commonPool(),它會自動管理執行緒數量(通常等於可用處理器核心數)。
示意如下:
+-----------------------------+
| 您的集合 |
+-----------------------------+
| 1 | 2 | 3 | ... | 1000 萬 |
+----+----+----+-----+--------+
| | | |
v v v v
[執行緒1][執行緒2]...[執行緒N]
| | | |
+----+----+-----------+
|
[結果合併]
每個執行緒各自處理一塊資料,最後再把結果彙總。
4. 限制與坑點
平行串流不是能把一切「加速」的魔法按鈕。 有時甚至會變慢!
什麼情況下不划算:
- 集合很小(少於約 ~1000 個元素)。
- 對每個元素的操作非常快(例如只做 n * 2)。
- 你非常在意處理順序(例如要按順序寫入檔案)。
為什麼? 建立與同步執行緒也需要時間。如果任務本身「很小」,平行化的額外開銷可能超過它帶來的收益。
副作用是平行化的天敵
若在串流內的操作會修改外部變數,務必小心!
錯誤範例:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] sum = {0};
numbers.parallelStream().forEach(n -> sum[0] += n);
System.out.println(sum[0]); // ???(你預期是 15,實際可能是任何數)
為何如此?因為多個執行緒同時修改同一個變數——會發生競態條件(race condition)。最終值可能不正確。
正確作法——使用會回傳結果的串流方法:
int sum = numbers.parallelStream().mapToInt(n -> n).sum();
不是所有集合都同樣適合「平行化」
有些集合(例如一般的 ArrayList)很容易被切分;但 LinkedList 或具有無限元素的串流(例如 Stream.generate(...))就不太適合。
5. 實作:效能比較
範例:尋找最大值
import java.util.*;
import java.util.stream.*;
public class ParallelMaxDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 30_000_000)
.boxed()
.collect(Collectors.toList());
// 順序
long t1 = System.currentTimeMillis();
int max1 = numbers.stream().max(Integer::compareTo).get();
long t2 = System.currentTimeMillis();
System.out.println("順序: " + (t2 - t1) + " 毫秒,最大值 = " + max1);
// 平行
long t3 = System.currentTimeMillis();
int max2 = numbers.parallelStream().max(Integer::compareTo).get();
long t4 = System.currentTimeMillis();
System.out.println("平行: " + (t4 - t3) + " 毫秒,最大值 = " + max2);
}
}
會看到什麼? 在現代多核心處理器上,平行串流通常更快。但如果把 30_000_000 改成 1000,差異就不明顯——有時平行甚至更慢!
6. 使用範例:過濾、彙總、排序
過濾與計數
List<String> names = Arrays.asList("Anna", "Boris", "Vova", "Grisha", "Dora", "Egor", "Zoya");
long count = names.parallelStream()
.filter(name -> name.length() == 4)
.count();
System.out.println("長度為 4 的名字: " + count);
分組
List<String> words = Arrays.asList("貓", "鯨", "貓", "狗", "鯨", "貓");
Map<String, Long> freq = words.parallelStream()
.collect(Collectors.groupingBy(
w -> w,
Collectors.counting()
));
System.out.println(freq); // {狗=1, 鯨=2, 貓=3}
排序(但平行不一定有加速!)
List<Integer> bigList = IntStream.rangeClosed(1, 5_000_000)
.boxed()
.collect(Collectors.toList());
long t1 = System.currentTimeMillis();
List<Integer> sorted = bigList.parallelStream()
.sorted()
.collect(Collectors.toList());
long t2 = System.currentTimeMillis();
System.out.println("平行排序: " + (t2 - t1) + " 毫秒");
7. 重要細節與建議
何時適合使用 parallelStream()
- 集合很大(數萬個元素以上)。
- 對元素的操作「很重」(複雜計算、檔案/網路 I/O)。
- 沒有順序相依。
- 沒有副作用(不會修改外部變數)。
何時不要使用 parallelStream()
- 集合很小。
- 操作很快。
- 必須嚴格保序。
- 需要存取共用變數(請考慮 thread-safe 集合或其他做法)。
如何知道使用了多少執行緒?
預設為處理器核心數:Runtime.getRuntime().availableProcessors()。可透過系統屬性修改:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
只有在了解其後果時才這麼做——否則可能把處理器塞滿,反而變慢。
8. 使用平行串流的常見錯誤
錯誤 1:在 forEach 內產生副作用
許多人會想:「我就用平行把列表填滿!」
List<Integer> result = new ArrayList<>();
IntStream.range(0, 1_000)
.parallel()
.forEach(result::add); // 危險!
System.out.println(result.size()); // 結果是不確定的!
為什麼不好? ArrayList 不是執行緒安全的,從多個執行緒同時新增元素,結果不可預期:可能遺漏、重複,或丟出例外。
解法: 使用串流的收集方法(collect),它們會自行妥善處理,或使用特殊的集合。
List<Integer> result = IntStream.range(0, 1_000)
.parallel()
.boxed()
.collect(Collectors.toList());
錯誤 2:在小任務上期待加速
平行不是免費的!如果集合很小,平行串流可能因排程與同步的額外開銷而更慢。
錯誤 3:打亂順序
若你重視元素的順序(例如寫入檔案),請不要使用平行串流——順序不保證(或會變慢)。
錯誤 4:使用「不適合」的集合
有些集合(例如 LinkedList、非標準資料結構)不易切分,平行化效率會降低。
錯誤 5:忽略收集結果時的執行緒安全
如果你手動彙集結果(例如自行加入到列表),請使用具執行緒安全的集合(CopyOnWriteArrayList、ConcurrentLinkedQueue)或使用串流的收集方法。
GO TO FULL VERSION