介绍
在
第 I 部分中,我们回顾了线程是如何创建的。让我们再回忆一次。
![更好的结合:Java 和 Thread 类。 第四部分 — Callable、Future 和朋友 - 1]()
线程由 Thread 类表示,它的
run()
方法被调用。因此,让我们使用
Tutorialspoint 在线 Java 编译器并执行以下代码:
public class HelloWorld {
public static void main(String[] args) {
Runnable task = () -> {
System.out.println("Hello World");
};
new Thread(task).start();
}
}
这是在线程上启动任务的唯一选择吗?
java.util.concurrent.Callable
原来
java.lang.Runnable有一个兄弟叫
java.util.concurrent.Callable,他是在Java 1.5中诞生的。有什么区别?如果您仔细查看此接口的 Javadoc,我们会发现,与 不同的是
Runnable
,新接口声明了一个
call()
返回结果的方法。此外,它默认抛出 Exception。也就是说,它使我们不必
try-catch
为已检查的异常而阻塞。不错,对吧?现在我们有一个新任务而不是
Runnable
:
Callable task = () -> {
return "Hello, World!";
};
但是我们用它做什么呢?为什么我们需要在返回结果的线程上运行任务?显然,对于将来执行的任何操作,我们都希望在将来收到这些操作的结果。我们有一个具有相应名称的接口:
java.util.concurrent.Future
java.util.concurrent.Future
java.util.concurrent.Future接口定义了一个 API,用于处理我们计划在未来接收其结果
的任务:获取结果的方法和检查状态的方法。关于
Future
,我们感兴趣的是它在
java.util.concurrent.FutureTask类中的实现。这是将在 中执行的“任务”
Future
。让这个实现更有趣的是它还实现了 Runnable。您可以认为这是在线程上处理任务的旧模型和新模型(在 Java 1.5 中出现的意义上的新模型)之间的一种适配器。这是一个例子:
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class HelloWorld {
public static void main(String[] args) throws Exception {
Callable task = () -> {
return "Hello, World!";
};
FutureTask<String> future = new FutureTask<>(task);
new Thread(future).start();
System.out.println(future.get());
}
}
从示例中可以看出,我们使用
get
方法从任务中获取结果。
笔记:当您使用该方法获得结果时
get()
,执行变得同步!你认为这里会用到什么机制?没错,没有同步块。这就是为什么我们不会在 JVisualVM 中看到
WAITING作为
monitor
or
wait
,而是作为熟悉的
park()
方法(因为
LockSupport
正在使用该机制)。
功能接口
接下来,我们将讨论 Java 1.8 中的类,因此我们最好提供一个简短的介绍。看下面的代码:
Supplier<String> supplier = new Supplier<String>() {
@Override
public String get() {
return "String";
}
};
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Function<String, Integer> converter = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.valueOf(s);
}
};
很多很多额外的代码,你不觉得吗?每个声明的类执行一个功能,但我们使用一堆额外的支持代码来定义它。这就是 Java 开发人员的想法。因此,他们引入了一组“功能接口”(
@FunctionalInterface
),并决定现在 Java 自己来做“思考”,只留下重要的事情让我们担心:
Supplier<String> supplier = () -> "String";
Consumer<String> consumer = s -> System.out.println(s);
Function<String, Integer> converter = s -> Integer.valueOf(s);
一
Supplier
用品。它没有参数,但会返回一些东西。这就是它供应东西的方式。一个
Consumer
消费。它需要一些东西作为输入(一个参数)并用它做一些事情。参数是它消耗的东西。那么我们也有
Function
。它接受输入(参数),做一些事情,然后返回一些东西。您可以看到我们正在积极使用泛型。
如果您不确定,可以通过阅读“ Java 中的泛型:如何在实践中使用尖括号” 来复习一下。
可完成的未来
CompletableFuture
时间过去了, Java 1.8 中出现了一个名为的新类。它实现了
Future
接口,即以后我们的任务完成,我们可以调用
get()
得到结果。但它也实现了
CompletionStage
接口。顾名思义:这是一组计算的某个阶段。可以在此处的评论中找到对该主题的简要介绍:CompletionStage 和 CompletableFuture 简介。让我们开门见山。让我们看一下可帮助我们入门的可用静态方法列表:
![更好的结合:Java 和 Thread 类。 第四部分 — Callable、Future 和朋友 - 2]()
以下是使用它们的选项:
import java.util.concurrent.CompletableFuture;
public class App {
public static void main(String[] args) throws Exception {
CompletableFuture<String> completed;
completed = CompletableFuture.completedFuture("Just a value");
CompletableFuture<Void> voidCompletableFuture;
voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("run " + Thread.currentThread().getName());
});
CompletableFuture<String> supplier;
supplier = CompletableFuture.supplyAsync(() -> {
System.out.println("supply " + Thread.currentThread().getName());
return "Value";
});
}
}
如果我们执行此代码,我们将看到创建一个
CompletableFuture
还涉及启动整个管道。因此,与 Java8 中的 SteamAPI 有一定的相似性,这就是我们发现这些方法之间的区别的地方。例如:
List<String> array = Arrays.asList("one", "two");
Stream<String> stringStream = array.stream().map(value -> {
System.out.println("Executed");
return value.toUpperCase();
});
这是 Java 8 的 Stream API 的示例。如果运行此代码,您会看到“已执行”不会显示。换句话说,在 Java 中创建流时,流不会立即启动。相反,它等待有人想要从它那里获得价值。但是
CompletableFuture
立即开始执行管道,而不是等待有人向它询问一个值。我认为理解这一点很重要。所以,我们有一个
CompletableFuture
. 我们如何制作管道(或链)以及我们有什么机制?回想一下我们之前写过的那些函数式接口。
- 我们有一个
Function
接受 A 并返回 B 的 a。它只有一个方法:apply()
。
- 我们有一个
Consumer
接受 A 并且不返回任何内容的 a (Void)。它只有一个方法:accept()
.
- 我们有
Runnable
,它在线程上运行,不接受任何东西,也不返回任何东西。它只有一个方法:run()
.
接下来要记住的是在其工作中
CompletableFuture
使用
Runnable
,
Consumers
, 和
Functions
。因此,您始终可以知道您可以执行以下操作
CompletableFuture
:
public static void main(String[] args) throws Exception {
AtomicLong longValue = new AtomicLong(0);
Runnable task = () -> longValue.set(new Date().getTime());
Function<Long, Date> dateConverter = (longvalue) -> new Date(longvalue);
Consumer<Date> printer = date -> {
System.out.println(date);
System.out.flush();
};
CompletableFuture.runAsync(task)
.thenApply((v) -> longValue.get())
.thenApply(dateConverter)
.thenAccept(printer);
}
、和方法具有
thenRun()
“异步”版本。这意味着这些阶段将在不同的线程上完成。这个线程将从一个特殊的池中获取——所以我们不会提前知道它是新线程还是旧线程。这完全取决于任务的计算密集程度。除了这些方法之外,还有三种更有趣的可能性。为了清楚起见,让我们假设我们有一个特定的服务从某个地方接收某种消息——这需要时间:
thenApply()
thenAccept()
public static class NewsService {
public static String getMessage() {
try {
Thread.currentThread().sleep(3000);
return "Message";
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
现在,让我们来看看
CompletableFuture
提供的其他能力。我们可以将 a 的结果
CompletableFuture
与 another 的结果结合起来
CompletableFuture
:
Supplier newsSupplier = () -> NewsService.getMessage();
CompletableFuture<String> reader = CompletableFuture.supplyAsync(newsSupplier);
CompletableFuture.completedFuture("!!")
.thenCombine(reader, (a, b) -> b + a)
.thenAccept(result -> System.out.println(result))
.get();
请注意,默认情况下线程是守护线程,因此为了清楚起见,我们使用
get()
等待结果。我们不仅可以合并
CompletableFutures
,还可以返回一个
CompletableFuture
:
CompletableFuture.completedFuture(2L)
.thenCompose((val) -> CompletableFuture.completedFuture(val + 2))
.thenAccept(result -> System.out.println(result));
在这里我想指出,该
CompletableFuture.completedFuture()
方法是为简洁起见而使用的。此方法不会创建新线程,因此管道的其余部分将在
completedFuture
调用的同一线程上执行。还有一个
thenAcceptBoth()
方法。它与 非常相似
accept()
,但如果
thenAccept()
接受 a
Consumer
,
thenAcceptBoth()
则接受另一个
CompletableStage
+
BiConsumer
作为输入,即 a
consumer
需要 2 个源而不是一个。名称中包含单词“Either”的方法还提供了另一个有趣的功能:
![更好的结合:Java 和 Thread 类。 第四部分 — Callable、Future 和朋友 - 3]()
这些方法接受一个替代方法并在最先执行的方法
CompletableStage
上执行。
CompletableStage
最后,我想用另一个有趣的特性来结束这篇评论
CompletableFuture
:错误处理。
CompletableFuture.completedFuture(2L)
.thenApply((a) -> {
throw new IllegalStateException("error");
}).thenApply((a) -> 3L)
.thenAccept(val -> System.out.println(val));
这段代码什么都不做,因为会有一个异常,其他什么也不会发生。但是通过取消注释“exceptionally”语句,我们定义了预期的行为。说到
CompletableFuture
,我也推荐你看下面的视频:
以我的拙见,这些是互联网上最具解释性的视频之一。他们应该清楚这一切是如何运作的,我们有什么工具包,以及为什么需要所有这些。
结论
希望现在您已经清楚如何使用线程在完成计算后进行计算。附加材料:
更好的结合:Java 和 Thread 类。第 I 部分 — 执行的线程 更好地结合:Java 和 Thread 类。第二部分 — 同步 更好地结合:Java 和 Thread 类。第 III 部分 — 更好地交互:Java 和 Thread 类。第五部分 — Executor、ThreadPool、Fork/Join 更好地结合在一起:Java 和 Thread 类。第六部分——开火!
GO TO FULL VERSION