CodeGym /コース /JAVA 25 SELF /thenCompose + カスタム Executor + タイムアウト

thenCompose + カスタム Executor + タイムアウト

JAVA 25 SELF
レベル 55 , レッスン 4
使用可能

1. thenCompose vs. thenApply: 違いと使い分け

Java の非同期プログラミング(CompletableFuture)では、処理をチェーンとしてつなぐことがよくあります。そのための似たメソッドに thenApplythenCompose がありますが、動作は異なります。

thenApply

thenApply は、次のステップが単なる値の変換であり、新しい非同期処理を開始しない場合に使います。前段の結果を受け取り、それを処理して新しい値(CompletableFuture ではない)を返します。

Stream API に馴染みがあるなら、thenApplymap に近い振る舞いです。結果を取り、関数を適用し、変換後の値を返します。

例:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "42");
CompletableFuture<Integer> lengthFuture = cf.thenApply(s -> s.length());
// lengthFuture は 2(文字列 "42" の長さ)を持つ

簡単に言えば、thenApply は「結果が用意できたら、これを実行してね」と伝える方法です。

thenCompose

  • 次のステップがさらに別の非同期処理(CompletableFuture を返す)である場合に使います。
  • 入れ子になった CompletableFuture を「展開」できます(flatMap のイメージ)。
  • 非同期な関数に thenApply を使うと、CompletableFuture<CompletableFuture<T>> になってしまい不便です。

例:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "user42");

// たとえば、ユーザー名から注文を非同期で取得したいとします
CompletableFuture<List<Order>> ordersFuture = cf.thenCompose(username -> fetchOrdersAsync(username));
// fetchOrdersAsync は CompletableFuture<List<Order>> を返す

イメージ:

  • thenApply: CF<String>thenApply(s -> s.length())CF<Integer>
  • thenCompose: CF<User>thenCompose(u -> fetchOrdersAsync(u.id))CF<List<Order>>

どちらを使うべきか?

  • 関数が通常の値を返す — thenApply を使う。
  • 関数が CompletableFuture を返す — thenCompose を使う。

誤用の例:

cf.thenApply(username -> fetchOrdersAsync(username)); // CF<CF<List<Order>>> になってしまう
cf.thenCompose(username -> fetchOrdersAsync(username)); // CF<List<Order>> を得る

2. スレッドプール(Executor)の管理: なぜ独自 Executor を使うか、どう使うか

デフォルト: ForkJoinPool.commonPool()

CompletableFuture.supplyAsync(...)thenApplyAsync(...)Executor を指定せずに呼ぶと、Java は共有スレッドプール — ForkJoinPool.commonPool() を使います。便利ですが、常に適切とは限りません:

  • 長時間またはブロッキングの処理(ネットワーク I/O、ファイル I/O)が多いと、共有プールが詰まり、すべてのタスクが待たされることがあります。
  • タスクの優先度を分離したい、同時に動くスレッド数を制限したい、といった場合があります。

独自 Executor が必要なとき

  • 長時間・ブロッキングな処理(DB 問い合わせ、HTTP リクエスト、ファイル読み取りなど)。
  • タスクの分離: ユーザー処理がシステム処理を妨げないようにする。
  • リソース制限: たとえば、同時ダウンロードを 10 件までにする。

独自 Executor の作り方

通常は ThreadPoolExecutorExecutors のファクトリを使います:

ExecutorService myExecutor = Executors.newFixedThreadPool(10);

CompletableFuture で独自 Executor を使う方法

  • supplyAsyncrunAsyncthenApplyAsyncthenComposeAsync などのメソッドに、第二引数としてあなたの Executor を渡せます。

例:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(
    () -> loadDataFromNetwork(), myExecutor
);

cf.thenApplyAsync(data -> processData(data), myExecutor)
  .thenAcceptAsync(result -> System.out.println(result), myExecutor);

重要: Executor を指定しない場合は ForkJoinPool.commonPool() が使われます。

デフォルト Executor で十分なとき

  • 短時間の CPU バウンドな処理(単純な計算)。
  • どのスレッドで実行されるかが重要でない場合。

3. タイムアウト処理: orTimeout と completeOnTimeout

非同期処理はハングしたり長時間かかったりすることがあります(サーバーが応答しない等)。永遠に待たないために、CompletableFuture にはタイムアウト用のメソッドがあります。

orTimeout

  • 指定時間内に終わらない場合、TimeoutException による失敗として CompletableFuture を完了させます。
  • 実行中のタスク自体はキャンセルされませんが、後続のチェーンはエラーを受け取ります。

構文:

cf.orTimeout(3, TimeUnit.SECONDS)
  .exceptionally(ex -> {
      System.out.println("タイムアウト: " + ex);
      return null;
  });

例:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(5000); // 長い処理を模擬
    return "OK";
});

cf.orTimeout(2, TimeUnit.SECONDS)
  .exceptionally(ex -> {
      System.out.println("エラー: " + ex);
      return "TIMEOUT";
  });

結果:

2 秒後に TimeoutException がスローされ、exceptionally がエラーを処理します。

completeOnTimeout

  • タイムアウト時間内に終わらなければ、指定した値で CompletableFuture を完了させます。
  • 例外は投げず、「フォールバック」値を返します。

構文:

cf.completeOnTimeout("DEFAULT", 2, TimeUnit.SECONDS);

例:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(5000);
    return "OK";
});

cf.completeOnTimeout("TIMEOUT", 2, TimeUnit.SECONDS)
  .thenAccept(System.out::println); // 2 秒後に "TIMEOUT" を出力

orTimeout と completeOnTimeout の比較

メソッド タイムアウト時の動作 その後の扱い
orTimeout
TimeoutException で完了 exceptionally/handle で処理できる
completeOnTimeout
指定値で完了 thenAccept/thenApply がその値を受け取る

4. 実践: thenCompose、カスタム Executor、タイムアウトの例

課題:

  • ID でユーザーを取得する(非同期・遅延あり)。
  • 続いて、そのユーザーの注文一覧を非同期で取得する(遅延あり)。
  • カスタム Executor を使用する。
  • 注文取得にタイムアウトを付ける。
import java.util.concurrent.*;
import java.util.*;

public class AsyncDemo {
    static ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    // ユーザー取得(非同期)の模擬
    static CompletableFuture<String> fetchUserAsync(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "user" + userId;
        }, ioExecutor);
    }

    // ユーザー注文取得(非同期)の模擬
    static CompletableFuture<List<String>> fetchOrdersAsync(String username) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(3000); // 長い処理!
            return List.of("order1", "order2");
        }, ioExecutor);
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException ignored) {}
    }

    public static void main(String[] args) {
        fetchUserAsync(42)
            .thenCompose(username ->
                fetchOrdersAsync(username)
                    .orTimeout(2, TimeUnit.SECONDS) // 注文取得に対するタイムアウト
                    .exceptionally(ex -> {
                        System.out.println("注文の取得に失敗: " + ex);
                        return List.of();
                    })
            )
            .thenAccept(orders -> System.out.println("注文: " + orders))
            .join(); // チェーン全体の完了を待つ

        ioExecutor.shutdown();
    }
}

何が起きているか:

  • ユーザーを取得(1 秒)。
  • 注文を取得(3 秒。ただしタイムアウトは 2 秒)。
  • 間に合わなければ TimeoutException を受け取り、空のリストを返す。
  • すべてカスタム Executor 上で動作。

結果:

注文の取得に失敗: java.util.concurrent.TimeoutException
注文: []

fetchOrdersAsync の遅延を 1_000 ms に減らすと、実際の注文が表示されます。

5. よくある誤りと注意点

誤り №1: 使用 thenApply の代わりに thenCompose を使うべき非同期処理で thenApply を使ってしまう。
関数が CompletableFuture を返すのに thenApply を適用すると、入れ子の CompletableFuture<CompletableFuture<T>> になります。チェーンが複雑になり、余計なラップが増えます。対策: thenCompose を使って結果を CompletableFuture<T> に「フラット化」しましょう。

誤り №2: 長時間または IO タスクを独自の Executor なしで実行する。
デフォルトではタスクは ForkJoinPool.commonPool() で実行されます。これが過負荷になると遅延が増え、アプリ内の他のタスクも遅くなります。対策: 独自の ExecutorService を作成し、supplyAsync/thenApplyAsync などに渡しましょう。

誤り №3: orTimeout がタスクの実行を取り消すと誤解する。
orTimeout はタイムアウトで CompletableFuture を例外完了させるだけで、タスク自体は裏で動き続けます。対策: 実行を止めたいなら cancel(true) や独自の割り込みメカニズムを使いましょう。

誤り №4: タイムアウトの適用範囲を誤解している。
orTimeoutcompleteOnTimeout はチェーンの「そのステップ」にだけ作用します。チェーン全体に対する総合的なタイムアウトが必要なら、チェーン全体を別の CompletableFuture に包み、それにタイムアウトを適用します。

誤り №5: ExecutorService を閉じない。
処理後に ExecutorServiceshutdown()/shutdownNow() を呼ばないと、スレッドが動き続け、プログラムが終了しないことがあります。対策: 常に ExecutorServicefinally で閉じるか、Java 21+ の try-with-resources を利用しましょう。

1
アンケート/クイズ
非同期プログラミング、レベル 55、レッスン 4
使用不可
非同期プログラミング
非同期プログラミング
コメント
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION