1. Ôn lại Stream API
Bạn đã quen với Stream API — đây là cách thuận tiện để làm việc với collection, cho phép viết mã gọn gàng, dễ hiểu để xử lý dữ liệu: lọc, sắp xếp, đếm, v.v.
Đây là một ví dụ kinh điển:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.stream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
System.out.println(sum); // 6 (2 + 4)
Trong ví dụ này, collection được biến thành stream (stream()), chỉ các số chẵn được chọn, sau đó chúng được chuyển thành int, và kết quả được cộng lại bằng sum().
Stream API giúp mã ngắn gọn và biểu đạt hơn: thay vì mô tả từng bước cách thực hiện, bạn chỉ cần nói cái gì muốn nhận. Ngoài ra, khi cần bạn có thể dễ dàng chuyển sang xử lý song song — chỉ với một dòng.
2. Stream song song: cú pháp và nguyên lý hoạt động
Làm thế nào để biến stream thành song song?
Rất đơn giản: thay vì stream() hãy dùng parallelStream(). Hoặc gọi phương thức .parallel() trên stream hiện có.
List<Integer> numbers = ...;
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
Hoặc như thế này:
numbers.stream()
.parallel() // chuyển thành stream song song
.filter(...)
.map(...)
.sum();
Bên trong diễn ra điều gì?
- Collection tự động được chia nhỏ thành các phần.
- Mỗi phần được xử lý trong một thread riêng (sử dụng ForkJoinPool — một pool thread chuyên dụng).
- Các kết quả được hợp nhất thành giá trị cuối cùng.
Nói cách khác, nếu bạn có CPU đa nhân, việc xử lý thực sự diễn ra song song — ví dụ, lọc và tính tổng có thể thực hiện đồng thời trên nhiều nhân.
Khi nào đặc biệt hữu ích?
- Xử lý collection lớn (hàng chục nghìn phần tử trở lên).
- Tính toán “nặng” cho mỗi phần tử.
- Không cần giữ thứ tự xử lý nghiêm ngặt.
Ví dụ: so sánh stream tuần tự và stream song song
Hãy xem ví dụ đơn giản với việc xử lý một mảng lớn.
import java.util.*;
import java.util.stream.*;
public class ParallelStreamDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());
// Stream tuần tự
long time1 = System.currentTimeMillis();
long count1 = numbers.stream()
.filter(n -> n % 2 == 0)
.count();
long time2 = System.currentTimeMillis();
System.out.println("Tuần tự: " + (time2 - time1) + " ms, số chẵn: " + count1);
// Stream song song
long time3 = System.currentTimeMillis();
long count2 = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.count();
long time4 = System.currentTimeMillis();
System.out.println("Song song: " + (time4 - time3) + " ms, số chẵn: " + count2);
}
}
Hãy thử chạy đoạn mã này trên máy của bạn — rất có thể stream song song sẽ xử lý collection nhanh hơn (đặc biệt nếu bạn có CPU đa nhân). Nhưng không phải lúc nào cũng vậy! Các lưu ý — ở phần dưới.
3. Cách nó hoạt động: ForkJoinPool và việc chia nhỏ tự động
Stream song song sử dụng bên dưới ForkJoinPool.commonPool(), tự động quản lý số lượng thread (thường tương đương số lõi CPU khả dụng).
Sơ đồ khái quát:
+-----------------------------+
| Bộ sưu tập của bạn |
+-----------------------------+
| 1 | 2 | 3 | ... | 10 triệu |
+----+----+----+-----+--------+
| | | |
v v v v
[Luồng1][Luồng2]...[LuồngN]
| | | |
+----+----+-----------+
|
[Hợp nhất kết quả]
Mỗi luồng xử lý phần của nó, sau đó kết quả được gộp lại.
4. Giới hạn và cạm bẫy
Stream song song không phải là nút “tăng tốc mọi thứ” thần kỳ. Đôi khi chúng còn làm chậm việc thực thi!
Khi không nên song song hóa:
- Collection nhỏ (tới khoảng ~1000 phần tử).
- Thao tác trên mỗi phần tử rất nhanh (ví dụ chỉ là n * 2).
- Bạn cần giữ thứ tự xử lý nghiêm ngặt (ví dụ, khi ghi nối tiếp vào tệp).
Tại sao? Việc tạo và đồng bộ thread cũng tốn thời gian. Nếu bản thân tác vụ “nhỏ”, chi phí phụ có thể lớn hơn lợi ích từ song song hóa.
Tác dụng phụ — kẻ thù của song song
Nếu các thao tác bên trong stream của bạn thay đổi biến bên ngoài, hãy hết sức cẩn thận!
Ví dụ xấu:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] sum = {0};
numbers.parallelStream().forEach(n -> sum[0] += n);
System.out.println(sum[0]); // ??? (bạn kỳ vọng 15, nhưng có thể nhận bất cứ gì)
Vì sao? Bởi nhiều thread đồng thời thay đổi cùng một biến — dẫn tới race condition (tranh chấp). Giá trị cuối cùng có thể sai.
Cách đúng — dùng các phương thức stream trả về kết quả:
int sum = numbers.parallelStream().mapToInt(n -> n).sum();
Không phải mọi collection đều “song song” tốt như nhau
Một số collection (ví dụ ArrayList thông thường) chia nhỏ rất tốt. Còn LinkedList hoặc stream vô hạn (ví dụ Stream.generate(...)) — thì không.
5. Thực hành: so sánh hiệu năng
Ví dụ: tìm số lớn nhất
import java.util.*;
import java.util.stream.*;
public class ParallelMaxDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 30_000_000)
.boxed()
.collect(Collectors.toList());
// Tuần tự
long t1 = System.currentTimeMillis();
int max1 = numbers.stream().max(Integer::compareTo).get();
long t2 = System.currentTimeMillis();
System.out.println("Tuần tự: " + (t2 - t1) + " ms, max = " + max1);
// Song song
long t3 = System.currentTimeMillis();
int max2 = numbers.parallelStream().max(Integer::compareTo).get();
long t4 = System.currentTimeMillis();
System.out.println("Song song: " + (t4 - t3) + " ms, max = " + max2);
}
}
Sẽ thấy gì? Trên các CPU đa nhân hiện đại, stream song song thường nhanh hơn. Nhưng nếu thay 30_000_000 bằng 1000, sẽ không có khác biệt — thậm chí đôi khi còn chậm hơn!
6. Ví dụ sử dụng: lọc, gom nhóm, sắp xếp
Lọc và đếm
List<String> names = Arrays.asList("Anya", "Boris", "Vasya", "Grisha", "Dasha", "Egor", "Zhenya");
long count = names.parallelStream()
.filter(name -> name.length() == 4)
.count();
System.out.println("Số tên dài 4: " + count);
Gom nhóm
List<String> words = Arrays.asList("mèo", "cá voi", "mèo", "chó", "cá voi", "mèo");
Map<String, Long> freq = words.parallelStream()
.collect(Collectors.groupingBy(
w -> w,
Collectors.counting()
));
System.out.println(freq); // {chó=1, cá voi=2, mèo=3}
Sắp xếp (nhưng ở đây song song không phải lúc nào cũng mang lại lợi ích!)
List<Integer> bigList = IntStream.rangeClosed(1, 5_000_000)
.boxed()
.collect(Collectors.toList());
long t1 = System.currentTimeMillis();
List<Integer> sorted = bigList.parallelStream()
.sorted()
.collect(Collectors.toList());
long t2 = System.currentTimeMillis();
System.out.println("Sắp xếp song song: " + (t2 - t1) + " ms");
7. Các lưu ý quan trọng và khuyến nghị
Khi nên dùng parallelStream()
- Collection lớn (hàng chục nghìn phần tử trở lên).
- Thao tác trên phần tử “nặng” (tính toán phức tạp, làm việc với tệp/mạng).
- Không phụ thuộc vào thứ tự phần tử.
- Không có tác dụng phụ (không thay đổi biến bên ngoài).
Khi KHÔNG nên dùng parallelStream()
- Collection nhỏ.
- Thao tác nhanh.
- Cần giữ thứ tự nghiêm ngặt.
- Có truy cập biến dùng chung (hãy cân nhắc collection an toàn luồng hoặc cách tiếp cận khác).
Làm sao biết có bao nhiêu thread được dùng?
Mặc định — bằng số lõi CPU: Runtime.getRuntime().availableProcessors(). Có thể thay đổi hành vi này qua thuộc tính hệ thống:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
Chỉ làm nếu bạn hiểu hệ quả — nếu không có thể “đầy tải” CPU và bị chậm.
8. Những lỗi thường gặp khi làm việc với stream song song
Lỗi #1: Tác dụng phụ bên trong forEach
Nhiều người nghĩ: “Giờ mình sẽ điền danh sách một cách song song!”
List<Integer> result = new ArrayList<>();
IntStream.range(0, 1_000)
.parallel()
.forEach(result::add); // NGUY HIỂM!
System.out.println(result.size()); // Kết quả — ngẫu nhiên!
Vì sao điều này tệ? ArrayList không an toàn luồng, và khi thêm đồng thời từ nhiều thread, kết quả không thể dự đoán: có thể thiếu, trùng, hoặc ném ngoại lệ.
Giải pháp: Dùng các phương thức thu thập của stream (collect) vốn tự đảm bảo an toàn, hoặc dùng collection chuyên dụng.
List<Integer> result = IntStream.range(0, 1_000)
.parallel()
.boxed()
.collect(Collectors.toList());
Lỗi #2: Trông đợi tăng tốc ở tác vụ nhỏ
Song song không miễn phí! Nếu collection nhỏ, stream song song có thể chậm hơn vì chi phí lập lịch và đồng bộ.
Lỗi #3: Sai lệch thứ tự
Nếu bạn cần thứ tự phần tử (ví dụ khi ghi vào tệp), đừng dùng stream song song — thứ tự không được đảm bảo (hoặc sẽ chậm hơn).
Lỗi #4: Dùng collection “không phù hợp”
Một số collection (ví dụ LinkedList, cấu trúc phi chuẩn) chia nhỏ kém — hiệu quả song song giảm.
Lỗi #5: Bỏ qua tính an toàn luồng khi thu thập kết quả
Nếu bạn tự thu thập kết quả (ví dụ thêm vào danh sách), hãy dùng collection an toàn luồng (CopyOnWriteArrayList, ConcurrentLinkedQueue) hoặc các phương thức thu thập của stream.
GO TO FULL VERSION