让我们弄清楚newWorkStealingPool方法,它为我们准备了一个ExecutorService 。

这个线程池很特别。它的行为是基于“窃取”工作的想法。

任务在处理器之间排队和分配。但是如果一个处理器很忙,那么另一个空闲的处理器可以从它那里窃取一个任务并执行它。这种格式是在 Java 中引入的,目的是减少多线程应用程序中的冲突。它建立在fork/join框架之上。

分叉/加入

fork/join框架中,任务被递归分解,即分解为子任务。然后将子任务单独执行,将子任务的结果组合起来形成原任务的结果。

fork方法在某个线程上异步启动任务,而join方法让您等待该任务完成。

newWorkStealingPool

newWorkStealingPool方法两个实现:


public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
 
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

从一开始,我们就注意到在幕后我们并没有调用ThreadPoolExecutor构造函数。在这里,我们正在使用ForkJoinPool实体。与ThreadPoolExecutor一样,它是AbstractExecutorService的实现。

我们有 2 种方法可供选择。首先,我们自己指出我们希望看到的并行性级别。如果我们不指定这个值,那么我们的池的并行度将等于 Java 虚拟机可用的处理器核心数。

仍然需要弄清楚它在实践中是如何工作的:


Collection<Callable<Void>> tasks = new ArrayList<>();
        ExecutorService executorService = Executors.newWorkStealingPool(10);
 
        for (int i = 0; i < 10; i++) {
            int taskNumber = i;
            Callable<Void> callable = () -> {
                System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
                return null;
            };
            tasks.add(callable);
        }
        executorService.invokeAll(tasks);

我们创建了 10 个任务来显示它们自己的完成状态。之后,我们使用invokeAll方法启动所有任务。

在池中的 10 个线程上执行 10 个任务时的结果:

在 ForkJoinPool-1-worker-10 线程上处理的用户请求 #9
在 ForkJoinPool-1-worker-5 线程上
处理的用户请求 #4 在 ForkJoinPool-1-worker-8 线程上处理的用户请求 #7 在
ForkJoinPool- 上处理的用户请求 #1 1-worker-2 线程
在 ForkJoinPool-1-worker-3 线程上处理的用户请求 #2
在 ForkJoinPool-1-worker-4 线程上处理的用户请求 #3 在
ForkJoinPool-1-worker-7 线程上处理的用户请求 #6
处理的用户ForkJoinPool-1-worker-1 线程上的请求 #0 在
ForkJoinPool-1-worker-6 线程
上处理的用户请求 #5 在 ForkJoinPool-1-worker-9 线程上处理的用户请求 #8

我们看到队列形成后,线程就拿任务去执行。您还可以检查 20 个任务将如何分配到 10 个线程的池中。

在 ForkJoinPool-1-worker-4 线程上处理的用户请求 #3
在 ForkJoinPool-1-worker-8 线程上
处理的用户请求 #7 在 ForkJoinPool-1-worker-3 线程上处理的用户请求 #2
在 ForkJoinPool- 上处理的用户请求 #4 1-worker-5 线程
在 ForkJoinPool-1-worker-2 线程上处理的用户请求 #1
在 ForkJoinPool-1-worker-6 线程上
处理的用户请求 #5 在 ForkJoinPool-1-worker-9 线程上处理的用户请求 #8
处理的用户ForkJoinPool-1-worker-10 线程上的请求 #9
ForkJoinPool-1-worker-1 线程上
已处理的用户请求 #0 ForkJoinPool-1-worker-7 线程上已处理的用户请求 #6
ForkJoinPool-1-上已处理的用户请求 #10 worker-9 线程
在 ForkJoinPool-1-worker-1 线程上处理的用户请求 #12
在 ForkJoinPool-1-worker-8 线程
上处理的用户请求 #13 在 ForkJoinPool-1-worker-6 线程上处理的用户请求 #11 在
ForkJoinPool- 上处理的用户请求 #15 1-worker-8 线程
在 ForkJoinPool-1-worker-1 线程上处理的用户请求 #14
在 ForkJoinPool-1-worker-6 线程上处理的用户请求 #17 在
ForkJoinPool-1-worker-7 线程上处理的用户请求 #16
处理的用户ForkJoinPool-1-worker-6 线程上的请求 #19
已处理的 ForkJoinPool-1-worker-1 线程上的用户请求 #18

从输出中,我们可以看到一些线程设法完成了几个任务(ForkJoinPool-1-worker-6完成了 4 个任务),而一些线程只完成了一个(ForkJoinPool-1-worker-2)。如果在调用方法的实现中加入 1 秒的延迟,情况就会发生变化。


Callable<Void> callable = () -> {
   System.out.println("Processed user request #" + taskNumber + " on thread " + Thread.currentThread().getName());
   TimeUnit.SECONDS.sleep(1);
   return null;
};

为了进行实验,让我们在另一台机器上运行相同的代码。结果输出:

在 ForkJoinPool-1-worker-23 线程上处理的用户请求 #2
在 ForkJoinPool-1-worker-31 线程
上处理的用户请求 #7 在 ForkJoinPool-1-worker-27 线程上处理的用户请求 #4 在
ForkJoinPool- 上处理的用户请求 #5 1-worker-13 线程
在 ForkJoinPool-1-worker-19 线程上处理的用户请求 #0
在 ForkJoinPool-1-worker-3 线程上处理的用户请求 #8 在
ForkJoinPool-1-worker-21 线程上处理的用户请求 #9
处理的用户ForkJoinPool-1-worker-17 线程上的请求 #6 在
ForkJoinPool-1-worker-9 线程
上处理的用户请求 #3 在 ForkJoinPool-1-worker-5 线程上
处理的用户请求 #1 在 ForkJoinPool-1- 上处理的用户请求 #12 worker-23线程
在 ForkJoinPool-1-worker-19 线程上处理的用户请求 #15
在 ForkJoinPool-1-worker-27 线程上
处理的用户请求 #14 在 ForkJoinPool-1-worker-3 线程上处理的用户请求 #11 在
ForkJoinPool- 上处理的用户请求 #13 1-worker-13 线程
在 ForkJoinPool-1-worker-31 线程上处理的用户请求 #10
在 ForkJoinPool-1-worker-5 线程上
处理的用户请求 #18 在 ForkJoinPool-1-worker-9 线程上处理的用户请求 #16
处理的用户ForkJoinPool-1-worker-21 线程上的请求 #17
已处理的 ForkJoinPool-1-worker-17 线程上的用户请求 #19

在此输出中,值得注意的是我们“请求”池中的线程。而且,工作线程的名称不是从一到十,而是有时大于十。查看唯一名称,我们看到确实有十个工人(3、5、9、13、17、19、21、23、27 和 31)。在这里问为什么会这样是很合理的?每当您不明白发生了什么时,请使用调试器。

这就是我们要做的。让我们投执行服务反对ForkJoinPool


final ForkJoinPool forkJoinPool = (ForkJoinPool) executorService;

在调用invokeAll方法后,我们将使用 Evaluate Expression 操作来检查此对象。为此,在invokeAll方法之后,添加任何语句,例如空 sout,并在其上设置断点。

我们可以看到池中有 10 个线程,但是工作线程数组的大小是 32。很奇怪,但是还好。让我们继续挖掘。创建池时,让我们尝试将并行度设置为 32 以上,比如 40。


ExecutorService executorService = Executors.newWorkStealingPool(40);

在调试器中,让我们看看再次 forkJoinPool 对象。

现在工作线程数组的大小是 128。我们可以假设这是 JVM 的内部优化之一。让我们尝试在JDK(openjdk-14)的代码中找到它:

正如我们所怀疑的那样:工作线程数组的大小是通过对并行度值执行按位操作来计算的。我们不需要试图弄清楚这里到底发生了什么。只知道存在这样的优化就足够了。

我们示例的另一个有趣方面是invokeAll方法的使用。值得注意的是,invokeAll方法可以返回一个结果,或者更确切地说是一个结果列表(在我们的例子中,一个List<Future<Void>>),我们可以在其中找到每个任务的结果。


var results = executorService.invokeAll(tasks);
        for (Future<Void> result : results) {
            // Process the task's result
        }

这种特殊类型的服务和线程池可用于具有可预测或至少隐式并发级别的任务。