CodeGym /Blog Java /Ngẫu nhiên /Cùng nhau tốt hơn: Java và lớp Thread. Phần V — Executor,...
John Squirrels
Mức độ
San Francisco

Cùng nhau tốt hơn: Java và lớp Thread. Phần V — Executor, ThreadPool, Fork/Join

Xuất bản trong nhóm

Giới thiệu

Vì vậy, chúng ta biết rằng Java có luồng. Bạn có thể đọc về điều đó trong bài đánh giá có tựa đề Cùng nhau tốt hơn: Java và lớp Chủ đề. Phần I - Chủ đề thực hiện . Cùng nhau tốt hơn: Java và lớp Thread.  Phần V — Executor, ThreadPool, Fork/Join - 1Hãy xem xét lại đoạn mã điển hình:

public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Như bạn có thể thấy, mã để bắt đầu một tác vụ khá điển hình, nhưng chúng ta phải lặp lại nó cho tác vụ mới. Một giải pháp là đặt nó trong một phương thức riêng, ví dụ: execute(Runnable runnable). Nhưng những người tạo ra Java đã xem xét hoàn cảnh khó khăn của chúng tôi và đưa ra giao Executordiện:

public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Mã này rõ ràng ngắn gọn hơn: bây giờ chúng ta chỉ cần viết mã để bắt đầu Runnablechuỗi. Điều đó thật tuyệt phải không? Nhưng điều này chỉ là khởi đầu: Cùng nhau tốt hơn: Java và lớp Thread.  Phần V — Executor, ThreadPool, Fork/Join - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Như bạn có thể thấy, Executorgiao diện có một ExecutorServicegiao diện phụ. Javadoc cho giao diện này nói rằng một ExecutorServicemô tả cụ thể Executorcung cấp các phương thức để tắt Executor. Nó cũng giúp bạn có thể lấy một thứ tự java.util.concurrent.Futuređể theo dõi quá trình thực hiện. Trước đây, trong Tốt hơn cùng nhau: Java và lớp Chủ đề. Phần IV - Có thể gọi, Tương lai và bạn bè , chúng tôi đã xem xét ngắn gọn các khả năng của Future. Nếu bạn quên hoặc chưa bao giờ đọc nó, tôi khuyên bạn nên làm mới bộ nhớ của mình;) Javadoc còn nói gì nữa? Nó cho chúng tôi biết rằng chúng tôi có một java.util.concurrent.Executorsnhà máy đặc biệt cho phép chúng tôi tạo các triển khai mặc định của ExecutorService.

Người thi hànhDịch vụ

Hãy xem xét. Chúng ta phải Executorthực thi (nghĩa là gọi execute()) một tác vụ nhất định trên một luồng và mã tạo luồng bị ẩn đối với chúng ta. Chúng tôi có ExecutorService— một công cụ cụ thể Executorcó một số tùy chọn để kiểm soát tiến độ. Và chúng tôi có Executorsnhà máy cho phép chúng tôi tạo tệp ExecutorService. Bây giờ chúng ta hãy tự làm điều đó:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Bạn có thể thấy rằng chúng tôi đã chỉ định một nhóm luồng cố định có kích thước là 2. Sau đó, chúng tôi gửi từng nhiệm vụ đến nhóm một. Mỗi tác vụ trả về một Stringchứa tên chủ đề ( currentThread().GetName()). Điều quan trọng là phải tắt máy ExecutorServicekhi kết thúc, vì nếu không thì chương trình của chúng ta sẽ không kết thúc. Nhà Executorsmáy có các phương pháp nhà máy bổ sung. Ví dụ: chúng ta có thể tạo một nhóm chỉ bao gồm một luồng ( newSingleThreadExecutor) hoặc một nhóm bao gồm bộ đệm ( newCachedThreadPool) mà các luồng sẽ bị xóa sau khi chúng không hoạt động trong 1 phút. Trên thực tế, những thứ này ExecutorServiceđược hỗ trợ bởi một hàng đợi chặn , trong đó các tác vụ được đặt vào và từ đó các tác vụ được thực thi. Thông tin thêm về việc chặn hàng đợi có thể được tìm thấy trong video này . Bạn cũng có thể đọc cái nàyđánh giá về BlockingQueue . Và kiểm tra câu trả lời cho câu hỏi "Khi nào thích LinkedBlockingQueue hơn ArrayBlockingQueue?" Nói một cách đơn giản nhất, một BlockingQueuekhối một luồng trong hai trường hợp:
  • chuỗi cố gắng lấy các mục từ một hàng đợi trống
  • chuỗi cố gắng đưa các mục vào hàng đợi đầy đủ
Nếu chúng ta xem xét việc triển khai các phương thức xuất xưởng, chúng ta có thể thấy chúng hoạt động như thế nào. Ví dụ:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
hoặc

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Như chúng ta có thể thấy, việc triển khai ExecutorServiceđược tạo bên trong các phương thức xuất xưởng. Và phần lớn, chúng ta đang nói về ThreadPoolExecutor. Chỉ các tham số ảnh hưởng đến công việc được thay đổi. Cùng nhau tốt hơn: Java và lớp Thread.  Phần V — Executor, ThreadPool, Fork/Join - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Như chúng ta đã thấy trước đó, ThreadPoolExecutorlà những gì thường được tạo bên trong các phương thức xuất xưởng. Chức năng bị ảnh hưởng bởi các đối số mà chúng tôi chuyển qua như số lượng luồng tối đa và tối thiểu, cũng như loại hàng đợi đang được sử dụng. Nhưng bất kỳ triển khai java.util.concurrent.BlockingQueuegiao diện nào cũng có thể được sử dụng. Nói về ThreadPoolExecutor, chúng ta nên đề cập đến một số tính năng thú vị. Ví dụ: bạn không thể gửi tác vụ tới ThreadPoolExecutornếu không có chỗ trống:

public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Mã này sẽ bị lỗi với một lỗi như thế này:

Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Nói cách khác, taskkhông thể gửi được, bởi vì SynchronousQueuenó được thiết kế sao cho nó thực sự bao gồm một phần tử duy nhất và không cho phép chúng tôi đưa thêm bất kỳ thứ gì vào đó. Chúng ta có thể thấy rằng chúng ta không có queued tasks("tác vụ được xếp hàng = 0") ở đây. Nhưng không có gì lạ về điều này, bởi vì đây là một tính năng đặc biệt của SynchronousQueue, mà thực tế là hàng đợi 1 phần tử luôn trống! Khi một luồng đặt một phần tử vào hàng đợi, nó sẽ đợi cho đến khi một luồng khác lấy phần tử đó từ hàng đợi. Theo đó, chúng ta có thể thay thế nó bằng new LinkedBlockingQueue<>(1)và lỗi sẽ chuyển thành now show queued tasks = 1. Vì hàng đợi chỉ có 1 phần tử nên chúng ta không thể thêm phần tử thứ hai. Và đó là nguyên nhân khiến chương trình bị lỗi. Tiếp tục thảo luận về hàng đợi, điều đáng chú ý làThreadPoolExecutorlớp có các phương thức bổ sung để phục vụ hàng đợi. Ví dụ: threadPoolExecutor.purge()phương thức sẽ xóa tất cả các tác vụ đã hủy khỏi hàng đợi để giải phóng dung lượng trong hàng đợi. Một chức năng thú vị khác liên quan đến hàng đợi là trình xử lý các tác vụ bị từ chối:

public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Trong ví dụ này, trình xử lý của chúng tôi chỉ hiển thị Rejectedmỗi khi một tác vụ trong hàng đợi bị từ chối. Tiện lợi phải không nào? Ngoài ra, ThreadPoolExecutorcó một lớp con thú vị: ScheduledThreadPoolExecutor, đó là tệp ScheduledExecutorService. Nó cung cấp khả năng thực hiện một tác vụ dựa trên bộ đếm thời gian.

Theo lịch trìnhExecutorDịch vụ

ScheduledExecutorService(là một loại ExecutorService) cho phép chúng tôi chạy các tác vụ theo lịch trình. Hãy xem xét một ví dụ:

public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Mọi thứ đều đơn giản ở đây. Các nhiệm vụ được gửi và sau đó chúng tôi nhận được tệp java.util.concurrent.ScheduledFuture. Một lịch trình cũng có thể hữu ích trong tình huống sau:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Tại đây, chúng tôi gửi một Runnabletác vụ để thực hiện ở tần suất cố định ("FixedRate") với độ trễ ban đầu nhất định. Trong trường hợp này, sau 1 giây, tác vụ sẽ bắt đầu được thực hiện sau mỗi 2 giây. Có một tùy chọn tương tự:

scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
Nhưng trong trường hợp này, các tác vụ được thực hiện với một khoảng thời gian cụ thể GIỮA mỗi lần thực hiện. Tức là, ý chí tasksẽ được thực thi sau 1 giây. Sau đó, ngay sau khi hoàn thành, 2 giây sẽ trôi qua và sau đó một nhiệm vụ mới sẽ được bắt đầu. Dưới đây là một số tài nguyên bổ sung về chủ đề này: Cùng nhau tốt hơn: Java và lớp Thread.  Phần V — Executor, ThreadPool, Fork/Join - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

Công ViệcTrộm CắpHồ Bơi

Ngoài các nhóm chủ đề trên, còn có một nhóm nữa. Chúng tôi có thể thành thật nói rằng nó là một chút đặc biệt. Nó được gọi là một nhóm ăn cắp công việc. Nói tóm lại, đánh cắp công việc là một thuật toán trong đó các luồng nhàn rỗi bắt đầu nhận các tác vụ từ các luồng khác hoặc các tác vụ từ một hàng đợi được chia sẻ. Hãy xem xét một ví dụ:

public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Nếu chúng ta chạy mã này, thì nó ExecutorServicesẽ tạo ra 5 luồng cho chúng ta, bởi vì mỗi luồng sẽ được đưa vào hàng đợi cho đối tượng khóa. Chúng ta đã cùng nhau tìm ra màn hình và khóa trong Tốt hơn: Java và lớp Chủ đề. Phần II — Đồng bộ hóa . Bây giờ hãy thay thế Executors.newCachedThreadPool()bằng Executors.newWorkStealingPool(). Điều gì sẽ thay đổi? Chúng ta sẽ thấy rằng các tác vụ của chúng ta được thực hiện trên ít hơn 5 luồng. Hãy nhớ rằng CachedThreadPooltạo một chủ đề cho mỗi nhiệm vụ? Đó là do wait()luồng bị chặn, các tác vụ tiếp theo muốn được hoàn thành và các luồng mới được tạo cho chúng trong nhóm. Với một nhóm ăn cắp, các chủ đề không đứng yên mãi mãi. Họ bắt đầu thực hiện nhiệm vụ của hàng xóm. Điều gì tạo nên WorkStealingPoolsự khác biệt so với các nhóm chủ đề khác? Thực tế là phép thuậtForkJoinPoolsống bên trong nó:

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Trên thực tế, có một sự khác biệt nữa. Theo mặc định, các luồng được tạo cho a ForkJoinPoollà các luồng daemon, không giống như các luồng được tạo thông qua onrdinary ThreadPool. Nói chung, bạn nên nhớ các chủ đề daemon, bởi vì, chẳng hạn, CompletableFuturecũng sử dụng các chủ đề daemon trừ khi bạn chỉ định riêng của mình ThreadFactoryđể tạo các chủ đề không phải daemon. Đây là những điều bất ngờ có thể ẩn nấp ở những nơi không ngờ tới! :)

Ngã BaTham GiaBể Bơi

Trong phần này, một lần nữa chúng ta sẽ nói về ForkJoinPool(còn được gọi là khuôn khổ fork/join), tồn tại "dưới vỏ bọc" của WorkStealingPool. Nói chung, khung fork/join đã xuất hiện trở lại trong Java 1.7. Và mặc dù Java 11 đã gần ra mắt, nhưng nó vẫn đáng để ghi nhớ. Đây không phải là cách triển khai phổ biến nhất, nhưng nó khá thú vị. Có một đánh giá tốt về điều này trên web: Hiểu Java Fork-Join Framework với các ví dụ . Dựa ForkJoinPoolvào java.util.concurrent.RecursiveTask. Ngoài ra còn có java.util.concurrent.RecursiveAction. RecursiveActionkhông trả lại kết quả. Do đó, RecursiveTasktương tự với Callable, và RecursiveActiontương tự với unnable. Chúng ta có thể thấy rằng tên bao gồm tên của hai phương thức quan trọng: forkjoin. Cácforkbắt đầu một số tác vụ không đồng bộ trên một luồng riêng biệt. Và joinphương pháp này cho phép bạn đợi công việc hoàn thành. Để hiểu rõ nhất, bạn nên đọc From Imperative Programming to Fork/Join to Parallel Streams in Java 8 .

Bản tóm tắt

Vâng, điều đó kết thúc phần này của đánh giá. Chúng tôi đã học được rằng Executorban đầu được phát minh ra để thực thi các luồng. Sau đó, những người tạo ra Java đã quyết định tiếp tục ý tưởng và đưa ra ExecutorService. ExecutorServicecho phép chúng tôi gửi các tác vụ để thực thi bằng cách sử dụng submit()invoke()đồng thời tắt dịch vụ. Vì ExecutorServicecần triển khai, họ đã viết một lớp với các phương thức xuất xưởng và gọi nó là Executors. Nó cho phép bạn tạo nhóm chủ đề ( ThreadPoolExecutor). Ngoài ra, có các nhóm luồng cũng cho phép chúng tôi chỉ định lịch thực hiện. Và một ForkJoinPoolẩn đằng sau một WorkStealingPool. Tôi hy vọng bạn thấy những gì tôi viết ở trên không chỉ thú vị mà còn dễ hiểu :) Tôi luôn vui mừng khi nghe những đề xuất và nhận xét của bạn. Kết hợp tốt hơn: Java và lớp Thread. Phần I - Các luồng thực thi Cùng nhau tốt hơn: Java và lớp Thread. Phần II — Đồng bộ hóa Tốt hơn khi kết hợp với nhau: Java và lớp Thread. Phần III — Tương tác tốt hơn khi kết hợp với nhau: Java và lớp Thread. Phần IV - Có thể gọi được, Tương lai và bạn bè Tốt hơn cùng nhau: Java và lớp Chủ đề. Phần VI — Bắn đi!
Bình luận
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION