1. 组合 CompletableFuture
在真实场景中,很少所有数据都来自单一来源:用户资料与其订单可以并行加载,来自两个微服务的数据可以合并,有时只想处理最先返回的结果。同步方式会让我们按顺序等待;CompletableFuture 类允许同时启动所有任务,并优雅地合并结果。它为此提供了专门的方法:thenCombine、allOf、anyOf。我们按顺序逐一说明。
并行任务:两个异步请求
同步情况下会怎样:
String name = loadUserName(); // 耗时
int balance = loadUserBalance(); // 耗时
System.out.println("姓名: " + name + ", 余额: " + balance);
问题:第二个调用只会在第一个完成后才开始。
异步方式
借助 CompletableFuture,可以同时启动这两个任务:
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> loadUserName());
CompletableFuture<Integer> balanceFuture = CompletableFuture.supplyAsync(() -> loadUserBalance());
但如何同时拿到两个结果并一起处理?这时就用 thenCombine。
2. thenCombine:合并两项任务的结果
方法 thenCombine 可以把两个 CompletableFuture 合并,并在两项任务都完成时执行一个操作。它会返回一个新的 CompletableFuture,其结果是合并的产物。
方法签名:
<A, B, C> CompletableFuture<C> thenCombine(
CompletionStage<? extends B> other,
BiFunction<? super A, ? super B, ? extends C> fn
)
- A — 第一个 future 的结果类型,
- B — 第二个的类型,
- C — 合并后的结果类型。
示例
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> loadUserName());
CompletableFuture<Integer> balanceFuture = CompletableFuture.supplyAsync(() -> loadUserBalance());
CompletableFuture<String> resultFuture = nameFuture.thenCombine(
balanceFuture,
(name, balance) -> "姓名: " + name + ", 余额: " + balance
);
resultFuture.thenAccept(System.out::println);
工作原理:
- 两个 future 并行启动。
- 一旦二者都完成,就会调用函数 (name, balance) -> ...。
- 最终的 future 包含合并后的字符串。
小示例:数字
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> sum = f1.thenCombine(f2, Integer::sum);
sum.thenAccept(result -> System.out.println("总和: " + result));
输出:
总和: 5
异步版本
如果合并是个重活,请使用 thenCombineAsync:
f1.thenCombineAsync(f2, (a, b) -> a * b);
3. allOf:当任务很多时
如果不是两项任务,而是十来项呢?例如,我们想并行加载 10 个用户的数据。 为此可以使用 CompletableFuture.allOf。
说明
CompletableFuture.allOf(f1, f2, ..., fn) 返回一个新的 future,当且仅当传入的所有任务都完成时它才会完成。但有个细节:该 future 不包含任何结果——它的类型始终是 CompletableFuture<Void>。要拿到结果,需要从原始的 futures 中单独“取出”。
示例
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "第一个");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "第二个");
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.thenRun(() -> {
// 所有任务都已完成!
String s1 = f1.join(); // join() 类似 get(),但抛出未检查异常
String s2 = f2.join();
System.out.println(s1 + " & " + s2);
});
输出:
第一个 & 第二个
包含任务数组的示例
List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
int id = i;
futures.add(CompletableFuture.supplyAsync(() -> "用户 " + id));
}
CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
all.thenRun(() -> {
for (CompletableFuture<String> f : futures) {
System.out.println(f.join());
}
});
发生了什么:
- 所有任务并行启动。
- allOf 会在所有任务都完成时结束。
- 在 thenRun 块中,我们可以通过 join() 获取结果。
可视化示意
[Future1] \
[Future2] ----> [allOf] ---> thenRun
[Future3] /
4. anyOf:等待最先完成的任务
有时我们不需要等所有任务,而是获取最快完成的那个结果。比如同时向两个服务器请求数据——谁先返回就用谁的结果。 这时就用 CompletableFuture.anyOf。
说明
CompletableFuture.anyOf(f1, f2, ..., fn) 返回的 future 会在任意一个传入任务完成时立即完成。结果类型是 CompletableFuture<Object>,因为各任务的类型可能不同。
示例
CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "快速服务器";
});
CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "慢服务器";
});
CompletableFuture<Object> any = CompletableFuture.anyOf(fast, slow);
any.thenAccept(result -> System.out.println("收到: " + result));
输出:
收到: 快速服务器
不同类型的示例
可以组合不同返回类型的任务,但那样结果类型就是 Object,需要显式进行类型转换。
5. 有用的细节
建议
- allOf 不会返回结果数组。需要保留原始 futures,随后通过 join() 或 get() 取值。
- anyOf 返回第一个完成的结果,但类型是 Object。如果所有任务类型相同,可以进行类型转换。
- 如果 allOf 中任何一个任务失败,最终的 future 也会以异常结束。
- 对于 thenCombine,两项任务都必须成功完成,否则会抛出异常。
方法对比表
| 方法 | 何时使用 | 结果类型 |
|---|---|---|
|
需要合并两项任务的结果 | 合并后的结果 |
|
需要等待所有任务完成 | |
|
需要等待任意一个任务完成 | Object(第一项任务的结果) |
6. 组合 CompletableFuture 时的常见错误
错误 №1:在主线程里用 get()/join() 等待结果。
如果你写的是异步代码,但最后还是调用了 get() 或 join(),那就会阻塞线程,从而失去异步的优势。更好的方式是使用 thenAccept/thenRun 进行非阻塞的结果处理。
错误 №2:使用 allOf 时没有保存对原始 futures 的引用。
如果你调用了 CompletableFuture.allOf(f1, f2, f3),却没有保存 f1、f2、f3——你将无法获取它们的结果。allOf 只返回 Void!
错误 №3:没有在链路中处理异常。
如果某个任务以异常结束,整个 allOf 或 thenCombine 也会以异常结束。请使用异常处理方法(exceptionally、handle、whenComplete)以免漏掉异常。
错误 №4:anyOf 的类型不匹配。
anyOf 返回 Object。如果你的 futures 返回不同的类型,就需要判断先到的是哪一种。若可能,尽量让各任务的返回类型一致。
错误 №5:链条过于复杂且没有注释。
当代码越来越多时,future 链很容易变成“面条代码”。请不要犹豫,把链条拆分成多个变量,并为各步骤添加注释。
GO TO FULL VERSION