Viacheslav
Level 3
St. Petersburg

Better together: Java and the Thread class. Part V — Executor, ThreadPool, Fork/Join

Published in the Java Developer group

Introduction

So, we know that Java has threads. You can read about that in the review entitled Better together: Java and the Thread class. Part I — Threads of execution. Let's take another look at the typical code:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
As you can see, the code to start a task is pretty typical, but we have to repeat it for each new task. One solution is to put it in a separate method, e.g. execute(Runnable runnable). But Java's creators have considered our plight and come up with the Executor interface:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
This code is clearly more concise: the code that starts a thread is written once, and after that we simply hand each Runnable to the executor. That's great, isn't it? But this is only the beginning:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

As you can see, the Executor interface has an ExecutorService subinterface. The Javadoc for this interface says that an ExecutorService describes a particular Executor that provides methods to shut down the Executor. It also makes it possible to get a java.util.concurrent.Future in order to track the execution process. Previously, in Better together: Java and the Thread class. Part IV — Callable, Future, and friends, we briefly reviewed the capabilities of Future. If you forgot or never read it, I suggest that you refresh your memory ;) What else does the Javadoc say? It tells us that we have a special java.util.concurrent.Executors factory that lets us create default implementations of ExecutorService.
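Since shutting down is the main thing ExecutorService adds on top of Executor, here is a minimal sketch of a graceful shutdown, loosely following the pattern suggested in the ExecutorService Javadoc. The class name ShutdownSketch, the method shutdownGracefully, and the one-second timeout are made up for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    /** Stops accepting tasks, waits for running ones, then cancels what is left. Returns true if the service terminated. */
    static boolean shutdownGracefully(ExecutorService service, long timeoutMillis) {
        service.shutdown(); // no new tasks are accepted; already submitted tasks keep running
        try {
            if (!service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                service.shutdownNow(); // interrupt running tasks and drop the queued ones
                return service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<String> future = service.submit(() -> "Task executed");
        System.out.println(future.get());
        System.out.println("terminated: " + shutdownGracefully(service, 1000));
    }
}
```

Calling shutdown() alone is enough for short tasks. The second step with shutdownNow() only matters when a task may hang longer than we are willing to wait.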

ExecutorService

Let's review. We have Executor to execute (i.e. to call execute() on) a certain task on a thread, and the code that creates the thread is hidden from us. We have ExecutorService — a specific Executor that has several options for controlling progress. And we have the Executors factory that lets us create an ExecutorService. Now let's do it ourselves:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future<String> result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
You can see that we specified a fixed thread pool whose size is 2. Then we submit tasks to the pool one by one. Each task returns a String containing the thread name (Thread.currentThread().getName()). Because we call get() right after each submit(), the loop waits for every task to finish before submitting the next one. It's important to shut down the ExecutorService at the very end, because otherwise its non-daemon worker threads will keep our program from ending. The Executors factory has additional factory methods. For example, we can create a pool consisting of just one thread (newSingleThreadExecutor) or a cached pool (newCachedThreadPool) that creates threads as needed, reuses idle ones, and removes threads that have been idle for 1 minute. In reality, these ExecutorService implementations are backed by a blocking queue, into which tasks are placed and from which tasks are executed. More information about blocking queues can be found in this video. You can also read this review about BlockingQueue. And check out the answer to the question "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?" In the simplest terms, a BlockingQueue blocks a thread in two cases:
  • the thread attempts to get items from an empty queue
  • the thread attempts to put items into a full queue
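The two blocking cases above can be tried out safely with the timed variants of put() and take(). This is only a sketch: the class BlockingQueueSketch, its method names, and the 100 ms timeout are invented for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueSketch {

    /** Returns true if a second element could be added to a full 1-slot queue within the timeout. */
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first"); // the queue is now full
        // put() would block here forever; offer() with a timeout blocks only briefly
        return queue.offer("second", 100, TimeUnit.MILLISECONDS);
    }

    /** Returns what a consumer gets from an empty queue after waiting for the timeout. */
    static String pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        // take() would block here forever; poll() with a timeout gives up and returns null
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer to full queue succeeded: " + offerToFullQueue());
        System.out.println("poll from empty queue returned: " + pollFromEmptyQueue());
    }
}
```

In a thread pool, the worker threads are the ones calling take() or poll(), which is why idle workers simply sleep until a task arrives.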
If we look at the implementation of the factory methods, we can see how they work. For example:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
As we can see, implementations of ExecutorService are created inside the factory methods. And for the most part, we're talking about ThreadPoolExecutor. Only the parameters affecting its behavior change.
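Nothing stops us from calling the same constructor with our own parameters. Below is a hedged sketch of a pool with a bounded queue and named threads. The class CustomPoolSketch, the "worker-" prefix, and the sizes (2 threads, 10 queue slots) are arbitrary choices for the example, not something the factory methods prescribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolSketch {

    /** Builds a pool the same way the factory methods do, but with our own parameters. */
    static ExecutorService newBoundedPool(int threads, int queueCapacity) {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                threads, threads,                         // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,                // keep-alive time for idle extra threads
                new ArrayBlockingQueue<>(queueCapacity),  // bounded queue instead of an unbounded one
                runnable -> new Thread(runnable, "worker-" + counter.incrementAndGet()));
    }

    /** Submits the given number of tasks and returns the name of the thread that ran each one. */
    static List<String> runTasks(int taskCount) throws Exception {
        ExecutorService service = newBoundedPool(2, 10);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(service.submit(task)); // submit everything first...
            }
            List<String> names = new ArrayList<>();
            for (Future<String> future : futures) {
                names.add(future.get());           // ...and only then wait for the results
            }
            return names;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runTasks(5));
    }
}
```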

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

As we saw earlier, ThreadPoolExecutor is what usually gets created inside the factory methods. The functionality is affected by the arguments we pass as the core and maximum number of threads, as well as by which type of queue is used. Any implementation of the java.util.concurrent.BlockingQueue interface can be used. Speaking of ThreadPoolExecutor, we should mention some interesting features. For example, you can't submit tasks to a ThreadPoolExecutor if there is no available space:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
This code will fail with a java.util.concurrent.RejectedExecutionException and a message like this:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
In other words, the task cannot be submitted: both threads are busy, and a SynchronousQueue cannot hold the task in the meantime. We can see that we have zero queued tasks ("queued tasks = 0") here. But there's nothing strange about this, because this is a special feature of SynchronousQueue, which has no internal capacity and is therefore always empty! When one thread puts an element in the queue, it waits until another thread takes that element. If we swap in new LinkedBlockingQueue<>(1), a single task can wait in the queue. Once that slot is occupied and both threads are busy, the next submission is rejected, and the error message then shows queued tasks = 1. Continuing our discussion of queues, it is worth noting that the ThreadPoolExecutor class has additional methods for servicing the queue. For example, the threadPoolExecutor.purge() method removes all canceled tasks from the queue in order to free up space. Another interesting queue-related feature is the handler for rejected tasks:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
In this example, our handler simply displays Rejected each time a task is rejected. Convenient, isn't it? In addition, ThreadPoolExecutor has an interesting subclass: ScheduledThreadPoolExecutor, which is a ScheduledExecutorService. It provides the ability to perform a task based on a timer.
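Coming back to rejected tasks for a moment: besides a custom lambda, ThreadPoolExecutor ships with ready-made handlers such as AbortPolicy (the default, which throws) and CallerRunsPolicy (which runs the task in the submitting thread). The sketch below compares the two on a pool whose only thread is deliberately kept busy. The class RejectionPolicySketch and the method submitToBusyPool are hypothetical names for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicySketch {

    /**
     * Submits a task while the pool's only thread is busy.
     * Returns "rejected" if the handler threw, otherwise the name of the thread that ran the task.
     */
    static String submitToBusyPool(RejectedExecutionHandler handler) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            Callable<String> blocker = () -> {
                release.await(); // keep the single worker thread busy
                return "done";
            };
            pool.submit(blocker);
            Callable<String> overflow = () -> Thread.currentThread().getName();
            return pool.submit(overflow).get(); // no free thread, no queue space: the handler decides
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown(); // let the blocker finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitToBusyPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(submitToBusyPool(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With CallerRunsPolicy the submitting thread does the work itself, which naturally slows down whoever is producing tasks too quickly.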

ScheduledExecutorService

ScheduledExecutorService (which is a type of ExecutorService) lets us run tasks on a schedule. Let's look at an example:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Everything is simple here. The task is submitted, and the schedule() call returns a java.util.concurrent.ScheduledFuture. A schedule may also be helpful in the following situation:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we submit a Runnable task for execution at a fixed frequency ("FixedRate") with a certain initial delay. In this case, after 1 second, the task will start to be executed every 2 seconds. There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But in this case, the tasks are performed with a specific interval BETWEEN executions. That is, the task will be executed after 1 second. Then, as soon as it is completed, 2 seconds will pass, and then the task will be started again.
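One practical detail: a periodic task never finishes by itself, so we usually keep the returned ScheduledFuture and cancel it when we've had enough. Here is a minimal sketch of that idea. The class PeriodicTaskSketch, the method runPeriodically, and the short millisecond intervals are assumptions chosen so the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicTaskSketch {

    /** Runs a task periodically until it has executed the requested number of times, then cancels it. */
    static boolean runPeriodically(int runs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            // First run after 10 ms, then 20 ms of pause after each run completes
            ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(
                    remaining::countDown, 10, 20, TimeUnit.MILLISECONDS);
            boolean finished = remaining.await(5, TimeUnit.SECONDS);
            handle.cancel(false); // stop future runs without interrupting a run in progress
            return finished && handle.isCancelled();
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ran 3 times and was cancelled: " + runPeriodically(3));
    }
}
```

Swapping scheduleWithFixedDelay for scheduleAtFixedRate changes only how the pause is measured, not how the task is cancelled.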

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

In addition to the above thread pools, there is one more. We can honestly say that it is a little special. It's called a work-stealing pool. In short, work-stealing is an algorithm in which idle threads start taking tasks from other threads or tasks from a shared queue. Let's look at an example:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		synchronized (lock) {
			lock.wait(2000); // wait() must be called while holding the lock's monitor
		}
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
If we run this code, then the ExecutorService will create 5 threads for us, because each thread will be put in the wait queue for the lock object. We already figured out monitors and locks in Better together: Java and the Thread class. Part II — Synchronization. Now let's replace Executors.newCachedThreadPool() with Executors.newWorkStealingPool(). What will change? We'll see that our tasks are executed on no more threads than the machine has processors, so on a machine with fewer than 5 cores, fewer than 5 threads are used. Remember that CachedThreadPool created a thread for each task? That's because wait() blocked each thread while subsequent tasks were waiting to run, so new threads were created for them in the pool. With a work-stealing pool, the number of threads is bounded by the parallelism level, and threads that run out of work don't stand idle: they start performing their neighbors' tasks. What makes a WorkStealingPool so different from other thread pools? The fact that the magical ForkJoinPool lives inside it:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Actually, there is one more difference. By default, the threads created for a ForkJoinPool are daemon threads, unlike the threads created through an ordinary ThreadPool. This means the JVM won't wait for them: in the example above, the program may exit before "Finished" is printed unless main waits, for example with awaitTermination(). In general, you should remember daemon threads, because, for example, CompletableFuture also uses daemon threads by default, unless you pass it your own Executor whose threads are non-daemon. These are the surprises that may lurk in unexpected places! :)
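The daemon difference is easy to check for ourselves. The following sketch asks a worker thread of each pool whether it is a daemon. DaemonCheckSketch and runsOnDaemonThread are names made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DaemonCheckSketch {

    /** Runs one task on the given pool and reports whether its worker thread is a daemon. */
    static boolean runsOnDaemonThread(ExecutorService pool) throws Exception {
        try {
            Callable<Boolean> task = () -> Thread.currentThread().isDaemon();
            return pool.submit(task).get(); // get() makes main wait, so the answer is never lost
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("fixed pool: " + runsOnDaemonThread(Executors.newFixedThreadPool(1)));
        System.out.println("work-stealing pool: " + runsOnDaemonThread(Executors.newWorkStealingPool()));
    }
}
```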

ForkJoinPool

In this part, we'll again talk about ForkJoinPool (also called the fork/join framework), which lives "under the hood" of WorkStealingPool. In general, the fork/join framework appeared back in Java 1.7. And even though Java 11 is close at hand, it's still worth remembering. This isn't the most common implementation, but it is quite interesting. There's a good review about this on the web: Understanding Java Fork-Join Framework with Examples. The ForkJoinPool relies on java.util.concurrent.RecursiveTask. There's also java.util.concurrent.RecursiveAction. RecursiveAction doesn't return a result. Thus, RecursiveTask is similar to Callable, and RecursiveAction is similar to Runnable. We can see that the framework's name includes the names of two important methods: fork and join. The fork method arranges for a task to be executed asynchronously in the pool, possibly on another thread. And the join method lets you wait for the work to be done and get its result. To get the best understanding, you should read From Imperative Programming to Fork/Join to Parallel Streams in Java 8.
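To make fork and join a bit more tangible, here is a small sketch of a RecursiveTask that sums an array by splitting it in halves. The class SumTask, the THRESHOLD value, and the helper parallelSum are invented for this illustration, and the sketch uses the common pool rather than a work-stealing pool from Executors.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off; tune it for real workloads

    private final int[] numbers;
    private final int from;
    private final int to; // exclusive

    SumTask(int[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0; // small enough: just sum sequentially
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int middle = from + (to - from) / 2;
        SumTask left = new SumTask(numbers, from, middle);
        SumTask right = new SumTask(numbers, middle, to);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half in the current thread
        return left.join() + rightSum;   // wait for the left half and combine the results
    }

    /** Sums the array using the common ForkJoinPool. */
    static long parallelSum(int[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        int[] numbers = new int[10_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(parallelSum(numbers)); // prints 50005000
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of leaving it parked in join().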

Summary

Well, that wraps up this part of the review. We've learned that Executor was originally invented to execute tasks on threads. Then Java's creators decided to continue the idea and came up with ExecutorService. ExecutorService lets us submit tasks for execution using submit(), invokeAll(), and invokeAny(), and also shut down the service. Because ExecutorService needs implementations, they wrote a class with factory methods and called it Executors. It lets you create thread pools (ThreadPoolExecutor). Additionally, there are thread pools that also allow us to specify an execution schedule. And a ForkJoinPool hides behind a WorkStealingPool. I hope you found what I wrote above not only interesting, but also understandable :) I am always glad to hear your suggestions and comments.
  • Better together: Java and the Thread class. Part I — Threads of execution
  • Better together: Java and the Thread class. Part II — Synchronization
  • Better together: Java and the Thread class. Part III — Interaction
  • Better together: Java and the Thread class. Part IV — Callable, Future, and friends
  • Better together: Java and the Thread class. Part VI — Fire away!