介紹
所以,我們知道Java有線程。您可以在標題為Better together:Java 和 Thread 類的評論中閱讀相關內容。第一部分 — 執行線程。 我們再看一下典型的代碼:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
如您所見,啟動任務的代碼非常典型,但我們必須為新任務重複它。一種解決方案是將其放在單獨的方法中,例如execute(Runnable runnable)
. 但是 Java 的創造者考慮了我們的困境並提出了接口Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
這段代碼顯然更加簡潔:現在我們只需編寫代碼來啟動Runnable
線程。太好了,不是嗎?但這只是開始:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
接口有一個ExecutorService
子接口。該接口的 Javadoc 描述了ExecutorService
一個Executor
提供關閉Executor
. 它還可以獲取一個java.util.concurrent.Future
以便跟踪執行過程。以前,在Better together:Java 和 Thread 類中。第四部分 — Callable、Future 和朋友們,我們簡要回顧了Future
. 如果您忘記或從未讀過它,我建議您複習一下;) Javadoc 還說了什麼?它告訴我們,我們有一個特殊的java.util.concurrent.Executors
工廠,可以讓我們創建ExecutorService
.
執行服務
我們來複習。我們必須在線程上Executor
執行(即調用)某個任務,創建線程的代碼對我們是隱藏的。execute()
我們有ExecutorService
— 一個具體的Executor
,有幾個選項來控制進度。我們有Executors
工廠可以讓我們創建一個ExecutorService
. 現在讓我們自己做:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
可以看到我們指定了一個固定大小為2的線程池,然後我們將任務一個一個的提交到池中。每個任務返回一個String
包含線程名稱 ( currentThread().GetName()
) 的。ExecutorService
在最後關閉 很重要,否則我們的程序將不會結束。工廠Executors
有額外的工廠方法。例如,我們可以創建一個僅由一個線程組成的池 ( newSingleThreadExecutor
) 或一個包含緩存 ( newCachedThreadPool
) 的池,線程在空閒 1 分鐘後就會從緩存中刪除。實際上,這些由阻塞隊列ExecutorService
支持,任務被放入其中並從中執行。可以在此視頻中找到有關阻塞隊列的更多信息。你也可以閱讀這個回顧 BlockingQueue。並查看“何時更喜歡 LinkedBlockingQueue 而不是 ArrayBlockingQueue?”這個問題的答案。最簡單的來說,a在BlockingQueue
兩種情況下會阻塞一個線程:
- 線程嘗試從空隊列中獲取項目
- 線程嘗試將項目放入完整隊列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
或者
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如我們所見, 的實現ExecutorService
是在工廠方法內部創建的。在大多數情況下,我們談論的是ThreadPoolExecutor
. 僅更改影響工作的參數。
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
線程池執行器
正如我們之前看到的,ThreadPoolExecutor
通常是在工廠方法中創建的。該功能受我們作為最大和最小線程數以及所使用的隊列類型傳遞的參數的影響。但是java.util.concurrent.BlockingQueue
可以使用該接口的任何實現。說到ThreadPoolExecutor
,我們應該提到一些有趣的功能。ThreadPoolExecutor
例如,如果沒有可用空間,則 不能將任務提交到 a :
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
此代碼將崩潰並出現如下錯誤:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
換句話說,task
不能提交,因為SynchronousQueue
它被設計成實際上由一個元素組成,不允許我們在其中放入更多東西。我們可以看到這裡有零queued tasks
(“排隊任務 = 0”)。但這並沒有什麼奇怪的,因為這是 的一個特殊特徵SynchronousQueue
,它實際上是一個永遠為空的 1 元素隊列!當一個線程將一個元素放入隊列時,它會一直等待,直到另一個線程從隊列中取出該元素。因此,我們可以將其替換為new LinkedBlockingQueue<>(1)
,錯誤將更改為 now show queued tasks = 1
。因為隊列只有 1 個元素,我們不能添加第二個元素。這就是導致程序失敗的原因。繼續我們對隊列的討論,值得注意的是ThreadPoolExecutor
類具有用於服務隊列的其他方法。例如,該threadPoolExecutor.purge()
方法將從隊列中刪除所有已取消的任務,以釋放隊列中的空間。另一個有趣的隊列相關函數是拒絕任務的處理程序:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Rejected
在此示例中,每次拒絕隊列中的任務時, 我們的處理程序都會顯示。方便,不是嗎?此外,ThreadPoolExecutor
還有一個有趣的子類:ScheduledThreadPoolExecutor
,它是一個ScheduledExecutorService
. 它提供了基於計時器執行任務的能力。
預定執行服務
ScheduledExecutorService
(這是一種類型ExecutorService
)讓我們按計劃運行任務。讓我們看一個例子:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
這裡的一切都很簡單。提交任務,然後我們得到一個java.util.concurrent.ScheduledFuture
. 在以下情況下,時間表也可能會有幫助:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
在這裡,我們提交了一個Runnable
以固定頻率(“FixedRate”)執行的任務,並具有一定的初始延遲。在這種情況下,1 秒後,任務將開始每 2 秒執行一次。有一個類似的選項:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
但在這種情況下,任務是在每次執行之間以特定間隔執行的。也就是說,task
將在 1 秒後執行。然後,一完成,2秒過去,然後開始新的任務。以下是有關此主題的一些其他資源:
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
除了上面的線程池,還有一個。我們可以誠實地說它有點特別。它被稱為工作竊取池。簡而言之,工作竊取是一種算法,其中空閒線程開始從其他線程或共享隊列中獲取任務。讓我們看一個例子:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
如果我們運行這段代碼,那麼ExecutorService
將為我們創建 5 個線程,因為每個線程都會被放入鎖對象的等待隊列中。我們已經在Better together: Java 和 Thread 類中找到了監視器和鎖。第二部分 — 同步。現在讓我們替換Executors.newCachedThreadPool()
為Executors.newWorkStealingPool()
. 什麼會改變?我們將看到我們的任務在少於 5 個線程上執行。還記得CachedThreadPool
為每個任務創建一個線程嗎?那是因為wait()
阻塞了線程,後續的任務想要完成,在池中為它們創建了新的線程。使用竊取池,線程不會永遠閒置。他們開始執行鄰居的任務。是什麼讓 aWorkStealingPool
與其他線程池如此不同?神奇的事實ForkJoinPool
住在裡面:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
實際上,還有一個區別。默認情況下,為 a 創建的線程ForkJoinPool
是守護線程,這與通過 onrdinary 創建的線程不同ThreadPool
。一般來說,你應該記住守護線程,因為,例如,CompletableFuture
也使用守護線程,除非你指定你自己的ThreadFactory
創建非守護線程。這些都是可能潛伏在意想不到的地方的驚喜!:)
ForkJoinPool
在這一部分中,我們將再次討論ForkJoinPool
(也稱為 fork/join 框架),它位於WorkStealingPool
. 一般來說,fork/join 框架出現在 Java 1.7 中。即使 Java 11 近在咫尺,它仍然值得牢記。這不是最常見的實現,但非常有趣。網上對此有很好的評論:Understanding Java Fork-Join Framework with Examples。的ForkJoinPool
依賴java.util.concurrent.RecursiveTask
。還有java.util.concurrent.RecursiveAction
. RecursiveAction
不返回結果。因此,RecursiveTask
類似於Callable
,並且RecursiveAction
類似於unnable
。我們可以看到該名稱包括兩個重要方法的名稱:fork
和join
。這fork
方法在單獨的線程上異步啟動一些任務。該join
方法讓您等待工作完成。為了獲得最好的理解,您應該閱讀從命令式編程到 Fork/Join 到 Java 8 中的並行流。
概括
好了,這部分評論到此結束。我們了解到,它Executor
最初是為執行線程而發明的。然後 Java 的創造者決定繼續這個想法並想出了ExecutorService
. ExecutorService
讓我們使用submit()
和提交要執行的任務invoke()
,並關閉服務。因為ExecutorService
需要實現,他們寫了一個帶有工廠方法的類並調用它Executors
。它允許您創建線程池 ( ThreadPoolExecutor
)。此外,還有線程池也允許我們指定執行計劃。一個ForkJoinPool
躲在一個後面WorkStealingPool
。我希望你發現我上面寫的不僅有趣,而且可以理解 :) 我總是很高興聽到你的建議和評論。 更好的結合:Java 和 Thread 類。第 I 部分 — 執行的線程 更好地結合:Java 和 Thread 類。第二部分 — 同步 更好地結合:Java 和 Thread 類。第 III 部分 — 更好地交互:Java 和 Thread 類。第 IV 部分 — Callable、Future 和朋友 更好地結合在一起:Java 和 Thread 類。第六部分——開火!
GO TO FULL VERSION