Giới thiệu
Vì vậy, chúng ta biết rằng Java có luồng. Bạn có thể đọc về điều đó trong bài đánh giá có tựa đề Cùng nhau tốt hơn: Java và lớp Chủ đề. Phần I - Chủ đề thực hiện .
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
Như bạn có thể thấy, mã để bắt đầu một tác vụ khá điển hình, nhưng chúng ta phải lặp lại nó cho tác vụ mới. Một giải pháp là đặt nó trong một phương thức riêng, ví dụ: execute(Runnable runnable)
. Nhưng những người tạo ra Java đã xem xét hoàn cảnh khó khăn của chúng tôi và đưa ra giao Executor
diện:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
Mã này rõ ràng ngắn gọn hơn: bây giờ chúng ta chỉ cần viết mã để bắt đầu Runnable
chuỗi. Điều đó thật tuyệt phải không? Nhưng điều này chỉ là khởi đầu: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
giao diện có một ExecutorService
giao diện phụ. Javadoc cho giao diện này nói rằng một ExecutorService
mô tả cụ thể Executor
cung cấp các phương thức để tắt Executor
. Nó cũng giúp bạn có thể lấy một thứ tự java.util.concurrent.Future
để theo dõi quá trình thực hiện. Trước đây, trong Tốt hơn cùng nhau: Java và lớp Chủ đề. Phần IV - Có thể gọi, Tương lai và bạn bè , chúng tôi đã xem xét ngắn gọn các khả năng của Future
. Nếu bạn quên hoặc chưa bao giờ đọc nó, tôi khuyên bạn nên làm mới bộ nhớ của mình;) Javadoc còn nói gì nữa? Nó cho chúng tôi biết rằng chúng tôi có một java.util.concurrent.Executors
nhà máy đặc biệt cho phép chúng tôi tạo các triển khai mặc định của ExecutorService
.
Người thi hànhDịch vụ
Hãy xem xét. Chúng ta phảiExecutor
thực thi (nghĩa là gọi execute()
) một tác vụ nhất định trên một luồng và mã tạo luồng bị ẩn đối với chúng ta. Chúng tôi có ExecutorService
— một công cụ cụ thể Executor
có một số tùy chọn để kiểm soát tiến độ. Và chúng tôi có Executors
nhà máy cho phép chúng tôi tạo tệp ExecutorService
. Bây giờ chúng ta hãy tự làm điều đó:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
Bạn có thể thấy rằng chúng tôi đã chỉ định một nhóm luồng cố định có kích thước là 2. Sau đó, chúng tôi gửi từng nhiệm vụ đến nhóm một. Mỗi tác vụ trả về một String
chứa tên chủ đề ( currentThread().GetName()
). Điều quan trọng là phải tắt máy ExecutorService
khi kết thúc, vì nếu không thì chương trình của chúng ta sẽ không kết thúc. Nhà Executors
máy có các phương pháp nhà máy bổ sung. Ví dụ: chúng ta có thể tạo một nhóm chỉ bao gồm một luồng ( newSingleThreadExecutor
) hoặc một nhóm bao gồm bộ đệm ( newCachedThreadPool
) mà các luồng sẽ bị xóa sau khi chúng không hoạt động trong 1 phút. Trên thực tế, những thứ này ExecutorService
được hỗ trợ bởi một hàng đợi chặn , trong đó các tác vụ được đặt vào và từ đó các tác vụ được thực thi. Thông tin thêm về việc chặn hàng đợi có thể được tìm thấy trong video này . Bạn cũng có thể đọc cái nàyđánh giá về BlockingQueue . Và kiểm tra câu trả lời cho câu hỏi "Khi nào thích LinkedBlockingQueue hơn ArrayBlockingQueue?" Nói một cách đơn giản nhất, một BlockingQueue
khối một luồng trong hai trường hợp:
- chuỗi cố gắng lấy các mục từ một hàng đợi trống
- chuỗi cố gắng đưa các mục vào hàng đợi đầy đủ
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
hoặc
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Như chúng ta có thể thấy, việc triển khai ExecutorService
được tạo bên trong các phương thức xuất xưởng. Và phần lớn, chúng ta đang nói về ThreadPoolExecutor
. Chỉ các tham số ảnh hưởng đến công việc được thay đổi. 
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
Như chúng ta đã thấy trước đó,ThreadPoolExecutor
là những gì thường được tạo bên trong các phương thức xuất xưởng. Chức năng bị ảnh hưởng bởi các đối số mà chúng tôi chuyển qua như số lượng luồng tối đa và tối thiểu, cũng như loại hàng đợi đang được sử dụng. Nhưng bất kỳ triển khai java.util.concurrent.BlockingQueue
giao diện nào cũng có thể được sử dụng. Nói về ThreadPoolExecutor
, chúng ta nên đề cập đến một số tính năng thú vị. Ví dụ: bạn không thể gửi tác vụ tới ThreadPoolExecutor
nếu không có chỗ trống:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Mã này sẽ bị lỗi với một lỗi như thế này:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Nói cách khác, task
không thể gửi được, bởi vì SynchronousQueue
nó được thiết kế sao cho nó thực sự bao gồm một phần tử duy nhất và không cho phép chúng tôi đưa thêm bất kỳ thứ gì vào đó. Chúng ta có thể thấy rằng chúng ta không có queued tasks
("tác vụ được xếp hàng = 0") ở đây. Nhưng không có gì lạ về điều này, bởi vì đây là một tính năng đặc biệt của SynchronousQueue
, mà thực tế là hàng đợi 1 phần tử luôn trống! Khi một luồng đặt một phần tử vào hàng đợi, nó sẽ đợi cho đến khi một luồng khác lấy phần tử đó từ hàng đợi. Theo đó, chúng ta có thể thay thế nó bằng new LinkedBlockingQueue<>(1)
và lỗi sẽ chuyển thành now show queued tasks = 1
. Vì hàng đợi chỉ có 1 phần tử nên chúng ta không thể thêm phần tử thứ hai. Và đó là nguyên nhân khiến chương trình bị lỗi. Tiếp tục thảo luận về hàng đợi, điều đáng chú ý làThreadPoolExecutor
lớp có các phương thức bổ sung để phục vụ hàng đợi. Ví dụ: threadPoolExecutor.purge()
phương thức sẽ xóa tất cả các tác vụ đã hủy khỏi hàng đợi để giải phóng dung lượng trong hàng đợi. Một chức năng thú vị khác liên quan đến hàng đợi là trình xử lý các tác vụ bị từ chối:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Trong ví dụ này, trình xử lý của chúng tôi chỉ hiển thị Rejected
mỗi khi một tác vụ trong hàng đợi bị từ chối. Tiện lợi phải không nào? Ngoài ra, ThreadPoolExecutor
có một lớp con thú vị: ScheduledThreadPoolExecutor
, đó là tệp ScheduledExecutorService
. Nó cung cấp khả năng thực hiện một tác vụ dựa trên bộ đếm thời gian.
Theo lịch trìnhExecutorDịch vụ
ScheduledExecutorService
(là một loại ExecutorService
) cho phép chúng tôi chạy các tác vụ theo lịch trình. Hãy xem xét một ví dụ:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Mọi thứ đều đơn giản ở đây. Các nhiệm vụ được gửi và sau đó chúng tôi nhận được tệp java.util.concurrent.ScheduledFuture
. Một lịch trình cũng có thể hữu ích trong tình huống sau:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Tại đây, chúng tôi gửi một Runnable
tác vụ để thực hiện ở tần suất cố định ("FixedRate") với độ trễ ban đầu nhất định. Trong trường hợp này, sau 1 giây, tác vụ sẽ bắt đầu được thực hiện sau mỗi 2 giây. Có một tùy chọn tương tự:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Nhưng trong trường hợp này, các tác vụ được thực hiện với một khoảng thời gian cụ thể GIỮA mỗi lần thực hiện. Tức là, ý chí task
sẽ được thực thi sau 1 giây. Sau đó, ngay sau khi hoàn thành, 2 giây sẽ trôi qua và sau đó một nhiệm vụ mới sẽ được bắt đầu. Dưới đây là một số tài nguyên bổ sung về chủ đề này:
- Giới thiệu về thread pool trong Java
- Giới thiệu về Thread Pools trong Java
- Java Multithreading Steeplechase: Hủy tác vụ trong Executor
- Sử dụng Java Executor cho các tác vụ nền

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
Công ViệcTrộm CắpHồ Bơi
Ngoài các nhóm chủ đề trên, còn có một nhóm nữa. Chúng tôi có thể thành thật nói rằng nó là một chút đặc biệt. Nó được gọi là một nhóm ăn cắp công việc. Nói tóm lại, đánh cắp công việc là một thuật toán trong đó các luồng nhàn rỗi bắt đầu nhận các tác vụ từ các luồng khác hoặc các tác vụ từ một hàng đợi được chia sẻ. Hãy xem xét một ví dụ:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
Nếu chúng ta chạy mã này, thì nó ExecutorService
sẽ tạo ra 5 luồng cho chúng ta, bởi vì mỗi luồng sẽ được đưa vào hàng đợi cho đối tượng khóa. Chúng ta đã cùng nhau tìm ra màn hình và khóa trong Tốt hơn: Java và lớp Chủ đề. Phần II — Đồng bộ hóa . Bây giờ hãy thay thế Executors.newCachedThreadPool()
bằng Executors.newWorkStealingPool()
. Điều gì sẽ thay đổi? Chúng ta sẽ thấy rằng các tác vụ của chúng ta được thực hiện trên ít hơn 5 luồng. Hãy nhớ rằng CachedThreadPool
tạo một chủ đề cho mỗi nhiệm vụ? Đó là do wait()
luồng bị chặn, các tác vụ tiếp theo muốn được hoàn thành và các luồng mới được tạo cho chúng trong nhóm. Với một nhóm ăn cắp, các chủ đề không đứng yên mãi mãi. Họ bắt đầu thực hiện nhiệm vụ của hàng xóm. Điều gì tạo nên WorkStealingPool
sự khác biệt so với các nhóm chủ đề khác? Thực tế là phép thuậtForkJoinPool
sống bên trong nó:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Trên thực tế, có một sự khác biệt nữa. Theo mặc định, các luồng được tạo cho a ForkJoinPool
là các luồng daemon, không giống như các luồng được tạo thông qua onrdinary ThreadPool
. Nói chung, bạn nên nhớ các chủ đề daemon, bởi vì, chẳng hạn, CompletableFuture
cũng sử dụng các chủ đề daemon trừ khi bạn chỉ định riêng của mình ThreadFactory
để tạo các chủ đề không phải daemon. Đây là những điều bất ngờ có thể ẩn nấp ở những nơi không ngờ tới! :)
Ngã BaTham GiaBể Bơi
Trong phần này, một lần nữa chúng ta sẽ nói vềForkJoinPool
(còn được gọi là khuôn khổ fork/join), tồn tại "dưới vỏ bọc" của WorkStealingPool
. Nói chung, khung fork/join đã xuất hiện trở lại trong Java 1.7. Và mặc dù Java 11 đã gần ra mắt, nhưng nó vẫn đáng để ghi nhớ. Đây không phải là cách triển khai phổ biến nhất, nhưng nó khá thú vị. Có một đánh giá tốt về điều này trên web: Hiểu Java Fork-Join Framework với các ví dụ . Dựa ForkJoinPool
vào java.util.concurrent.RecursiveTask
. Ngoài ra còn có java.util.concurrent.RecursiveAction
. RecursiveAction
không trả lại kết quả. Do đó, RecursiveTask
tương tự với Callable
, và RecursiveAction
tương tự với unnable
. Chúng ta có thể thấy rằng tên bao gồm tên của hai phương thức quan trọng: fork
và join
. Cácfork
bắt đầu một số tác vụ không đồng bộ trên một luồng riêng biệt. Và join
phương pháp này cho phép bạn đợi công việc hoàn thành. Để hiểu rõ nhất, bạn nên đọc From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .
Bản tóm tắt
Vâng, điều đó kết thúc phần này của đánh giá. Chúng tôi đã học được rằngExecutor
ban đầu được phát minh ra để thực thi các luồng. Sau đó, những người tạo ra Java đã quyết định tiếp tục ý tưởng và đưa ra ExecutorService
. ExecutorService
cho phép chúng tôi gửi các tác vụ để thực thi bằng cách sử dụng submit()
và invoke()
đồng thời tắt dịch vụ. Vì ExecutorService
cần triển khai, họ đã viết một lớp với các phương thức xuất xưởng và gọi nó là Executors
. Nó cho phép bạn tạo nhóm chủ đề ( ThreadPoolExecutor
). Ngoài ra, có các nhóm luồng cũng cho phép chúng tôi chỉ định lịch thực hiện. Và một ForkJoinPool
ẩn đằng sau một WorkStealingPool
. Tôi hy vọng bạn thấy những gì tôi viết ở trên không chỉ thú vị mà còn dễ hiểu :) Tôi luôn vui mừng khi nghe những đề xuất và nhận xét của bạn. Kết hợp tốt hơn: Java và lớp Thread. Phần I - Các luồng thực thi Cùng nhau tốt hơn: Java và lớp Thread. Phần II — Đồng bộ hóa Tốt hơn khi kết hợp với nhau: Java và lớp Thread. Phần III — Tương tác tốt hơn khi kết hợp với nhau: Java và lớp Thread. Phần IV - Có thể gọi được, Tương lai và bạn bè Tốt hơn cùng nhau: Java và lớp Chủ đề. Phần VI — Bắn đi!
GO TO FULL VERSION