1. thenCompose vs. thenApply: 違いと使い分け
Java の非同期プログラミング(CompletableFuture)では、処理をチェーンとしてつなぐことがよくあります。そのための似たメソッドに thenApply と thenCompose がありますが、動作は異なります。
thenApply
thenApply は、次のステップが単なる値の変換であり、新しい非同期処理を開始しない場合に使います。前段の結果を受け取り、それを処理して新しい値(CompletableFuture ではない)を返します。
Stream API に馴染みがあるなら、thenApply は map に近い振る舞いです。結果を取り、関数を適用し、変換後の値を返します。
例:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "42");
CompletableFuture<Integer> lengthFuture = cf.thenApply(s -> s.length());
// lengthFuture は 2(文字列 "42" の長さ)を持つ
簡単に言えば、thenApply は「結果が用意できたら、これを実行してね」と伝える方法です。
thenCompose
- 次のステップがさらに別の非同期処理(CompletableFuture を返す)である場合に使います。
- 入れ子になった CompletableFuture を「展開」できます(flatMap のイメージ)。
- 非同期な関数に thenApply を使うと、CompletableFuture<CompletableFuture<T>> になってしまい不便です。
例:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "user42");
// たとえば、ユーザー名から注文を非同期で取得したいとします
CompletableFuture<List<Order>> ordersFuture = cf.thenCompose(username -> fetchOrdersAsync(username));
// fetchOrdersAsync は CompletableFuture<List<Order>> を返す
イメージ:
- thenApply: CF<String> → thenApply(s -> s.length()) → CF<Integer>
- thenCompose: CF<User> → thenCompose(u -> fetchOrdersAsync(u.id)) → CF<List<Order>>
どちらを使うべきか?
- 関数が通常の値を返す — thenApply を使う。
- 関数が CompletableFuture を返す — thenCompose を使う。
誤用の例:
cf.thenApply(username -> fetchOrdersAsync(username)); // CF<CF<List<Order>>> になってしまう
cf.thenCompose(username -> fetchOrdersAsync(username)); // CF<List<Order>> を得る
2. スレッドプール(Executor)の管理: なぜ独自 Executor を使うか、どう使うか
デフォルト: ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(...) や thenApplyAsync(...) を Executor を指定せずに呼ぶと、Java は共有スレッドプール — ForkJoinPool.commonPool() を使います。便利ですが、常に適切とは限りません:
- 長時間またはブロッキングの処理(ネットワーク I/O、ファイル I/O)が多いと、共有プールが詰まり、すべてのタスクが待たされることがあります。
- タスクの優先度を分離したい、同時に動くスレッド数を制限したい、といった場合があります。
独自 Executor が必要なとき
- 長時間・ブロッキングな処理(DB 問い合わせ、HTTP リクエスト、ファイル読み取りなど)。
- タスクの分離: ユーザー処理がシステム処理を妨げないようにする。
- リソース制限: たとえば、同時ダウンロードを 10 件までにする。
独自 Executor の作り方
通常は ThreadPoolExecutor や Executors のファクトリを使います:
ExecutorService myExecutor = Executors.newFixedThreadPool(10);
CompletableFuture で独自 Executor を使う方法
- supplyAsync、runAsync、thenApplyAsync、thenComposeAsync などのメソッドに、第二引数としてあなたの Executor を渡せます。
例:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(
() -> loadDataFromNetwork(), myExecutor
);
cf.thenApplyAsync(data -> processData(data), myExecutor)
.thenAcceptAsync(result -> System.out.println(result), myExecutor);
重要: Executor を指定しない場合は ForkJoinPool.commonPool() が使われます。
デフォルト Executor で十分なとき
- 短時間の CPU バウンドな処理(単純な計算)。
- どのスレッドで実行されるかが重要でない場合。
3. タイムアウト処理: orTimeout と completeOnTimeout
非同期処理はハングしたり長時間かかったりすることがあります(サーバーが応答しない等)。永遠に待たないために、CompletableFuture にはタイムアウト用のメソッドがあります。
orTimeout
- 指定時間内に終わらない場合、TimeoutException による失敗として CompletableFuture を完了させます。
- 実行中のタスク自体はキャンセルされませんが、後続のチェーンはエラーを受け取ります。
構文:
cf.orTimeout(3, TimeUnit.SECONDS)
.exceptionally(ex -> {
System.out.println("タイムアウト: " + ex);
return null;
});
例:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
Thread.sleep(5000); // 長い処理を模擬
return "OK";
});
cf.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
System.out.println("エラー: " + ex);
return "TIMEOUT";
});
結果:
2 秒後に TimeoutException がスローされ、exceptionally がエラーを処理します。
completeOnTimeout
- タイムアウト時間内に終わらなければ、指定した値で CompletableFuture を完了させます。
- 例外は投げず、「フォールバック」値を返します。
構文:
cf.completeOnTimeout("DEFAULT", 2, TimeUnit.SECONDS);
例:
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
Thread.sleep(5000);
return "OK";
});
cf.completeOnTimeout("TIMEOUT", 2, TimeUnit.SECONDS)
.thenAccept(System.out::println); // 2 秒後に "TIMEOUT" を出力
orTimeout と completeOnTimeout の比較
| メソッド | タイムアウト時の動作 | その後の扱い |
|---|---|---|
|
TimeoutException で完了 | exceptionally/handle で処理できる |
|
指定値で完了 | thenAccept/thenApply がその値を受け取る |
4. 実践: thenCompose、カスタム Executor、タイムアウトの例
課題:
- ID でユーザーを取得する(非同期・遅延あり)。
- 続いて、そのユーザーの注文一覧を非同期で取得する(遅延あり)。
- カスタム Executor を使用する。
- 注文取得にタイムアウトを付ける。
import java.util.concurrent.*;
import java.util.*;
public class AsyncDemo {
static ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
// ユーザー取得(非同期)の模擬
static CompletableFuture<String> fetchUserAsync(int userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "user" + userId;
}, ioExecutor);
}
// ユーザー注文取得(非同期)の模擬
static CompletableFuture<List<String>> fetchOrdersAsync(String username) {
return CompletableFuture.supplyAsync(() -> {
sleep(3000); // 長い処理!
return List.of("order1", "order2");
}, ioExecutor);
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
}
public static void main(String[] args) {
fetchUserAsync(42)
.thenCompose(username ->
fetchOrdersAsync(username)
.orTimeout(2, TimeUnit.SECONDS) // 注文取得に対するタイムアウト
.exceptionally(ex -> {
System.out.println("注文の取得に失敗: " + ex);
return List.of();
})
)
.thenAccept(orders -> System.out.println("注文: " + orders))
.join(); // チェーン全体の完了を待つ
ioExecutor.shutdown();
}
}
何が起きているか:
- ユーザーを取得(1 秒)。
- 注文を取得(3 秒。ただしタイムアウトは 2 秒)。
- 間に合わなければ TimeoutException を受け取り、空のリストを返す。
- すべてカスタム Executor 上で動作。
結果:
注文の取得に失敗: java.util.concurrent.TimeoutException
注文: []
fetchOrdersAsync の遅延を 1_000 ms に減らすと、実際の注文が表示されます。
5. よくある誤りと注意点
誤り №1: 使用 thenApply の代わりに thenCompose を使うべき非同期処理で thenApply を使ってしまう。
関数が CompletableFuture を返すのに thenApply を適用すると、入れ子の CompletableFuture<CompletableFuture<T>> になります。チェーンが複雑になり、余計なラップが増えます。対策: thenCompose を使って結果を CompletableFuture<T> に「フラット化」しましょう。
誤り №2: 長時間または IO タスクを独自の Executor なしで実行する。
デフォルトではタスクは ForkJoinPool.commonPool() で実行されます。これが過負荷になると遅延が増え、アプリ内の他のタスクも遅くなります。対策: 独自の ExecutorService を作成し、supplyAsync/thenApplyAsync などに渡しましょう。
誤り №3: orTimeout がタスクの実行を取り消すと誤解する。
orTimeout はタイムアウトで CompletableFuture を例外完了させるだけで、タスク自体は裏で動き続けます。対策: 実行を止めたいなら cancel(true) や独自の割り込みメカニズムを使いましょう。
誤り №4: タイムアウトの適用範囲を誤解している。
orTimeout と completeOnTimeout はチェーンの「そのステップ」にだけ作用します。チェーン全体に対する総合的なタイムアウトが必要なら、チェーン全体を別の CompletableFuture に包み、それにタイムアウトを適用します。
誤り №5: ExecutorService を閉じない。
処理後に ExecutorService の shutdown()/shutdownNow() を呼ばないと、スレッドが動き続け、プログラムが終了しないことがあります。対策: 常に ExecutorService を finally で閉じるか、Java 21+ の try-with-resources を利用しましょう。
GO TO FULL VERSION