CodeGym /課程 /JAVA 25 SELF /自訂 Collector 與 Spliterator

自訂 Collector 與 Spliterator

JAVA 25 SELF
等級 33 , 課堂 4
開放

1. 自訂 Collector:何時、如何自己撰寫

在 Java 的 Stream API 中,將串流轉換為集合或彙總結果時,會使用 Collector 介面。通常你會使用 Collectors 類別中現成的收集器(toList()toMap()groupingBy() 等),但有時需求較特殊——此時就可以撰寫自己的 Collector。

Collector 是一個物件,用來描述如何從串流元素組裝出最終結果。它定義了四(其實是五)個關鍵組件:

  • supplier — 建立一個新的容器用來收集元素(例如新的清單或映射)。
  • accumulator — 將下一個元素加入容器。
  • combiner — 合併兩個容器(對平行串流很重要!)。
  • finisher — 把容器轉成最終結果(例如令其不可變,或轉成另一種型別)。
  • characteristics — 一組旗標,描述收集器的特性(例如是否支援並行、是否改變結果型別等)。

簽名:

Collector<T, A, R>
  • T — 串流元素的型別,
  • A — 中間累加器的型別,
  • R — 結果型別。

2. 範例:用於 MultiMap (Map<K, List<V>>) 的 Collector

假設你想把一個由 Pair<K, V> 組成的串流收集成 Map<K, List<V>>(multi-map),讓每個鍵對應到一個值的清單。

實作範例:

public static <K, V> Collector<Pair<K, V>, ?, Map<K, List<V>>> toMultiMap() {
    return Collector.of(
        HashMap::new, // supplier
        (map, pair) -> map.computeIfAbsent(pair.key(), k -> new ArrayList<>()).add(pair.value()), // accumulator
        (map1, map2) -> { // combiner
            map2.forEach((k, vList) -> map1.merge(k, vList, (l1, l2) -> { l1.addAll(l2); return l1; }));
            return map1;
        },
        Function.identity(), // finisher
        Collector.Characteristics.UNORDERED
    );
}

用法:

List<Pair<String, Integer>> pairs = List.of(
    new Pair<>("a", 1), new Pair<>("b", 2), new Pair<>("a", 3)
);

Map<String, List<Integer>> multiMap = pairs.stream().collect(toMultiMap());
// multiMap: {a=[1, 3], b=[2]}

3. 範例:取出前 N 名元素的 Collector

假設你想把串流收集成只包含前 N 個最大元素的清單(例如 Top‑5,降序)。

實作:

public static <T> Collector<T, ?, List<T>> topN(int n, Comparator<? super T> comparator) {
    return Collector.of(
        () -> new PriorityQueue<>(n, comparator), // supplier
        (pq, t) -> {
            pq.offer(t);
            if (pq.size() > n) pq.poll(); // 刪除最小值
        },
        (pq1, pq2) -> {
            pq2.forEach(t -> {
                pq1.offer(t);
                if (pq1.size() > n) pq1.poll();
            });
            return pq1;
        },
        pq -> {
            List<T> result = new ArrayList<>(pq);
            result.sort(comparator.reversed()); // 按降序
            return result;
        },
        Collector.Characteristics.UNORDERED
    );
}

用法:

List<Integer> top3 = Stream.of(5, 1, 9, 3, 7, 2).collect(topN(3, Comparator.naturalOrder()));
// top3: [9, 7, 5]

4. 何時不該自己寫 Collector

  • 如果可以用標準收集器與 downstream 操作的組合來表達需求(如 groupingBymappingflatMappingcollectingAndThen 等),建議優先使用它們
  • 自訂 Collector 僅適用於真正非標準的情境(特殊資料結構、複雜彙總、Top‑N、multi-map 等)。
  • 不要為了寫而寫 Collector——這會讓維護與測試更複雜。

範例:

// 與其自己為 Map<K, Set<V>> 寫 Collector:
.collect(Collectors.groupingBy(
    Pair::key,
    Collectors.mapping(Pair::value, Collectors.toSet())
))

5. 自訂 Spliterator:為什麼以及怎麼做

Spliterator 是一種專門用於高效遍歷與切分集合(或其他資料來源)的介面,特別適合平行處理。與一般的迭代器不同,Spliterator 可以「切分」(split)集合為彼此獨立的部分,供平行處理使用。

關鍵方法:

  • tryAdvance(Consumer<? super T> action) — 處理下一個元素。
  • trySplit() — 嘗試把集合切分為兩部分(回傳其中一部分的 Spliterator)。
  • estimateSize() — 剩餘元素數量的估計值。
  • characteristics() — 特性的位元遮罩(ORDEREDSIZEDSUBSIZED 等)。

trySplit:切分策略

平衡切分 對平行串流很重要:trySplit 應該回傳大小大致相當的部分,以便讓各工作執行緒負載更均衡。

如果無法切分(例如元素太少)——就回傳 null

範例:分批讀取檔案的 Spliterator
假設你有一個很大的檔案,希望每次處理 1000 行(以批次方式),避免一次把所有內容載入記憶體。

public class ChunkedLineSpliterator implements Spliterator<List<String>> {
    private final BufferedReader reader;
    private final int chunkSize;

    public ChunkedLineSpliterator(BufferedReader reader, int chunkSize) {
        this.reader = reader;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        List<String> chunk = new ArrayList<>(chunkSize);
        try {
            String line;
            for (int i = 0; i < chunkSize && (line = reader.readLine()) != null; i++) {
                chunk.add(line);
            }
            if (chunk.isEmpty()) return false;
            action.accept(chunk);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public Spliterator<List<String>> trySplit() {
        // 對於從檔案串流式讀取,切分沒有意義 — 回傳 null
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // 事先未知
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}

用法:

try (BufferedReader reader = Files.newBufferedReader(Path.of("big.txt"))) {
    StreamSupport.stream(new ChunkedLineSpliterator(reader, 1000), false)
        .forEach(chunk -> processChunk(chunk));
}

Spliterator 的特性

  • ORDERED — 元素具有特定順序(例如清單)。
  • SIZED — 已知確切的元素數量。
  • SUBSIZED — 由 trySplit 取得的所有 Spliterator 也都具有 SIZED
  • IMMUTABLE — 遍歷期間來源不會改變。
  • CONCURRENT — 來源支援安全的平行修改。
  • DISTINCTSORTEDNONNULL — 其他屬性。

重要:正確標註特性會影響串流的最佳化。

6. 範例

  • 分批(chunk)讀取檔案 — 讓你能分段處理大型檔案,而不必一次把全部內容載入記憶體。
  • 以最少配置進行解析 — 當你解析位元組/字元串流並希望最小化臨時物件的建立時,可以實作一個回傳來源陣列「視窗」或「切片」的 Spliterator。

範例:逐行解析 CSV 的 Spliterator

public class CsvLineSpliterator implements Spliterator<String[]> {
    private final BufferedReader reader;

    public CsvLineSpliterator(BufferedReader reader) {
        this.reader = reader;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String[]> action) {
        try {
            String line = reader.readLine();
            if (line == null) return false;
            action.accept(line.split(","));
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public Spliterator<String[]> trySplit() {
        return null; // 序列式解析
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}

7. 與 parallel() 整合 — 如何安全地進行

  • 如果你的 Spliterator 支援平行切分(trySplit 回傳的不是 null),且特性包含 SIZED/SUBSIZED,Stream API 就能有效地平行化處理。
  • 對於串流式來源(檔案、socket),通常不支援切分——請使用序列串流。
  • 對於集合與陣列——請實作平衡的切分策略(例如把陣列對半分)。

範例:用於陣列的 Spliterator

public class ArraySpliterator<T> implements Spliterator<T> {
    private final T[] array;
    private int start, end;

    public ArraySpliterator(T[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (start < end) {
            action.accept(array[start++]);
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        int mid = (start + end) >>> 1;
        if (mid == start) return null;
        ArraySpliterator<T> split = new ArraySpliterator<>(array, start, mid);
        start = mid;
        return split;
    }

    @Override
    public long estimateSize() {
        return end - start;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | IMMUTABLE;
    }
}

用法:

String[] arr = {"a", "b", "c", "d"};
StreamSupport.stream(new ArraySpliterator<>(arr, 0, arr.length), true)
    .forEach(System.out::println);
1
問卷/小測驗
優化與集合的合作,等級 33,課堂 4
未開放
優化與集合的合作
優化與集合的合作
留言
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION