Introduction
In Part I, we reviewed how threads are created. Let's recap one more time.
![Better together: Java and the Thread class. Part IV — Callable, Future, and friends - 1]()
A thread is represented by the Thread class, whose run() method is invoked when the thread starts. So let's use the Tutorialspoint online Java compiler and execute the following code:
public class HelloWorld {
    public static void main(String[] args) {
        Runnable task = () -> {
            System.out.println("Hello World");
        };
        new Thread(task).start();
    }
}
Is this the only option for starting a task on a thread?
java.util.concurrent.Callable
It turns out that java.lang.Runnable has a brother called java.util.concurrent.Callable, which came into the world in Java 1.5. What are the differences? If you look closely at the Javadoc for this interface, you'll see that, unlike Runnable, the new interface declares a call() method that returns a result. Also, call() is declared with throws Exception. That is, it saves us from having to write try-catch blocks for checked exceptions. Not bad, right?
Now we have a new task instead of Runnable:
Callable<String> task = () -> {
    return "Hello, World!";
};
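To make the difference concrete, here is a minimal sketch that puts the two interfaces side by side. The class name CallableVersusRunnable and the helper readGreeting() are hypothetical and exist only for this illustration; the point is that the Callable lambda can let a checked exception escape, while the Runnable lambda has to handle it itself.

```java
import java.util.concurrent.Callable;

class CallableVersusRunnable {

    // Hypothetical helper (not from the article) that declares a checked exception.
    static String readGreeting() throws Exception {
        return "Hello, World!";
    }

    // call() is declared with "throws Exception", so no try-catch is needed here.
    static Callable<String> callableTask() {
        return () -> readGreeting();
    }

    // run() declares no checked exceptions, so the lambda must handle them itself.
    static Runnable runnableTask() {
        return () -> {
            try {
                System.out.println(readGreeting());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // Both tasks are invoked directly here, on the calling thread.
        runnableTask().run();
        System.out.println(callableTask().call());
    }
}
```

Note that neither task starts a thread by itself: a Callable, like a Runnable, is only a description of work.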
But what do we do with it? Why do we need a task running on a thread that returns a result? Obviously, for any actions performed in the future, we expect to receive the result of those actions in the future. And we have an interface with a corresponding name:
java.util.concurrent.Future
java.util.concurrent.Future
The
java.util.concurrent.Future interface defines an API for working with tasks whose results we plan to receive in the future: methods to get a result, and methods to check status.
As for Future, we are interested in its implementation in the java.util.concurrent.FutureTask class. This is the "Task" whose result the Future will hold. What makes this implementation even more interesting is that it also implements Runnable. You can consider this a kind of adapter between the old model of working with tasks on threads and the new model (new in the sense that it appeared in Java 1.5).
Here is an example:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // Parameterize Callable so the result type is checked by the compiler
        Callable<String> task = () -> {
            return "Hello, World!";
        };
        FutureTask<String> future = new FutureTask<>(task);
        new Thread(future).start();
        System.out.println(future.get());
    }
}
As you can see from the example, we use the get() method to get the result from the task.
Note: when you get the result using the get() method, execution becomes synchronous: the calling thread blocks until the result is ready! What mechanism do you think is used here? True, there is no synchronized block. That's why in JVisualVM we won't see the waiting thread as monitor or wait, but as the familiar park (because the LockSupport mechanism is being used).
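Besides the blocking get(), the Future API mentioned above has status methods and a get() with a timeout. The following is only a sketch: the class name FutureStatusDemo and the CountDownLatch that holds the task back are assumptions added to make the observed states predictable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureStatusDemo {

    // Returns a short report of what was observed at each step.
    static String observe() {
        CountDownLatch release = new CountDownLatch(1);
        FutureTask<String> future = new FutureTask<>(() -> {
            release.await(); // the task cannot finish until we allow it to
            return "Hello, World!";
        });
        new Thread(future).start();

        StringBuilder report = new StringBuilder();
        try {
            report.append("done before release: ").append(future.isDone());
            try {
                // Bounded wait: gives up instead of blocking forever.
                future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                report.append(", timed out");
            }
            release.countDown(); // let the task complete
            report.append(", result: ").append(future.get());
            report.append(", done after get: ").append(future.isDone());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return report.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

The timed get() is useful when the caller cannot afford to wait indefinitely for a result.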
Functional interfaces
Next, we'll talk about classes from Java 1.8, so we would do well to provide a brief introduction. Look at the following code:
Supplier<String> supplier = new Supplier<String>() {
    @Override
    public String get() {
        return "String";
    }
};
Consumer<String> consumer = new Consumer<String>() {
    @Override
    public void accept(String s) {
        System.out.println(s);
    }
};
Function<String, Integer> converter = new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        return Integer.valueOf(s);
    }
};
Lots and lots of extra code, wouldn't you say? Each of the declared classes performs one function, but we use a bunch of extra supporting code to define it. The Java developers thought so too. Accordingly, they introduced a set of "functional interfaces" (@FunctionalInterface) together with lambda expressions and decided that now Java itself would do the "thinking", leaving only the important stuff for us to worry about:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
A Supplier supplies. It has no parameters, but it returns something. This is how it supplies things.
A Consumer consumes. It takes something as an input (an argument) and does something with it. The argument is what it consumes.
Then we also have Function. It takes an input (an argument), does something, and returns something. You can see that we are actively using generics. If you're unsure, you can get a refresher by reading "Generics in Java: how to use angled brackets in practice".
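As a small illustration of how the three interfaces fit together, here is a sketch in which a Supplier produces a value, a Function converts it, and a Consumer uses the result. The class name FunctionalInterfacesDemo and the helper convert() are made up for this example.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class FunctionalInterfacesDemo {

    // Takes a value from the supplier and passes it through the converter.
    static Integer convert(Supplier<String> source, Function<String, Integer> converter) {
        return converter.apply(source.get());
    }

    public static void main(String[] args) {
        Supplier<String> supplier = () -> "42";
        Function<String, Integer> converter = s -> Integer.valueOf(s);
        Consumer<Integer> printer = value -> System.out.println(value);

        // Supplier -> Function -> Consumer
        printer.accept(convert(supplier, converter));
    }
}
```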
CompletableFuture
Time passed and a new class called CompletableFuture appeared in Java 1.8. It implements the Future interface, i.e. our tasks will be completed in the future, and we can call get() to get the result. But it also implements the CompletionStage interface. The name says it all: this is a certain stage of some set of calculations. A brief introduction to the topic can be found in the review here: Introduction to CompletionStage and CompletableFuture.
Let's get right to the point. Let's look at the list of available static methods that will help us get started:
![Better together: Java and the Thread class. Part IV — Callable, Future, and friends - 2]()
Here are options for using them:
import java.util.concurrent.CompletableFuture;

public class App {
    public static void main(String[] args) throws Exception {
        // A CompletableFuture that already contains a result
        CompletableFuture<String> completed;
        completed = CompletableFuture.completedFuture("Just a value");
        // A CompletableFuture that runs a Runnable asynchronously on a pool thread.
        // A Runnable returns nothing, which is why the type is Void
        CompletableFuture<Void> voidCompletableFuture;
        voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("run " + Thread.currentThread().getName());
        });
        // A CompletableFuture that runs a Supplier asynchronously; the supplied value is the result
        CompletableFuture<String> supplier;
        supplier = CompletableFuture.supplyAsync(() -> {
            System.out.println("supply " + Thread.currentThread().getName());
            return "Value";
        });
        // Wait for both tasks so the JVM does not exit before the daemon pool threads print
        voidCompletableFuture.join();
        supplier.join();
    }
}
If we execute this code, we'll see that creating a CompletableFuture also launches the whole pipeline. So, despite a certain similarity to the Stream API from Java 8, this is where the two approaches differ. For example:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
    System.out.println("Executed");
    return value.toUpperCase();
});
This is an example of Java 8's Stream API. If you run this code, you'll see that "Executed" won't be displayed. In other words, when a stream is created in Java, the stream does not start immediately. Instead, it waits for someone to want a value from it. But CompletableFuture starts executing the pipeline immediately, without waiting for someone to ask it for a value. I think this is important to understand.
So, we have a CompletableFuture. How can we make a pipeline (or chain) and what mechanisms do we have?
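Before moving on, the lazy-versus-eager difference described above can be checked with a short sketch. The class name LazyVersusEager, the call counter, and the latch are assumptions introduced only to observe the behavior without relying on console output.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyVersusEager {

    // Counts how often the mapping function ran when no terminal operation is called.
    static int streamCallsWithoutTerminalOperation() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> stream = Arrays.asList("one", "two").stream()
                .map(value -> {
                    calls.incrementAndGet();
                    return value.toUpperCase();
                });
        // The stream is never consumed, so the lambda above is never invoked.
        return calls.get();
    }

    // Reports whether the supplier ran although nobody asked the future for its value.
    static boolean futureRanWithoutGet() {
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture.supplyAsync(() -> {
            started.countDown();
            return "Value";
        });
        try {
            // We wait on the latch only; get() and join() are never called.
            return started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stream calls: " + streamCallsWithoutTerminalOperation());
        System.out.println("future ran: " + futureRanWithoutGet());
    }
}
```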
Recall those functional interfaces that we wrote about earlier.
- We have a Function that takes an A and returns a B. It has a single method: apply().
- We have a Consumer that takes an A and returns nothing (Void). It has a single method: accept().
- We have Runnable, which runs on the thread, and takes nothing and returns nothing. It has a single method: run().
The next thing to remember is that CompletableFuture uses Runnables, Consumers, and Functions in its work. Accordingly, you can always count on being able to do the following with CompletableFuture:
// Requires java.util.Date, java.util.function.*, java.util.concurrent.CompletableFuture
// and java.util.concurrent.atomic.AtomicLong
public static void main(String[] args) throws Exception {
    AtomicLong longValue = new AtomicLong(0);
    Runnable task = () -> longValue.set(new Date().getTime());
    Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
    Consumer<Date> printer = date -> {
        System.out.println(date);
        System.out.flush();
    };
    // CompletableFuture computation
    CompletableFuture.runAsync(task)
            .thenApply((v) -> longValue.get())
            .thenApply(dateConverter)
            .thenAccept(printer)
            .join(); // wait so the JVM does not exit before the daemon pool thread prints
}
The thenRun(), thenApply(), and thenAccept() methods have "Async" versions. This means that these stages are handed to an executor rather than run by whichever thread completed the previous stage. By default, the thread is taken from a special shared pool (ForkJoinPool.commonPool()), so we won't know in advance whether it will be a new or old thread. It all depends on how busy the pool is at that moment.
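The "Async" variants also have overloads that take an Executor, which makes the choice of thread explicit. Here is a minimal sketch of that; the class name AsyncStageDemo and the thread name "demo-worker" are arbitrary names chosen for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncStageDemo {

    // Returns the name of the thread that executed the asynchronous stage.
    static String stageThreadName() {
        // A single-thread executor whose only thread has a recognizable name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "demo-worker"));
        try {
            return CompletableFuture.completedFuture("value")
                    // The stage is submitted to the given executor,
                    // not run by the calling thread.
                    .thenApplyAsync(value -> Thread.currentThread().getName(), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(stageThreadName());
    }
}
```

Passing your own executor is a common way to keep slow or blocking stages away from the shared pool.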
In addition to these methods, there are three more interesting possibilities.
For clarity, let's imagine that we have a certain service that receives some kind of message from somewhere — and this takes time:
public static class NewsService {
    public static String getMessage() {
        try {
            // sleep() is a static method, so call it on the Thread class
            Thread.sleep(3000);
            return "Message";
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
Now, let's take a look at other abilities that CompletableFuture provides. We can combine the result of a CompletableFuture with the result of another CompletableFuture:
Supplier<String> newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
        .thenCombine(reader, (a, b) -> b + a)
        .thenAccept(result -> System.out.println(result))
        .get();
Note that the threads of the default pool are daemon threads, so the JVM won't wait for them before exiting. That's why, for clarity, we use get() to wait for the result.
Not only can we combine CompletableFutures, we can also return a CompletableFuture:
CompletableFuture.completedFuture(2L)
        .thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
        .thenAccept(result -> System.out.println(result));
Here I want to note that the CompletableFuture.completedFuture() method was used for brevity. This method does not create a new thread, so the rest of the pipeline will be executed on the same thread that called completedFuture().
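This can be observed with a small sketch that compares the calling thread with the thread that ran a stage. The class name CallerThreadDemo is made up for the example, and the result reflects how the standard implementation behaves for non-async stages attached to an already completed future; treat it as an observation rather than a guarantee of the API.

```java
import java.util.concurrent.CompletableFuture;

class CallerThreadDemo {

    // Reports whether the non-async stages ran on the thread that built the pipeline.
    static boolean ranOnCallerThread() {
        Thread caller = Thread.currentThread();
        Thread stageThread = CompletableFuture.completedFuture(2L)
                .thenCompose(val -> CompletableFuture.completedFuture(val + 2))
                // Capture the thread that actually executes this stage.
                .thenApply(val -> Thread.currentThread())
                .join();
        return caller == stageThread;
    }

    public static void main(String[] args) {
        System.out.println("ran on caller thread: " + ranOnCallerThread());
    }
}
```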
There is also a thenAcceptBoth() method. It is very similar to thenAccept(), but if thenAccept() accepts a Consumer, thenAcceptBoth() accepts another CompletionStage plus a BiConsumer as input, i.e. a consumer that takes two sources instead of one.
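A minimal sketch of thenAcceptBoth() might look like this. The class name AcceptBothDemo and the AtomicReference used to capture the BiConsumer's output are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptBothDemo {

    // Runs two independent futures and consumes both results in one BiConsumer.
    static String greet() {
        AtomicReference<String> sink = new AtomicReference<>();
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "World");

        greeting
                // The BiConsumer runs only after BOTH futures have completed.
                .thenAcceptBoth(name, (g, n) -> sink.set(g + ", " + n + "!"))
                .join();
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println(greet());
    }
}
```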
There is another interesting ability offered by methods whose name includes the word "Either":
![Better together: Java and the Thread class. Part IV — Callable, Future, and friends - 3]()
These methods accept an alternative CompletionStage and are executed with the result of whichever CompletionStage completes first.
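As an illustration, here is a sketch using applyToEither(), one of the "Either" methods. The class name EitherDemo is hypothetical, and one of the two futures is deliberately never completed so that the outcome is predictable.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {

    // Applies the function to whichever of the two futures completes first.
    static String firstResult() {
        // Never completed in this sketch, so it can never "win".
        CompletableFuture<String> never = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");

        return never
                .applyToEither(fast, value -> value + " wins")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
    }
}
```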
Finally, I want to end this review with another interesting feature of CompletableFuture: error handling.
CompletableFuture.completedFuture(2L)
        .thenApply((a) -> {
            throw new IllegalStateException("error");
        }).thenApply((a) -> 3L)
        //.exceptionally(ex -> 0L)
        .thenAccept(val -> System.out.println(val));
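This code will print nothing: the exception makes the stage complete exceptionally, and every stage that depends on it is skipped. But by uncommenting the "exceptionally" statement, we define the expected behavior: the fallback value (0L here) is passed on, and thenAccept() prints it.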
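Both outcomes can be sketched as follows. The class name ErrorHandlingDemo and its method names are made up for this illustration; the first method recovers with exceptionally(), and the second shows how the failure surfaces when nothing handles it and we wait for the result with join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorHandlingDemo {

    // The failing stage is followed by exceptionally(), which supplies a fallback.
    static Long recover() {
        return CompletableFuture.completedFuture(2L)
                .<Long>thenApply(a -> {
                    throw new IllegalStateException("error");
                })
                .thenApply(a -> 3L)       // skipped, because the previous stage failed
                .exceptionally(ex -> 0L)  // replaces the failure with a value
                .join();
    }

    // Without a handler, join() rethrows the failure wrapped in a CompletionException.
    static String failureWithoutRecovery() {
        try {
            CompletableFuture.completedFuture(2L)
                    .<Long>thenApply(a -> {
                        throw new IllegalStateException("error");
                    })
                    .join();
            return "no failure";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(recover());
        System.out.println(failureWithoutRecovery());
    }
}
```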
Speaking of CompletableFuture, I also recommend watching the following video:
In my humble opinion, these are among the most explanatory videos on the Internet. They should make it clear how this all works, what toolkit we have available, and why all this is needed.
Conclusion
Hopefully, it's now clear how you can use threads to get the results of calculations once they are completed.
Additional material:
Better together: Java and the Thread class. Part I — Threads of execution
Better together: Java and the Thread class. Part II — Synchronization
Better together: Java and the Thread class. Part III — Interaction
Better together: Java and the Thread class. Part V — Executor, ThreadPool, Fork/Join
Better together: Java and the Thread class. Part VI — Fire away!