CodeGym /课程 /JAVA 25 SELF /组合 CompletableFuture:thenCombine、allOf、anyOf

组合 CompletableFuture:thenCombine、allOf、anyOf

JAVA 25 SELF
第 55 级 , 课程 2
可用

1. 组合 CompletableFuture

在真实场景中,很少所有数据都来自单一来源:用户资料与其订单可以并行加载,来自两个微服务的数据可以合并,有时只想处理最先返回的结果。同步方式会让我们按顺序等待;CompletableFuture 类允许同时启动所有任务,并优雅地合并结果。它为此提供了专门的方法:thenCombineallOfanyOf。我们按顺序逐一说明。

并行任务:两个异步请求

同步情况下会怎样:

String name = loadUserName();    // 耗时
int balance = loadUserBalance(); // 耗时
System.out.println("姓名: " + name + ", 余额: " + balance);

问题:第二个调用只会在第一个完成后才开始。

异步方式

借助 CompletableFuture,可以同时启动这两个任务:

CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> loadUserName());
CompletableFuture<Integer> balanceFuture = CompletableFuture.supplyAsync(() -> loadUserBalance());

但如何同时拿到两个结果并一起处理?这时就用 thenCombine

2. thenCombine:合并两项任务的结果

方法 thenCombine 可以把两个 CompletableFuture 合并,并在两项任务都完成时执行一个操作。它会返回一个新的 CompletableFuture,其结果是合并的产物。

方法签名:

<A, B, C> CompletableFuture<C> thenCombine(
    CompletionStage<? extends B> other,
    BiFunction<? super A, ? super B, ? extends C> fn
)
  • A — 第一个 future 的结果类型,
  • B — 第二个的类型,
  • C — 合并后的结果类型。

示例

CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> loadUserName());
CompletableFuture<Integer> balanceFuture = CompletableFuture.supplyAsync(() -> loadUserBalance());

CompletableFuture<String> resultFuture = nameFuture.thenCombine(
    balanceFuture,
    (name, balance) -> "姓名: " + name + ", 余额: " + balance
);

resultFuture.thenAccept(System.out::println);

工作原理:

  • 两个 future 并行启动。
  • 一旦二者都完成,就会调用函数 (name, balance) -> ...
  • 最终的 future 包含合并后的字符串。

小示例:数字

CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> sum = f1.thenCombine(f2, Integer::sum);

sum.thenAccept(result -> System.out.println("总和: " + result));

输出:

总和: 5

异步版本

如果合并是个重活,请使用 thenCombineAsync

f1.thenCombineAsync(f2, (a, b) -> a * b);

3. allOf:当任务很多时

如果不是两项任务,而是十来项呢?例如,我们想并行加载 10 个用户的数据。 为此可以使用 CompletableFuture.allOf

说明

CompletableFuture.allOf(f1, f2, ..., fn) 返回一个新的 future,当且仅当传入的所有任务都完成时它才会完成。但有个细节:该 future 不包含任何结果——它的类型始终是 CompletableFuture<Void>。要拿到结果,需要从原始的 futures 中单独“取出”。

示例

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "第一个");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "第二个");

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);

all.thenRun(() -> {
    // 所有任务都已完成!
    String s1 = f1.join(); // join() 类似 get(),但抛出未检查异常
    String s2 = f2.join();
    System.out.println(s1 + " & " + s2);
});

输出:

第一个 & 第二个

包含任务数组的示例

List<CompletableFuture<String>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    int id = i;
    futures.add(CompletableFuture.supplyAsync(() -> "用户 " + id));
}

CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

all.thenRun(() -> {
    for (CompletableFuture<String> f : futures) {
        System.out.println(f.join());
    }
});

发生了什么:

  • 所有任务并行启动。
  • allOf 会在所有任务都完成时结束。
  • thenRun 块中,我们可以通过 join() 获取结果。

可视化示意

[Future1]   \
[Future2] ----> [allOf] ---> thenRun
[Future3]   /

4. anyOf:等待最先完成的任务

有时我们不需要等所有任务,而是获取最快完成的那个结果。比如同时向两个服务器请求数据——谁先返回就用谁的结果。 这时就用 CompletableFuture.anyOf

说明

CompletableFuture.anyOf(f1, f2, ..., fn) 返回的 future 会在任意一个传入任务完成时立即完成。结果类型是 CompletableFuture<Object>,因为各任务的类型可能不同。

示例

CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
    sleep(500);
    return "快速服务器";
});
CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "慢服务器";
});

CompletableFuture<Object> any = CompletableFuture.anyOf(fast, slow);

any.thenAccept(result -> System.out.println("收到: " + result));

输出:

收到: 快速服务器

不同类型的示例
可以组合不同返回类型的任务,但那样结果类型就是 Object,需要显式进行类型转换。

5. 有用的细节

建议

  • allOf 不会返回结果数组。需要保留原始 futures,随后通过 join()get() 取值。
  • anyOf 返回第一个完成的结果,但类型是 Object。如果所有任务类型相同,可以进行类型转换。
  • 如果 allOf 中任何一个任务失败,最终的 future 也会以异常结束。
  • 对于 thenCombine,两项任务都必须成功完成,否则会抛出异常。

方法对比表

方法 何时使用 结果类型
thenCombine
需要合并两项任务的结果 合并后的结果
allOf
需要等待所有任务完成
Void
anyOf
需要等待任意一个任务完成 Object(第一项任务的结果)

6. 组合 CompletableFuture 时的常见错误

错误 №1:在主线程里用 get()/join() 等待结果。
如果你写的是异步代码,但最后还是调用了 get()join(),那就会阻塞线程,从而失去异步的优势。更好的方式是使用 thenAccept/thenRun 进行非阻塞的结果处理。

错误 №2:使用 allOf 时没有保存对原始 futures 的引用。
如果你调用了 CompletableFuture.allOf(f1, f2, f3),却没有保存 f1f2f3——你将无法获取它们的结果。allOf 只返回 Void

错误 №3:没有在链路中处理异常。
如果某个任务以异常结束,整个 allOfthenCombine 也会以异常结束。请使用异常处理方法(exceptionallyhandlewhenComplete)以免漏掉异常。

错误 №4:anyOf 的类型不匹配。
anyOf 返回 Object。如果你的 futures 返回不同的类型,就需要判断先到的是哪一种。若可能,尽量让各任务的返回类型一致。

错误 №5:链条过于复杂且没有注释。
当代码越来越多时,future 链很容易变成“面条代码”。请不要犹豫,把链条拆分成多个变量,并为各步骤添加注释。

评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION