序章
したがって、Java にはスレッドがあることがわかります。これについては、「Better together: Java and the Thread class」というタイトルのレビューで読むことができます。パート I — 実行のスレッド。 典型的なコードをもう一度見てみましょう。public static void main(String[] args) throws Exception {
Runnable task = () -> {
System.out.println("Task executed");
};
Thread thread = new Thread(task);
thread.start();
}
ご覧のとおり、タスクを開始するコードは非常に一般的ですが、新しいタスクに対してそれを繰り返す必要があります。1 つの解決策は、それを別のメソッドに置くことですexecute(Runnable runnable)
。しかし、Java の作成者は私たちの窮状を考慮し、次のインターフェイスを考え出しましたExecutor
。
public static void main(String[] args) throws Exception {
Runnable task = () -> System.out.println("Task executed");
Executor executor = (runnable) -> {
new Thread(runnable).start();
};
executor.execute(task);
}
このコードは明らかにより簡潔です。ここでは、Runnable
スレッド上で を開始するコードを記述するだけです。素晴らしいですね。しかし、これはほんの始まりにすぎません。
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
Executor
インターフェイスにはサブインターフェイスがありますExecutorService
。このインターフェイスの Javadoc には、をシャットダウンするメソッドを提供するExecutorService
特定の が記述されていると記載されています。また、実行プロセスを追跡するためにを取得することもできます。以前は、「Better together: Java と Thread クラス」で説明しました。パート IV — Callable、Future、およびその仲間たちでは、 の機能を簡単にレビューしました。忘れてしまった、または読んだことがない場合は、記憶を新たにすることをお勧めします ;) Javadoc には他に何と書いてありますか? これは、のデフォルト実装を作成できる特別なファクトリーがあることを示しています。 Executor
Executor
java.util.concurrent.Future
Future
java.util.concurrent.Executors
ExecutorService
ExecutorService
確認してみましょう。スレッド上で特定のタスクをExecutor
実行する (つまり、呼び出す) 必要がありますが、スレッドを作成するコードは隠されています。execute()
私たちは、進行状況を制御するためのいくつかのオプションを備えたExecutorService
特定のものを持っていますExecutor
。Executors
そして、を作成できる工場がありますExecutorService
。では、自分でやってみましょう:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> task = () -> Thread.currentThread().getName();
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
Future result = service.submit(task);
System.out.println(result.get());
}
service.shutdown();
}
サイズが 2 の固定スレッド プールを指定したことがわかります。次に、タスクを 1 つずつプールに送信します。各タスクは、String
スレッド名 ( currentThread().GetName()
) を含む を返します。ExecutorService
最後に をシャットダウンすることが重要です。そうしないとプログラムが終了しません。ファクトリExecutors
には追加のファクトリ メソッドがあります。たとえば、1 つのスレッド ( ) だけで構成されるプール、または1 分間アイドル状態になったスレッドが削除されるnewSingleThreadExecutor
キャッシュ ( ) を含むプールを作成できます。newCachedThreadPool
実際には、これらはブロッキング キューExecutorService
によってバックアップされており、そこにタスクが配置され、そこからタスクが実行されます。ブロックキューの詳細については、このビデオを参照してください。こちらもお読みいただけますBlockingQueue について確認してください。そして、「ArrayBlockingQueue より LinkedBlockingQueue を優先するのはどのような場合ですか?」という質問に対する答えを確認してください。最も単純に言えば、BlockingQueue
次の 2 つの場合に a がスレッドをブロックします。
- スレッドは空のキューからアイテムを取得しようとします
- スレッドはアイテムを満杯のキューに入れようとします
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
また
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ご覧のとおり、 の実装はExecutorService
ファクトリ メソッド内で作成されます。そしてほとんどの場合、私たちは について話していますThreadPoolExecutor
。作業に影響を与えるパラメータのみが変更されます。
https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg
スレッドプールエグゼキュータ
前に見たように、ThreadPoolExecutor
これは通常、ファクトリ メソッド内で作成されるものです。この機能は、スレッドの最大数と最小数として渡す引数、および使用されているキューのタイプによって影響を受けます。ただし、java.util.concurrent.BlockingQueue
インターフェイスの任意の実装を使用できます。について言えばThreadPoolExecutor
、いくつかの興味深い機能について触れておく必要があります。たとえば、ThreadPoolExecutor
利用可能なスペースがない場合、タスクを に送信することはできません。
public static void main(String[] args) throws ExecutionException, InterruptedException {
int threadBound = 2;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
0L, TimeUnit.SECONDS, new SynchronousQueue<>());
Callable<String> task = () -> {
Thread.sleep(1000);
return Thread.currentThread().getName();
};
for (int i = 0; i < threadBound + 1; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
このコードは次のようなエラーでクラッシュします。
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
言い換えれば、は実際には単一の要素で構成され、それ以上何も入力できないように設計されているtask
ため、送信できませんSynchronousQueue
。ここではゼロ (「キューにあるタスク = 0」) があることがわかりますqueued tasks
。しかし、これは の特殊な機能であり、SynchronousQueue
実際には常に空である 1 要素のキューであるため、これについては何も不思議なことはありません。あるスレッドが要素をキューに入れると、別のスレッドがキューから要素を取得するまで待機します。したがって、これを に置き換えることができnew LinkedBlockingQueue<>(1)
、エラーは show に変わりますqueued tasks = 1
。キューには 1 つの要素しかないため、2 番目の要素を追加することはできません。そしてそれがプログラムの失敗の原因です。キューについての説明を続けると、次の点に注目する価値があります。ThreadPoolExecutor
クラスには、キューを処理するための追加のメソッドがあります。たとえば、このthreadPoolExecutor.purge()
メソッドはキュー内のスペースを解放するために、キャンセルされたすべてのタスクをキューから削除します。もう 1 つの興味深いキュー関連関数は、拒否されたタスクのハンドラーです。
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.SECONDS, new SynchronousQueue());
Callable<String> task = () -> Thread.currentThread().getName();
threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(task);
}
threadPoolExecutor.shutdown();
}
この例では、ハンドラーはRejected
キュー内のタスクが拒否されるたびに単純に表示します。便利ですね。さらに、にThreadPoolExecutor
は興味深いサブクラスがあります。タイマーに基づいてタスクを実行する機能を提供します。 ScheduledThreadPoolExecutor
ScheduledExecutorService
ScheduledExecutorService
ScheduledExecutorService
( のタイプですExecutorService
) を使用すると、スケジュールに従ってタスクを実行できます。例を見てみましょう:
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
return Thread.currentThread().getName();
};
scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
scheduledExecutorService.shutdown();
}
ここではすべてがシンプルです。タスクが送信されると、 が届きますjava.util.concurrent.ScheduledFuture
。スケジュールは次のような場合にも役立ちます。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ここでは、一定の初期遅延を伴う固定頻度 (「FixedRate」) で実行するタスクを送信しますRunnable
。この場合、1 秒後、タスクは 2 秒ごとに実行され始めます。同様のオプションがあります。
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ただし、この場合、タスクは各実行の間に特定の間隔を置いて実行されます。つまり、 はtask
1 秒後に実行されます。そして、完了するとすぐに 2 秒が経過し、新しいタスクが開始されます。このトピックに関する追加リソースは次のとおりです。
- Java のスレッド プールの概要
- Java のスレッド プールの概要
- Java マルチスレッド障害物競走: Executor でのタスクのキャンセル
- バックグラウンド タスクに Java Executor を使用する
https://dzone.com/articles/diving-into-java-8s-newworkstealingpools
仕事盗みプール
上記のスレッド プールに加えて、もう 1 つあります。正直に言うと、これは少し特別です。それは仕事盗みプールと呼ばれます。つまり、ワークスチールとは、アイドル状態のスレッドが他のスレッドからタスクを取得したり、共有キューからタスクを取得し始めるアルゴリズムです。例を見てみましょう:public static void main(String[] args) {
Object lock = new Object();
ExecutorService executorService = Executors.newCachedThreadPool();
Callable<String> task = () -> {
System.out.println(Thread.currentThread().getName());
lock.wait(2000);
System.out.println("Finished");
return "result";
};
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
}
このコードを実行すると、ExecutorService
各スレッドがロック オブジェクトの待機キューに入れられるため、5 つのスレッドが作成されます。私たちはすでに、 Java と Thread クラスという Better の組み合わせでモニターとロックを理解しました。パート II — 同期。Executors.newCachedThreadPool()
では、に置き換えてみましょうExecutors.newWorkStealingPool()
。何が変わるのでしょうか?タスクが 5 つ未満のスレッドで実行されていることがわかります。CachedThreadPool
タスクごとにスレッドが作成されることを覚えていますか? これは、wait()
スレッドがブロックされ、後続のタスクを完了する必要があり、そのタスク用に新しいスレッドがプール内に作成されたためです。盗用プールを使用すると、スレッドが永久にアイドル状態になることはありません。彼らは隣人の任務を遂行し始めます。はWorkStealingPool
他のスレッド プールと何が違うのでしょうか? 魔法のような事実ForkJoinPool
その中に住んでいます:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
実は、もう一つ違いがあります。デフォルトでは、ForkJoinPool
onrdinary によって作成されたスレッドとは異なり、 に対して作成されたスレッドはデーモン スレッドですThreadPool
。一般に、デーモン スレッドについて覚えておく必要があります。たとえば、非デーモン スレッドを作成するCompletableFuture
独自のスレッドを指定しない限り、デーモン スレッドも使用されるからです。ThreadFactory
予期せぬ場所にサプライズが潜んでいるかもしれません。:)
フォーク参加プール
ForkJoinPool
このパートでは、の「内部」に存在する (フォーク/結合フレームワークとも呼ばれる)について再度説明しますWorkStealingPool
。一般に、fork/join フレームワークは Java 1.7 に登場しました。Java 11 はもうすぐそこまで来ていますが、それでも覚えておく価値はあります。これは最も一般的な実装ではありませんが、非常に興味深いものです。これに関する優れたレビューが Web 上にあります: Understanding Java Fork-Join Framework with Examples。はForkJoinPool
に依存しますjava.util.concurrent.RecursiveTask
。もありますjava.util.concurrent.RecursiveAction
。RecursiveAction
結果を返しません。したがって、RecursiveTask
は に似ておりCallable
、 はRecursiveAction
に似ていますunnable
。fork
この名前には、とという 2 つの重要なメソッドの名前が含まれていることがわかりますjoin
。のfork
メソッドは、別のスレッドでいくつかのタスクを非同期的に開始します。このjoin
メソッドを使用すると、作業が完了するまで待つことができます。よりよく理解するには、「命令型プログラミングからフォーク/結合、Java 8 の並列ストリームまで」を読む必要があります。
まとめ
さて、これでレビューのこの部分は終わりです。Executor
これはもともとスレッドを実行するために発明されたものであることがわかりました。その後、Java の作成者はそのアイデアを継続することを決定し、 を思いつきましたExecutorService
。と をExecutorService
使用して実行するタスクを送信し、サービスをシャットダウンすることもできます。実装が必要なため、ファクトリ メソッドを含むクラスを作成し、それを呼び出しました。スレッド プール ( ) を作成できます。さらに、実行スケジュールを指定できるスレッド プールもあります。そして、 はの後ろに隠れます。私が上で書いた内容が興味深いだけでなく、理解できるものであると感じていただければ幸いです :) ご提案やコメントをいつでも喜んで聞いています。 submit()
invoke()
ExecutorService
Executors
ThreadPoolExecutor
ForkJoinPool
WorkStealingPool
Java と Thread クラスを組み合わせるとさらに効果的です。パート I — 実行スレッド 組み合わせるとさらに効果的: Java と Thread クラス。パート II — 同期 併用するとより効果的です: Java と Thread クラス。パート III — 連携 を強化: Java と Thread クラス。パート IV — Callable、Future、およびその仲間たち 一緒にさらに良く: Java と Thread クラス。パート VI — 撃て!