Introduction
So, we know that Java has threads. You can read about that in the review entitled Better together: Java and the Thread class. Part I — Threads of execution. Let's take another look at the typical code:
public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
As you can see, the code to start a task is pretty typical, but we have to repeat it for new task. One solution is to put it in a separate method, e.g. execute(Runnable runnable)
. But Java's creators have considered our plight and came up with the Executor
interface:
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
This code is clearly more concise: now we simply write code to start the Runnable
on the thread. That's great, isn't it? But this is only the beginning:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
interface has an ExecutorService
subinterface. The Javadoc for this interface says that an ExecutorService
describes a particular Executor
that provides methods to shut down the Executor
. It also makes it possible to get a java.util.concurrent.Future
in order to track the execution process. Previously, in Better together: Java and the Thread class. Part IV — Callable, Future, and friends, we briefly reviewed the capabilities of Future
. If you forgot or never read it, I suggest that you refresh your memory ;)
What else does the Javadoc say? It tells us that we have a special java.util.concurrent.Executors
factory that lets us create default implementations of ExecutorService
.
ExecutorService
Let's review. We haveExecutor
to execute (i.e. to call execute()
on) a certain task on a thread, and the code that creates the thread is hidden from us. We have ExecutorService
— a specific Executor
that has several options for controlling progress. And we have the Executors
factory that lets us create an ExecutorService
. Now let's do it ourselves:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
You can see that we specified a fixed thread pool whose size is 2. Then we submit tasks to the pool one by one. Each task returns a String
containing the thread name (currentThread().GetName()
). It's important to shut down the ExecutorService
at the very end, because otherwise our program won't end.
The Executors
factory has additional factory methods. For example, we can create a pool consisting of just one thread (newSingleThreadExecutor
) or a pool that includes a cache (newCachedThreadPool
) from which threads are removed after they are idle for 1 minute.
In reality, these ExecutorService
are backed by a blocking queue, into which tasks are placed and from which tasks are executed. More information about blocking queues can be found in this video. You can also read this review about BlockingQueue. And check out the answer to the question "When to prefer LinkedBlockingQueue over ArrayBlockingQueue?" In the simplest terms, a BlockingQueue
blocks a thread in two cases:
- the thread attempts to get items from an empty queue
- the thread attempts to put items into a full queue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
or
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
As we can see, implementations of ExecutorService
are created inside the factory methods. And for the most part, we're talking about ThreadPoolExecutor
. Only the parameters affecting the work are changed.
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
ThreadPoolExecutor
As we saw earlier,ThreadPoolExecutor
is what usually gets created inside the factory methods. The functionality is affected by the arguments we pass as the maximum and minimum number of threads, as well as which type of queue being used. But any implementation of the java.util.concurrent.BlockingQueue
interface can be used.
Speaking of ThreadPoolExecutor
, we should mention some interesting features. For example, you can't submit tasks to a ThreadPoolExecutor
if there is no available space:
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
This code will crash with an error like this:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
In other words, task
cannot be submitted, because SynchronousQueue
is designed so that it actually consists of a single element and doesn't allow us to put anything more into it. We can see that we have zero queued tasks
("queued tasks = 0") here. But there's nothing strange about this, because this is a special feature of SynchronousQueue
, which in fact is a 1-element queue that is always empty! When one thread puts an element in the queue, it will wait until another thread takes the element from the queue.
Accordingly, we can replace it with new LinkedBlockingQueue<>(1)
and the error will changed to now show queued tasks = 1
. Because the queue is only 1 element, we can't add a second element. And that's what causes the program to fail.
Continuing our discussion of queue, it is worth noting that the ThreadPoolExecutor
class has additional methods for servicing the queue. For example, the threadPoolExecutor.purge()
method will remove all canceled tasks from the queue in order to free up space in the queue. Another interesting queue-related function is the handler for rejected tasks:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
In this example, our handler simply displays Rejected
each time a task in the queue is rejected.
Convenient, isn't it? In addition, ThreadPoolExecutor
has an interesting subclass: ScheduledThreadPoolExecutor
, which is a ScheduledExecutorService
. It provides the ability to perform a task based on a timer.
ScheduledExecutorService
ScheduledExecutorService
(which is a type of ExecutorService
) lets us run tasks on a schedule. Let's look at an example:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
Everything is simple here. The tasks are submitted and then we get a java.util.concurrent.ScheduledFuture
. A schedule may also be helpful in the following situation:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Here we submit a Runnable
task for execution at a fixed frequency ("FixedRate") with a certain initial delay. In this case, after 1 second, the task will start to be executed every 2 seconds.
There is a similar option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
But in this case, the tasks are performed with a specific interval BETWEEN each execution. That is, the task
will be executed after 1 second. Then, as soon as it is completed, 2 seconds will pass, and then a new task will be started. Here are some additional resources on this topic:
- An introduction to thread pools in Java
- Introduction to Thread Pools in Java
- Java Multithreading Steeplechase: Cancelling Tasks In Executors
- Using Java Executors for Background Tasks
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
WorkStealingPool
In addition to the above thread pools, there is one more. We can honestly say that it is a little special. It's called a work-stealing pool. In short, work-stealing is an algorithm in which idle threads start taking tasks from other threads or tasks from a shared queue. Let's look at an example:
public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
If we run this code, then the ExecutorService
will create 5 threads for us, because each thread will be put in the wait queue for the lock object. We already figured out monitors and locks in Better together: Java and the Thread class. Part II — Synchronization.
Now let's replace Executors.newCachedThreadPool()
with Executors.newWorkStealingPool()
. What will change?
We'll see that our tasks are executed on fewer than 5 threads. Remember that CachedThreadPool
creates a thread for each task? That's because wait()
blocked the thread, subsequent tasks want to be completed, and new threads were created for them in the pool. With a stealing pool, threads don't stand idle forever. They start performing their neighbors' tasks.
What makes a WorkStealingPool
so different from other thread pools? The fact that the magical ForkJoinPool
lives inside it:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Actually, there is one more difference. By default, the threads created for a ForkJoinPool
are daemon threads, unlike the threads created through an onrdinary ThreadPool
. In general, you should remember daemon threads, because, for example, CompletableFuture
also uses daemon threads unless you specify your own ThreadFactory
that creates non-daemon threads. These are the surprises that may lurk in unexpected places! :)
ForkJoinPool
In this part, we'll again talk aboutForkJoinPool
(also called the fork/join framework), which lives "under the hood" of WorkStealingPool
. In general, the fork/join framework appeared back in Java 1.7. And even though Java 11 is close at hand, it's still worth remembering. This isn't the most common implementation, but it is quite interesting.
There's a good review about this on the web: Understanding Java Fork-Join Framework with Examples.
The ForkJoinPool
relies on java.util.concurrent.RecursiveTask
. There's also java.util.concurrent.RecursiveAction
. RecursiveAction
doesn't return a result. Thus, RecursiveTask
is similar to Callable
, and RecursiveAction
is similar to unnable
. We can see that the name includes the names of two important methods: fork
and join
. The fork
method starts some task asynchronously on a separate thread. And the join
method lets you to wait for work to be done.
To get the best understanding, you should read From Imperative Programming to Fork/Join to Parallel Streams in Java 8.
Summary
Well, that wraps up this part of the review. We've learned thatExecutor
was originally invented to execute threads. Then Java's creators decided to continue the idea and came up with ExecutorService
. ExecutorService
lets us submit tasks for execution using submit()
and invoke()
, and also shut down the service. Because ExecutorService
needs implementations, they wrote a class with factory methods and called it Executors
. It lets you create thread pools (ThreadPoolExecutor
). Additionally, there are thread pools that also allow us specify an execution schedule. And a ForkJoinPool
hides behind a WorkStealingPool
.
I hope you found what I wrote above not only interesting, but also understandable :) I am always glad to hear your suggestions and comments.
Better together: Java and the Thread class. Part I — Threads of execution
Better together: Java and the Thread class. Part II — Synchronization
Better together: Java and the Thread class. Part III — Interaction
Better together: Java and the Thread class. Part IV — Callable, Future, and friends
Better together: Java and the Thread class. Part VI — Fire away!
GO TO FULL VERSION