介绍
所以,我们知道Java有线程。您可以在标题为Better together:Java 和 Thread 类的评论中阅读相关内容。第一部分 — 执行线程。 我们再看一下典型的代码:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
如您所见,启动任务的代码非常典型,但我们必须为新任务重复它。一种解决方案是将其放在单独的方法中,例如execute(Runnable runnable)
. 但是 Java 的创造者考虑了我们的困境并提出了接口Executor
:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
这段代码显然更加简洁:现在我们只需编写代码来启动Runnable
线程。太好了,不是吗?但这只是开始:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
接口有一个子ExecutorService
接口。该接口的 Javadoc 描述了一个ExecutorService
提供Executor
关闭Executor
. 它还可以获取一个java.util.concurrent.Future
以便跟踪执行过程。以前,在Better together:Java 和 Thread 类中。第四部分 — Callable、Future 和朋友们,我们简要回顾了Future
. 如果您忘记或从未读过它,我建议您复习一下;) Javadoc 还说了什么?它告诉我们,我们有一个特殊的java.util.concurrent.Executors
工厂,可以让我们创建ExecutorService
.
执行服务
我们来复习。我们必须在线程上Executor
执行(即调用execute()
)某个任务,创建线程的代码对我们是隐藏的。我们有ExecutorService
— 一个具体的Executor
,有几个选项来控制进度。我们有Executors
工厂可以让我们创建一个ExecutorService
. 现在让我们自己做:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
可以看到我们指定了一个固定大小为2的线程池,然后我们将任务一个一个的提交到池中。每个任务返回一个String
包含线程名称 ( currentThread().GetName()
) 的。ExecutorService
在最后关闭 很重要,否则我们的程序将不会结束。工厂Executors
有额外的工厂方法。例如,我们可以创建一个只包含一个线程的池 ( newSingleThreadExecutor
) 或一个包含缓存 ( newCachedThreadPool
) 的池,线程在空闲 1 分钟后就会从缓存中删除。实际上,这些由阻塞队列ExecutorService
支持,任务被放入其中并从中执行。可以在此视频中找到有关阻塞队列的更多信息。你也可以阅读这个回顾 BlockingQueue。并查看“何时更喜欢 LinkedBlockingQueue 而不是 ArrayBlockingQueue?”这个问题的答案。最简单的来说,a在BlockingQueue
两种情况下会阻塞一个线程:
- 线程尝试从空队列中获取项目
- 线程尝试将项目放入完整队列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
或者
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如我们所见, 的实现ExecutorService
是在工厂方法内部创建的。在大多数情况下,我们谈论的是ThreadPoolExecutor
. 仅更改影响工作的参数。
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
线程池执行器
正如我们之前看到的,ThreadPoolExecutor
通常是在工厂方法中创建的。该功能受我们作为最大和最小线程数以及所使用的队列类型传递的参数的影响。但是java.util.concurrent.BlockingQueue
可以使用该接口的任何实现。说到ThreadPoolExecutor
,我们应该提到一些有趣的功能。ThreadPoolExecutor
例如,如果没有可用空间,则 不能将任务提交到 a :
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
此代码将崩溃并出现如下错误:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
换句话说,task
不能提交,因为SynchronousQueue
它被设计成实际上由一个元素组成,不允许我们在其中放入更多东西。我们可以看到这里有零queued tasks
(“排队任务 = 0”)。但这并没有什么奇怪的,因为这是 的一个特殊特征SynchronousQueue
,它实际上是一个永远为空的 1 元素队列!当一个线程将一个元素放入队列时,它会一直等待,直到另一个线程从队列中取出该元素。因此,我们可以将其替换为new LinkedBlockingQueue<>(1)
,错误将更改为 now show queued tasks = 1
。因为队列只有 1 个元素,我们不能添加第二个元素。这就是导致程序失败的原因。继续我们对队列的讨论,值得注意的是ThreadPoolExecutor
类具有用于服务队列的其他方法。例如,该threadPoolExecutor.purge()
方法将从队列中删除所有已取消的任务,以释放队列中的空间。另一个有趣的队列相关函数是拒绝任务的处理程序:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
Rejected
在此示例中,每次拒绝队列中的任务时, 我们的处理程序都会显示。方便,不是吗?此外,ThreadPoolExecutor
还有一个有趣的子类:ScheduledThreadPoolExecutor
,它是一个ScheduledExecutorService
. 它提供了基于计时器执行任务的能力。
预定执行服务
ScheduledExecutorService
(这是一种类型ExecutorService
)让我们按计划运行任务。让我们看一个例子:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
这里的一切都很简单。提交任务,然后我们得到一个java.util.concurrent.ScheduledFuture
. 在以下情况下,时间表也可能会有帮助:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
在这里,我们提交了一个Runnable
以固定频率(“FixedRate”)执行的任务,并具有一定的初始延迟。在这种情况下,1 秒后,任务将开始每 2 秒执行一次。有一个类似的选项:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
但在这种情况下,任务是在每次执行之间以特定间隔执行的。也就是说,task
将在 1 秒后执行。然后,一完成,2秒过去,然后开始新的任务。以下是有关此主题的一些其他资源:
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
除了上面的线程池,还有一个。我们可以诚实地说它有点特别。它被称为工作窃取池。简而言之,工作窃取是一种算法,其中空闲线程开始从其他线程或共享队列中获取任务。让我们看一个例子:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
如果我们运行这段代码,那么ExecutorService
将为我们创建 5 个线程,因为每个线程都会被放入锁对象的等待队列中。我们已经在Better together: Java 和 Thread 类中找到了监视器和锁。第二部分 — 同步。现在让我们替换Executors.newCachedThreadPool()
为Executors.newWorkStealingPool()
. 什么会改变?我们将看到我们的任务在少于 5 个线程上执行。还记得CachedThreadPool
为每个任务创建一个线程吗?那是因为wait()
阻塞了线程,后续的任务想要完成,在池中为它们创建了新的线程。使用窃取池,线程不会永远闲置。他们开始执行邻居的任务。是什么让 aWorkStealingPool
与其他线程池如此不同?神奇的事实ForkJoinPool
住在里面:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
实际上,还有一个区别。默认情况下,为 a 创建的线程ForkJoinPool
是守护线程,这与通过 onrdinary 创建的线程不同ThreadPool
。一般来说,你应该记住守护线程,因为,例如,CompletableFuture
也使用守护线程,除非你指定你自己的ThreadFactory
创建非守护线程。这些都是可能潜伏在意想不到的地方的惊喜!:)
ForkJoinPool
在这一部分中,我们将再次讨论ForkJoinPool
(也称为 fork/join 框架),它位于WorkStealingPool
. 一般来说,fork/join 框架出现在 Java 1.7 中。即使 Java 11 近在咫尺,它仍然值得牢记。这不是最常见的实现,但非常有趣。网上对此有很好的评论:Understanding Java Fork-Join Framework with Examples。的ForkJoinPool
依赖java.util.concurrent.RecursiveTask
。还有java.util.concurrent.RecursiveAction
. RecursiveAction
不返回结果。因此,RecursiveTask
类似于Callable
,并且RecursiveAction
类似于unnable
。我们可以看到该名称包括两个重要方法的名称:fork
和join
。这fork
方法在单独的线程上异步启动一些任务。该join
方法让您等待工作完成。为了获得最好的理解,您应该阅读从命令式编程到 Fork/Join 到 Java 8 中的并行流。
概括
好了,这部分评论到此结束。我们了解到,它Executor
最初是为执行线程而发明的。然后 Java 的创造者决定继续这个想法并想出了ExecutorService
. ExecutorService
让我们使用submit()
和提交要执行的任务invoke()
,并关闭服务。因为ExecutorService
需要实现,他们写了一个带有工厂方法的类并调用它Executors
。它允许您创建线程池 ( ThreadPoolExecutor
)。此外,还有线程池也允许我们指定执行计划。一个ForkJoinPool
躲在一个后面WorkStealingPool
。我希望你发现我上面写的不仅有趣,而且可以理解 :) 我总是很高兴听到你的建议和评论。 更好的结合:Java 和 Thread 类。第 I 部分 — 执行的线程 更好地结合:Java 和 Thread 类。第二部分 — 同步 更好地结合:Java 和 Thread 类。第 III 部分 — 更好地交互:Java 和 Thread 类。第 IV 部分 — Callable、Future 和朋友 更好地结合在一起:Java 和 Thread 类。第六部分——开火!
GO TO FULL VERSION