CodeGym /コース /JAVA 25 SELF /Structured Concurrency

Structured Concurrency

JAVA 25 SELF
レベル 58 , レッスン 0
使用可能

1. はじめに

スレッドがそれぞれのパートを演奏しているとき

従来のマルチスレッドは、しばしば指揮者のいないリハーサルのようです。各スレッドはほかを気にせず自分のメロディーを演奏する音楽家のようなもの。ある者は先に終わってどこかへ行き、ある者は同じコードで立ち往生し、ある者は譜面を取り違えてエラーを出す。その結果はシンフォニーではなくカコフォニーで、誰がどこでつまずいたのか把握するのはほぼ不可能で、全員を一度に止めるのも至難の業です。

Structured Concurrency はこの問題を解決します。ばらばらのスレッドを本物のアンサンブルにし、すべてのタスクが1人の「指揮者」の下にまとまります。指揮者がストップをかければオーケストラは静まり、誰かが失敗しても、ほかは全体の調和を崩さず丁寧に停止します。結果とエラーはコードの隅々に散らばるのではなく、一元的に集約されます。

想像してみてください: 音楽家たちを好き勝手に演奏させるのではなく、1つのホールに集めます。指揮者がいて、楽譜があり、たとえトランペットが音を外しても、オーケストラは取り乱すことなく、きれいに演奏を締めくくれます。

Structured Concurrency で得られること

  • 単一の「スコープ」: すべてのサブタスクは1つのコードブロックの範囲内にあり、そのライフサイクルはそのブロックで完結します。
  • 予測可能な終了: 親スレッドは、すべてのサブタスクが終わるまで終了しません。
  • 集中管理されたキャンセル: いずれかのタスクが失敗した場合や、親が終了を決めた場合は、すべてのサブタスクが正しくキャンセルされます。
  • 一貫したエラー処理: サブタスクのエラーを集約でき、「原因の木構造 (tree of causes)」を取得できます。
  • クリーンで読みやすいコード: 「ぶら下がった」スレッドや、忘れられたタスク、キャンセル競合がありません。

Structured Concurrency は単なる新しい API ではなく、新しい考え方です。タスクは通常のコードブロック (たとえば try-with-resources) と同じように構造化すべきだ、という発想です。

2. Java における Structured Concurrency のステータス

この講義の執筆時点では Structured Concurrency は Preview (Java 21–23) のステータスにあり、Java 24/25 で GA (General Availability) へ進むことが期待されています。API は jdk.incubator.concurrent パッケージにあります。プロダクションで使う前に、ご使用の JDK バージョンの最新のリリースノートを必ず確認してください。

主要なクラス:

  • StructuredTaskScope — タスク群を管理するための基底クラス。
  • バリアント: StructuredTaskScope.ShutdownOnFailure, StructuredTaskScope.ShutdownOnSuccess — タスクの終了ポリシー。

基本概念 StructuredTaskScope

モデル: fork、join、そして結果の点呼

指揮者 (つまり親タスク) が合図を出すと、サブタスクはそれぞれのパートへ走っていきます。この瞬間が fork です — まるで音楽家を別々の部屋でそれぞれのパートを演奏させるかのように。

その後に join の時が来ます — 指揮者がタクトを上げ、全員が戻ってきて一緒に最後の和音を奏でます。

そして各参加者に「どうだったか」を尋ねられます:

  • resultNow() で、すべて問題なく演奏が終わっていればすぐに結果を受け取る;
  • throwIfFailed() で、誰も音を外していないことを確かめる。もし誰かが譜面で迷子になっていたら、単一の例外が投げられる — まるで指揮者が「オーケストラに不調があった、最初からやり直そう」と告げるように。

終了ポリシー

どの指揮者にも「いつ音楽を止めるか」という自分なりのルールがあります。Structured Concurrency では、これが終了ポリシーで指定されます:

  • ShutdownOnFailure — 誰かがリズムを外したら、指揮者が手を振って「ストップ! 最初から」。残りはすぐに演奏をやめます。
  • ShutdownOnSuccess — 逆に、誰かが完璧に演奏した瞬間に指揮者は満足して「もう十分、勝者は決まった」。残りは静まり、最初の成功結果を採用します。

仮想スレッドでの動作

各サブタスクは StructuredTaskScope によって仮想スレッドで起動されます。まるで、物分かりが良く素早く、舞台条件にも煩くない音楽家が揃っているようなもの。何百、何千といった実行者を安心して作成できます — それは重量級の OS スレッドではなく、必要なときに必要なだけ鳴るほぼ無重量の音符です。

3. 例: HTTP リクエストのアグリゲーター

実践的な課題を考えます。データソースが3つ (たとえば3つの異なるサーバー) あり、全員からの応答を得て集約するか、最初に成功したものの応答だけを採用したいとします。

バリアント 1: 「全員成功で可」 (ShutdownOnFailure)

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class AggregatorAllSuccess {
    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Future<String> f1 = scope.fork(() -> fetchFromSource1());
            Future<String> f2 = scope.fork(() -> fetchFromSource2());
            Future<String> f3 = scope.fork(() -> fetchFromSource3());

            scope.join(); // すべてのタスクの終了を待つ
            scope.throwIfFailed(); // 1つでも失敗したら例外を投げる

            // すべて成功したら結果を集約できる
            String result = f1.resultNow() + f2.resultNow() + f3.resultNow();
            System.out.println("集約結果: " + result);
        }
    }

    static String fetchFromSource1() { /* ... */ return "A"; }
    static String fetchFromSource2() { /* ... */ return "B"; }
    static String fetchFromSource3() { /* ... */ return "C"; }
}

何が起きているか:

  • 3つのタスクが並列に (仮想スレッドで) 起動されます。
  • 1つでも失敗したら、残りはキャンセルされ、例外が投げられます。
  • 全員成功なら、結果を安全に集約できます。

バリアント 2: 「最初の有効な成功で可」 (ShutdownOnSuccess)

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class AggregatorFirstSuccess {
    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            Future<String> f1 = scope.fork(() -> fetchFromSource1());
            Future<String> f2 = scope.fork(() -> fetchFromSource2());
            Future<String> f3 = scope.fork(() -> fetchFromSource3());

            scope.join(); // 最初の成功を待つ
            scope.throwIfFailed(); // すべて失敗なら例外を投げる

            String result = scope.result(); // 最初に成功したタスクの結果
            System.out.println("最初に成功した結果: " + result);
        }
    }

    static String fetchFromSource1() { /* ... */ return "A"; }
    static String fetchFromSource2() { /* ... */ return "B"; }
    static String fetchFromSource3() { /* ... */ return "C"; }
}

何が起きているか:

  • いずれかが成功した瞬間に、残りはキャンセルされます。
  • 全員失敗の場合は、例外が投げられます。

4. 自動キャンセルと縮退

StructuredTaskScope は、ポリシーがそう求める場合に残っているタスクのキャンセルを自動で行います。たとえば、1つが失敗した場合 (ShutdownOnFailure) や、1つが成功した場合 (ShutdownOnSuccess) は、残りのタスクにキャンセルのシグナル (interrupt) が送られます。

例: タイムアウト付きの正しい終了

import jdk.incubator.concurrent.StructuredTaskScope;
import java.time.Instant;
import java.util.concurrent.Future;

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> fetchWithTimeout());
    Future<String> f2 = scope.fork(() -> fetchWithTimeout());

    scope.joinUntil(Instant.now().plusSeconds(2)); // 最大2秒待つ
    scope.throwIfFailed();

    String result = f1.resultNow() + f2.resultNow();
    System.out.println(result);
}

タスクが2秒以内に完了しなかった場合は例外が投げられ、すべてのタスクがキャンセルされます。

5. エラーと例外処理

サブタスクの例外が scope にどうルーティングされるか

演奏中に誰かが音を外すことはありますが、StructuredTaskScope は見て見ぬふりはしません。誰が外したのかを丁寧に記録し、その後で指揮者に完全なレポートを渡します。throwIfFailed() を呼ぶと、集約例外が投げられます — いわば「今日の不協和音の一覧」です。必要なら、この「原因の木構造」を展開して、誰が具体的に足を引っ張ったのかを確認できます。特定の実行者について知りたければ、Future.exceptionNow() がそのタスクの結末を教えてくれます。

キャンセルは必ずしも失敗ではない

覚えておきたいのは、タスクのキャンセルが常にエラーを意味するわけではないことです。指揮者が「演奏はここまで」と言えば、音楽家は単に楽器を片付けるだけ — これは cancelled であって failed ではありません。本当のエラーは、誰かが実際に誤った演奏をした場合で、その例外は集約レポートに含まれます。

例: 原因の木構造

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> { throw new RuntimeException("エラー1"); });
    Future<String> f2 = scope.fork(() -> { throw new RuntimeException("エラー2"); });

    scope.join();
    scope.throwIfFailed(); // 両方の原因を含む例外を投げる
} catch (Exception e) {
    e.printStackTrace();
    // e.getSuppressed() で抑制された例外を取得できます
}

6. CompletableFuture との比較

StructuredTaskScopeCompletableFuture は、どちらも並行タスクを起動できますが、次のような違いがあります:

  • StructuredTaskScope は、タスクが論理的に関連し、一緒に終了/キャンセルされるべき (タスク階層がある) 場面に向いています。
  • CompletableFuture は、階層を持たないタスクの合成に適しています (たとえば変換のチェーンやリアクティブなシナリオ)。

StructuredTaskScope がコードを簡潔にする場面:

  • ブロックを抜ける前に、すべてのサブタスクの終了を保証したいとき。
  • キャンセルとエラー処理を一元管理したいとき。
  • 「ぶら下がった」タスクを残したくないとき。

CompletableFuture のほうが便利な場面:

  • タスク同士が独立しており、それぞれのライフサイクルで動けるとき。
  • thenCombinethenCompose など、複雑な合成が必要なとき。

7. 実践: HTTP リクエストのアグリゲーター

課題: 3つのソースへリクエストし、最初に成功した応答を得る

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

public class HttpAggregator {
    public static void main(String[] args) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            Future<String> f1 = scope.fork(() -> httpRequest("https://api1.example.com"));
            Future<String> f2 = scope.fork(() -> httpRequest("https://api2.example.com"));
            Future<String> f3 = scope.fork(() -> httpRequest("https://api3.example.com"));

            scope.join();
            scope.throwIfFailed();

            String result = scope.result();
            System.out.println("最初に成功した応答: " + result);
        }
    }

    static String httpRequest(String url) throws Exception {
        // リクエストの疑似実装 (HttpClient を使ってもよい)
        Thread.sleep((long) (Math.random() * 1000));
        if (Math.random() < 0.3) throw new RuntimeException("リクエストエラー: " + url);
        return "応答: " + url;
    }
}

課題: 1つのサブタスクが失敗したら残りを正しく止める

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.Future;

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> f1 = scope.fork(() -> httpRequest("https://api1.example.com"));
    Future<String> f2 = scope.fork(() -> httpRequest("https://api2.example.com"));

    scope.join();
    scope.throwIfFailed();

    String result = f1.resultNow() + f2.resultNow();
    System.out.println("両方の応答: " + result);
} catch (Exception e) {
    System.err.println("いずれかのタスクでエラー: " + e.getMessage());
}

8. StructuredTaskScope を使うときのよくある誤り

よくある誤り1: join()throwIfFailed() の呼び忘れ。
join() を呼ばないと、ブロックを抜けるまでにタスクが終わらない場合があります。throwIfFailed() を呼ばないと、サブタスクのエラーに気づけません。

よくある誤り2: タスク完了前に結果を取得しようとする。
タスクの完了前に resultNow() を呼ぶと IllegalStateException がスローされます。まず join() で完了を待ちましょう。

よくある誤り3: キャンセルを無視する。
タスクがキャンセルされている場合、その結果を取得しようとしてはいけません — 例外になります。

よくある誤り4: 異なる終了ポリシーを混ぜる。
scope の中で手動でタスクをキャンセルしようとするのではなく、ShutdownOnFailureShutdownOnSuccess といったポリシーを使いましょう。

よくある誤り5: 長時間の CPU バウンド処理を仮想スレッドで走らせる。
StructuredTaskScope はデフォルトで仮想スレッドを使います。これは I/O バウンドのタスクには最適ですが、重い計算を高速化するものではありません。

よくある誤り6: scope を閉じ忘れる (try-with-resources を使わない)。
StructuredTaskScopeAutoCloseable を実装しています。必ず try-with-resources を使って、すべてのタスクが確実に終了するようにしましょう。

コメント
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION