为什么需要 Executor 接口?

在 Java 5 之前,您必须在应用程序中编写所有自己的线程管理代码。此外,创建一个新线程object 是一个资源密集型操作,为每个轻量级任务创建一个新线程没有意义。并且因为这个问题对于多线程应用程序的每个开发人员来说绝对是熟悉的,所以他们决定将此功能作为Executor框架引入 Java 。

有什么好主意?很简单:不是为每个新任务创建一个新线程,线程被保存在一种“存储”中,当一个新任务到达时,我们检索一个现有线程而不是创建一个新线程。

该框架的主要接口是ExecutorExecutorServiceScheduledExecutorService,每个接口都扩展了前一个的功能。

Executor 接口是基本接口。它声明了一个由Runnable对象实现的void execute(Runnable command)方法。

ExecutorService接口更有趣它有管理工作完成的方法,也有返回某种结果的方法。让我们仔细看看它的方法:

方法 描述
无效关机(); 调用此方法会停止ExecutorService。所有已经提交处理的任务都会完成,但不会接受新的任务。
列表<Runnable> shutdownNow();

调用此方法会停止ExecutorService。所有已经提交处理的任务都会调用Thread.interrupt 。此方法返回排队任务的列表。

该方法不会等待调用该方法时“正在进行”的所有任务完成。

警告:调用此方法可能会泄漏资源。

布尔 isShutdown(); 检查ExecutorService是否停止。
布尔isTerminated(); 如果在ExecutorService关闭后所有任务都已完成,则返回 true 。在调用shutdown()shutdownNow()之前,它将始终返回false
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

调用shutdown()方法后,此方法将阻塞调用它的线程,直到满足以下条件之一:

  • 所有预定任务都已完成;
  • 传递给方法的超时已经过去;
  • 当前线程被中断。

如果所有任务都已完成,则返回true ,如果在终止前超时已过,则返回false 。

<T> 未来<T> 提交(可调用<T> 任务);

向ExecutorService添加一个Callable任务并返回一个实现Future接口的对象。

<T>是传递任务的结果类型。

<T> Future<T> submit(Runnable task, T result);

向ExecutorService添加一个Runnable任务并返回一个实现Future接口的对象。

T结果参数是通过对结果调用get()方法返回的内容未来的对象。

未来 <?> 提交(可运行任务);

向ExecutorService添加一个Runnable任务并返回一个实现Future接口的对象。

如果我们在生成的Future对象上调用get()方法,那么我们将得到 null。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

将Callable任务列表传递给ExecutorService。返回一个 Futures 列表,我们可以从中获取工作结果。当所有提交的任务完成时返回此列表。

如果在方法运行时修改了任务集合,则此方法的结果是未定义的。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

将Callable任务列表传递给ExecutorService。返回一个 Futures 列表,我们可以从中获取工作结果。当所有传递的任务完成时,或者传递给方法的超时时间过去后,以先到者为准,返回此列表。

如果超时结束,未完成的任务将被取消。

注意:取消的任务可能不会停止运行(我们将在示例中看到这种副作用)。

如果在方法运行时修改了任务集合,则此方法的结果是未定义的。

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

将Callable任务列表传递给ExecutorService。返回完成而没有抛出异常(如果有)的任务之一(如果有)的结果。

如果在方法运行时修改了任务集合,则此方法的结果是未定义的。

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

将Callable任务列表传递给ExecutorService。返回在传递给方法的超时结束前完成且未抛出异常的任务之一(如果有)的结果。

如果在方法运行时修改了任务集合,则此方法的结果是未定义的。

让我们看一个使用ExecutorService的小例子。


import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceTest {
   public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
//Create an ExecutorService for 2 threads
       java.util.concurrent.ExecutorService executorService = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
// Create 5 tasks
       MyRunnable task1 = new MyRunnable();
       MyRunnable task2 = new MyRunnable();
       MyRunnable task3 = new MyRunnable();
       MyRunnable task4 = new MyRunnable();
       MyRunnable task5 = new MyRunnable();

       final List<MyRunnable> tasks = List.of(task1, task2, task3, task4, task5);
// Pass a list that contains the 5 tasks we created
       final List<Future<Void>> futures = executorService.invokeAll(tasks, 6, TimeUnit.SECONDS);
       System.out.println("Futures received");

// Stop the ExecutorService
       executorService.shutdown();

       try {
           TimeUnit.SECONDS.sleep(3);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }

       System.out.println(executorService.isShutdown());
       System.out.println(executorService.isTerminated());
   }

   public static class MyRunnable implements Callable<Void> {

       @Override
       public void call() {
// Add 2 delays. When the ExecutorService is stopped, we will see which delay is in progress when the attempt is made to stop execution of the task
           try {
               TimeUnit.SECONDS.sleep(3);
           } catch (InterruptedException e) {
               System.out.println("sleep 1: " + e.getMessage());
           }
           try {
               TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
               System.out.println("sleep 2: " + e.getMessage());
           }
           System.out.println("done");
           return null;
       }
   }
}

输出:

done
done
Futures 收到
sleep 1: sleep interrupted
sleep 1: sleep interrupted
done
done done
true
true

每个任务运行 5 秒。我们为两个线程创建了一个池,因此前两行输出非常有意义。

程序启动六秒后,invokeAll方法超时,结果作为Futures列表返回。这可以从Futures received 的输出字符串中看出。

前两项任务完成后,还有两项任务开始。但是因为invokeAll方法中设置的超时已经过去,这两个任务没有时间完成。他们收到“取消”命令。这就是为什么输出有两行sleep 1: sleep interrupted

然后你可以看到另外两行done这是我在描述invokeAll方法时提到的副作用。

第五个也是最后一个任务甚至从未开始,所以我们在输出中看不到任何关于它的信息。

最后两行是调用isShutdownisTerminated方法的结果。

在调试模式下运行此示例并在超时结束后查看任务状态也很有趣(在executorService.shutdown();行上设置断点):

我们看到两个任务正常完成,三个任务被“取消”

预定执行服务

为了结束我们对执行程序的讨论,让我们看一下ScheduledExecutorService

它有4个方法:

方法 描述
public Sc​​heduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 安排传递的Runnable任务在指定为参数的延迟后运行一次。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); 安排传递的Callable任务在指定为参数的延迟后运行一次。
public Sc​​heduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 安排定期执行传递的任务,该任务将在initialDelay之后第一次执行,并且每个后续运行都将在period之后开始。
public Sc​​heduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); 安排定期执行传递的任务,该任务将在initialDelay之后第一次执行,并且每个后续运行将在delay之后开始(上一次运行完成和当前运行开始之间的时间段)。