為什麼需要 Executor 接口?

在 Java 5 之前,您必須在應用程序中編寫所有自己的線程管理代碼。此外,創建一個新線程object 是一個資源密集型操作,為每個輕量級任務創建一個新線程沒有意義。並且因為這個問題對於多線程應用程序的每個開發人員來說絕對是熟悉的,所以他們決定將此功能作為Executor框架引入 Java 。

有什麼好主意?很簡單:不是為每個新任務創建一個新線程,線程被保存在一種“存儲”中,當一個新任務到達時,我們檢索一個現有線程而不是創建一個新線程。

該框架的主要接口是ExecutorExecutorServiceScheduledExecutorService,每個接口都擴展了前一個的功能。

Executor 接口是基本接口。它聲明了一個由Runnable對象實現的void execute(Runnable command)方法。

ExecutorService接口更有趣它有管理工作完成的方法,也有返回某種結果的方法。讓我們仔細看看它的方法:

方法 描述
無效關機(); 調用此方法會停止ExecutorService。所有已經提交處理的任務都會完成,但不會接受新的任務。
列表<Runnable> shutdownNow();

調用此方法會停止ExecutorService。所有已經提交處理的任務都會調用Thread.interrupt 。此方法返回排隊任務的列表。

該方法不會等待調用該方法時“正在進行”的所有任務完成。

警告:調用此方法可能會洩漏資源。

布爾 isShutdown(); 檢查ExecutorService是否停止。
布爾isTerminated(); 如果在ExecutorService關閉後所有任務都已完成,則返回 true 。在調用shutdown()shutdownNow()之前,它將始終返回false
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

調用shutdown()方法後,此方法將阻塞調用它的線程,直到滿足以下條件之一:

  • 所有預定任務都已完成;
  • 傳遞給方法的超時已經過去;
  • 當前線程被中斷。

如果所有任務都已完成,則返回true ,如果在終止前超時已過,則返回false 。

<T> 未來<T> 提交(可調用<T> 任務);

向ExecutorService添加一個Callable任務並返回一個實現Future接口的對象。

<T>是傳遞任務的結果類型。

<T> Future<T> submit(Runnable task, T result);

向ExecutorService添加一個Runnable任務並返回一個實現Future接口的對象。

T結果參數是通過對結果調用get()方法返回的內容未來的對象。

未來 <?> 提交(可運行任務);

向ExecutorService添加一個Runnable任務並返回一個實現Future接口的對象。

如果我們在生成的Future對像上調用get()方法,那麼我們將得到 null。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

將Callable任務列表傳遞給ExecutorService。返回一個 Futures 列表,我們可以從中獲取工作結果。當所有提交的任務完成時返回此列表。

如果在方法運行時修改了任務集合,則此方法的結果是未定義的。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

將Callable任務列表傳遞給ExecutorService。返回一個 Futures 列表,我們可以從中獲取工作結果。當所有傳遞的任務完成時,或者傳遞給方法的超時時間過去後,以先到者為準,返回此列表。

如果超時結束,未完成的任務將被取消。

注意:取消的任務可能不會停止運行(我們將在示例中看到這種副作用)。

如果在方法運行時修改了任務集合,則此方法的結果是未定義的。

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

將Callable任務列表傳遞給ExecutorService。返回完成而沒有拋出異常(如果有)的任務之一(如果有)的結果。

如果在方法運行時修改了任務集合,則此方法的結果是未定義的。

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

將Callable任務列表傳遞給ExecutorService。返回在傳遞給方法的超時結束前完成且未拋出異常的任務之一(如果有)的結果。

如果在方法運行時修改了任務集合,則此方法的結果是未定義的。

讓我們看一個使用ExecutorService的小例子。


import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceTest {
   public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
//Create an ExecutorService for 2 threads
       java.util.concurrent.ExecutorService executorService = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
// Create 5 tasks
       MyRunnable task1 = new MyRunnable();
       MyRunnable task2 = new MyRunnable();
       MyRunnable task3 = new MyRunnable();
       MyRunnable task4 = new MyRunnable();
       MyRunnable task5 = new MyRunnable();

       final List<MyRunnable> tasks = List.of(task1, task2, task3, task4, task5);
// Pass a list that contains the 5 tasks we created
       final List<Future<Void>> futures = executorService.invokeAll(tasks, 6, TimeUnit.SECONDS);
       System.out.println("Futures received");

// Stop the ExecutorService
       executorService.shutdown();

       try {
           TimeUnit.SECONDS.sleep(3);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println(executorService.isShutdown());
       System.out.println(executorService.isTerminated());
   }

   public static class MyRunnable implements Callable<Void> {

       @Override
       public void call() {
// Add 2 delays. When the ExecutorService is stopped, we will see which delay is in progress when the attempt is made to stop execution of the task
           try {
               TimeUnit.SECONDS.sleep(3);
           } catch (InterruptedException e) {
               System.out.println("sleep 1: " + e.getMessage());
           }
           try {
               TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
               System.out.println("sleep 2: " + e.getMessage());
           }
           System.out.println("done");
           return null;
       }
   }
}

輸出:

done
done
Futures 收到
sleep 1: sleep interrupted
sleep 1: sleep interrupted
done
done done
true
true

每個任務運行 5 秒。我們為兩個線程創​​建了一個池,因此前兩行輸出非常有意義。

程序啟動六秒後,invokeAll方法超時,結果作為Futures列表返回。這可以從Futures received 的輸出字符串中看出。

前兩項任務完成後,還有兩項任務開始。但是因為invokeAll方法中設置的超時已經過去,這兩個任務沒有時間完成。他們收到“取消”命令。這就是為什麼輸出有兩行sleep 1: sleep interrupted

然後你可以看到另外兩行done這是我在描述invokeAll方法時提到的副作用。

第五個也是最後一個任務甚至從未開始,所以我們在輸出中看不到任何關於它的信息。

最後兩行是調用isShutdownisTerminated方法的結果。

在調試模式下運行此示例並在超時結束後查看任務狀態也很有趣(在executorService.shutdown();行上設置斷點):

我們看到兩個任務正常完成,三個任務被“取消”

預定執行服務

為了結束我們對執行程序的討論,讓我們看一下ScheduledExecutorService

它有4個方法:

方法 描述
public Sc​​heduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 安排傳遞的Runnable任務在指定為參數的延遲後運行一次。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); 安排傳遞的Callable任務在指定為參數的延遲後運行一次。
public Sc​​heduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 安排定期執行傳遞的任務,該任務將在initialDelay之後第一次執行,並且每個後續運行都將在period之後開始。
public Sc​​heduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); 安排定期執行傳遞的任務,該任務將在initialDelay之後第一次執行,並且每個後續運行將在delay之後開始(上一次運行完成和當前運行開始之間的時間段)。