1. 自訂 Collector:何時、如何自己撰寫
在 Java 的 Stream API 中,將串流轉換為集合或彙總結果時,會使用 Collector 介面。通常你會使用 Collectors 類別中現成的收集器(toList()、toMap()、groupingBy() 等),但有時需求較特殊——此時就可以撰寫自己的 Collector。
Collector 是一個物件,用來描述如何從串流元素組裝出最終結果。它定義了四(其實是五)個關鍵組件:
- supplier — 建立一個新的容器用來收集元素(例如新的清單或映射)。
- accumulator — 將下一個元素加入容器。
- combiner — 合併兩個容器(對平行串流很重要!)。
- finisher — 把容器轉成最終結果(例如令其不可變,或轉成另一種型別)。
- characteristics — 一組旗標,描述收集器的特性(例如是否支援並行、是否改變結果型別等)。
簽名:
Collector<T, A, R>
- T — 串流元素的型別,
- A — 中間累加器的型別,
- R — 結果型別。
2. 範例:用於 MultiMap (Map<K, List<V>>) 的 Collector
假設你想把一個由 Pair<K, V> 組成的串流收集成 Map<K, List<V>>(multi-map),讓每個鍵對應到一個值的清單。
實作範例:
public static <K, V> Collector<Pair<K, V>, ?, Map<K, List<V>>> toMultiMap() {
return Collector.of(
HashMap::new, // supplier
(map, pair) -> map.computeIfAbsent(pair.key(), k -> new ArrayList<>()).add(pair.value()), // accumulator
(map1, map2) -> { // combiner
map2.forEach((k, vList) -> map1.merge(k, vList, (l1, l2) -> { l1.addAll(l2); return l1; }));
return map1;
},
Function.identity(), // finisher
Collector.Characteristics.UNORDERED
);
}
用法:
List<Pair<String, Integer>> pairs = List.of(
new Pair<>("a", 1), new Pair<>("b", 2), new Pair<>("a", 3)
);
Map<String, List<Integer>> multiMap = pairs.stream().collect(toMultiMap());
// multiMap: {a=[1, 3], b=[2]}
3. 範例:取出前 N 名元素的 Collector
假設你想把串流收集成只包含前 N 個最大元素的清單(例如 Top‑5,降序)。
實作:
public static <T> Collector<T, ?, List<T>> topN(int n, Comparator<? super T> comparator) {
return Collector.of(
() -> new PriorityQueue<>(n, comparator), // supplier
(pq, t) -> {
pq.offer(t);
if (pq.size() > n) pq.poll(); // 刪除最小值
},
(pq1, pq2) -> {
pq2.forEach(t -> {
pq1.offer(t);
if (pq1.size() > n) pq1.poll();
});
return pq1;
},
pq -> {
List<T> result = new ArrayList<>(pq);
result.sort(comparator.reversed()); // 按降序
return result;
},
Collector.Characteristics.UNORDERED
);
}
用法:
List<Integer> top3 = Stream.of(5, 1, 9, 3, 7, 2).collect(topN(3, Comparator.naturalOrder()));
// top3: [9, 7, 5]
4. 何時不該自己寫 Collector
- 如果可以用標準收集器與 downstream 操作的組合來表達需求(如 groupingBy、mapping、flatMapping、collectingAndThen 等),建議優先使用它們。
- 自訂 Collector 僅適用於真正非標準的情境(特殊資料結構、複雜彙總、Top‑N、multi-map 等)。
- 不要為了寫而寫 Collector——這會讓維護與測試更複雜。
範例:
// 與其自己為 Map<K, Set<V>> 寫 Collector:
.collect(Collectors.groupingBy(
Pair::key,
Collectors.mapping(Pair::value, Collectors.toSet())
))
5. 自訂 Spliterator:為什麼以及怎麼做
Spliterator 是一種專門用於高效遍歷與切分集合(或其他資料來源)的介面,特別適合平行處理。與一般的迭代器不同,Spliterator 可以「切分」(split)集合為彼此獨立的部分,供平行處理使用。
關鍵方法:
- tryAdvance(Consumer<? super T> action) — 處理下一個元素。
- trySplit() — 嘗試把集合切分為兩部分(回傳其中一部分的 Spliterator)。
- estimateSize() — 剩餘元素數量的估計值。
- characteristics() — 特性的位元遮罩(ORDERED、SIZED、SUBSIZED 等)。
trySplit:切分策略
平衡切分 對平行串流很重要:trySplit 應該回傳大小大致相當的部分,以便讓各工作執行緒負載更均衡。
如果無法切分(例如元素太少)——就回傳 null。
範例:分批讀取檔案的 Spliterator
假設你有一個很大的檔案,希望每次處理 1000 行(以批次方式),避免一次把所有內容載入記憶體。
public class ChunkedLineSpliterator implements Spliterator<List<String>> {
private final BufferedReader reader;
private final int chunkSize;
public ChunkedLineSpliterator(BufferedReader reader, int chunkSize) {
this.reader = reader;
this.chunkSize = chunkSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<String>> action) {
List<String> chunk = new ArrayList<>(chunkSize);
try {
String line;
for (int i = 0; i < chunkSize && (line = reader.readLine()) != null; i++) {
chunk.add(line);
}
if (chunk.isEmpty()) return false;
action.accept(chunk);
return true;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public Spliterator<List<String>> trySplit() {
// 對於從檔案串流式讀取,切分沒有意義 — 回傳 null
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE; // 事先未知
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
用法:
try (BufferedReader reader = Files.newBufferedReader(Path.of("big.txt"))) {
StreamSupport.stream(new ChunkedLineSpliterator(reader, 1000), false)
.forEach(chunk -> processChunk(chunk));
}
Spliterator 的特性
- ORDERED — 元素具有特定順序(例如清單)。
- SIZED — 已知確切的元素數量。
- SUBSIZED — 由 trySplit 取得的所有 Spliterator 也都具有 SIZED。
- IMMUTABLE — 遍歷期間來源不會改變。
- CONCURRENT — 來源支援安全的平行修改。
- DISTINCT、SORTED、NONNULL — 其他屬性。
重要:正確標註特性會影響串流的最佳化。
6. 範例
- 分批(chunk)讀取檔案 — 讓你能分段處理大型檔案,而不必一次把全部內容載入記憶體。
- 以最少配置進行解析 — 當你解析位元組/字元串流並希望最小化臨時物件的建立時,可以實作一個回傳來源陣列「視窗」或「切片」的 Spliterator。
範例:逐行解析 CSV 的 Spliterator
public class CsvLineSpliterator implements Spliterator<String[]> {
private final BufferedReader reader;
public CsvLineSpliterator(BufferedReader reader) {
this.reader = reader;
}
@Override
public boolean tryAdvance(Consumer<? super String[]> action) {
try {
String line = reader.readLine();
if (line == null) return false;
action.accept(line.split(","));
return true;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public Spliterator<String[]> trySplit() {
return null; // 序列式解析
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
7. 與 parallel() 整合 — 如何安全地進行
- 如果你的 Spliterator 支援平行切分(trySplit 回傳的不是 null),且特性包含 SIZED/SUBSIZED,Stream API 就能有效地平行化處理。
- 對於串流式來源(檔案、socket),通常不支援切分——請使用序列串流。
- 對於集合與陣列——請實作平衡的切分策略(例如把陣列對半分)。
範例:用於陣列的 Spliterator
public class ArraySpliterator<T> implements Spliterator<T> {
private final T[] array;
private int start, end;
public ArraySpliterator(T[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (start < end) {
action.accept(array[start++]);
return true;
}
return false;
}
@Override
public Spliterator<T> trySplit() {
int mid = (start + end) >>> 1;
if (mid == start) return null;
ArraySpliterator<T> split = new ArraySpliterator<>(array, start, mid);
start = mid;
return split;
}
@Override
public long estimateSize() {
return end - start;
}
@Override
public int characteristics() {
return ORDERED | SIZED | SUBSIZED | IMMUTABLE;
}
}
用法:
String[] arr = {"a", "b", "c", "d"};
StreamSupport.stream(new ArraySpliterator<>(arr, 0, arr.length), true)
.forEach(System.out::println);
GO TO FULL VERSION