CodeGym/Java Blog/ランダム/Java と Thread クラスを組み合わせるとさらに効果的です。パート V — エグゼキュータ、スレッドプール...
John Squirrels
レベル 41
San Francisco

Java と Thread クラスを組み合わせるとさらに効果的です。パート V — エグゼキュータ、スレッドプール、フォーク/ジョイン

ランダム グループに公開済み
人のメンバー

序章

したがって、Java にはスレッドがあることがわかります。これについては、「Better together: Java and the Thread class」というタイトルのレビューで読むことができます。パート I — 実行のスレッドJava と Thread クラスを組み合わせるとさらに効果的です。 パート V — エグゼキュータ、スレッドプール、フォーク/ジョイン - 1典型的なコードをもう一度見てみましょう。
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
ご覧のとおり、タスクを開始するコードは非常に一般的ですが、新しいタスクに対してそれを繰り返す必要があります。1 つの解決策は、それを別のメソッドに置くことですexecute(Runnable runnable)。しかし、Java の作成者は私たちの窮状を考慮し、次のインターフェイスを考え出しましたExecutor
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
このコードは明らかにより簡潔です。ここでは、Runnableスレッド上で を開始するコードを記述するだけです。素晴らしいですね。しかし、これはほんの始まりにすぎません。 Java と Thread クラスを組み合わせるとさらに効果的です。 パート V — エグゼキュータ、スレッドプール、フォーク/ジョイン - 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

ご覧のとおり、ExecutorインターフェイスにはサブインターフェイスがありますExecutorService。このインターフェイスの Javadoc には、をシャットダウンするメソッドを提供するExecutorService特定の が記述されていると記載されています。また、実行プロセスを追跡するためにを取得することもできます。以前は、「Better together: Java と Thread クラス」で説明しました。パート IV — Callable、Future、およびその仲間たちでは、 の機能を簡単にレビューしました。忘れてしまった、または読んだことがない場合は、記憶を新たにすることをお勧めします ;) Javadoc には他に何と書いてありますか? これは、のデフォルト実装を作成できる特別なファクトリーがあることを示しています。 ExecutorExecutorjava.util.concurrent.FutureFuturejava.util.concurrent.ExecutorsExecutorService

ExecutorService

確認してみましょう。スレッド上で特定のタスクをExecutor実行する (つまり、呼び出す) 必要がありますが、スレッドを作成するコードは隠されています。execute()私たちは、進行状況を制御するためのいくつかのオプションを備えたExecutorService特定のものを持っていますExecutorExecutorsそして、を作成できる工場がありますExecutorService。では、自分でやってみましょう:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
サイズが 2 の固定スレッド プールを指定したことがわかります。次に、タスクを 1 つずつプールに送信します。各タスクは、Stringスレッド名 ( currentThread().GetName()) を含む を返します。ExecutorService最後に をシャットダウンすることが重要です。そうしないとプログラムが終了しません。ファクトリExecutorsには追加のファクトリ メソッドがあります。たとえば、1 つのスレッド ( ) だけで構成されるプール、または1 分間アイドル状態になったスレッドが削除されるnewSingleThreadExecutorキャッシュ ( ) を含むプールを作成できます。newCachedThreadPool実際には、これらはブロッキング キューExecutorServiceによってバックアップされており、そこにタスクが配置され、そこからタスクが実行されます。ブロックキューの詳細については、このビデオを参照してください。こちらもお読みいただけますBlockingQueue について確認してくださいそして、「ArrayBlockingQueue より LinkedBlockingQueue を優先するのはどのような場合ですか?」という質問に対する答えを確認してください。最も単純に言えば、BlockingQueue次の 2 つの場合に a がスレッドをブロックします。
  • スレッドは空のキューからアイテムを取得しようとします
  • スレッドはアイテムを満杯のキューに入れようとします
ファクトリ メソッドの実装を見ると、それらがどのように機能するかがわかります。例えば:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
また
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
ご覧のとおり、 の実装はExecutorServiceファクトリ メソッド内で作成されます。そしてほとんどの場合、私たちは について話していますThreadPoolExecutor。作業に影響を与えるパラメータのみが変更されます。 Java と Thread クラスを組み合わせるとさらに効果的です。 パート V — エグゼキュータ、スレッドプール、フォーク/ジョイン - 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

スレッドプールエグゼキュータ

前に見たように、ThreadPoolExecutorこれは通常、ファクトリ メソッド内で作成されるものです。この機能は、スレッドの最大数と最小数として渡す引数、および使用されているキューのタイプによって影響を受けます。ただし、java.util.concurrent.BlockingQueueインターフェイスの任意の実装を使用できます。について言えばThreadPoolExecutor、いくつかの興味深い機能について触れておく必要があります。たとえば、ThreadPoolExecutor利用可能なスペースがない場合、タスクを に送信することはできません。
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
このコードは次のようなエラーでクラッシュします。
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
言い換えれば、は実際には単一の要素で構成され、それ以上何も入力できないように設計されているtaskため、送信できませんSynchronousQueue。ここではゼロ (「キューにあるタスク = 0」) があることがわかりますqueued tasks。しかし、これは の特殊な機能であり、SynchronousQueue実際には常に空である 1 要素のキューであるため、これについては何も不思議なことはありません。あるスレッドが要素をキューに入れると、別のスレッドがキューから要素を取得するまで待機します。したがって、これを に置き換えることができnew LinkedBlockingQueue<>(1)、エラーは show に変わりますqueued tasks = 1。キューには 1 つの要素しかないため、2 番目の要素を追加することはできません。そしてそれがプログラムの失敗の原因です。キューについての説明を続けると、次の点に注目する価値があります。ThreadPoolExecutorクラスには、キューを処理するための追加のメソッドがあります。たとえば、このthreadPoolExecutor.purge()メソッドはキュー内のスペースを解放するために、キャンセルされたすべてのタスクをキューから削除します。もう 1 つの興味深いキュー関連関数は、拒否されたタスクのハンドラーです。
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
この例では、ハンドラーはRejectedキュー内のタスクが拒否されるたびに単純に表示します。便利ですね。さらに、にThreadPoolExecutorは興味深いサブクラスがあります。タイマーに基づいてタスクを実行する機能を提供します。 ScheduledThreadPoolExecutorScheduledExecutorService

ScheduledExecutorService

ScheduledExecutorService( のタイプですExecutorService) を使用すると、スケジュールに従ってタスクを実行できます。例を見てみましょう:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
ここではすべてがシンプルです。タスクが送信されると、 が届きますjava.util.concurrent.ScheduledFuture。スケジュールは次のような場合にも役立ちます。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
ここでは、一定の初期遅延を伴う固定頻度 (「FixedRate」) で実行するタスクを送信しますRunnable。この場合、1 秒後、タスクは 2 秒ごとに実行され始めます。同様のオプションがあります。
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
ただし、この場合、タスクは各実行の間に特定の間隔を置いて実行されます。つまり、 はtask1 秒後に実行されます。そして、完了するとすぐに 2 秒が経過し、新しいタスクが開始されます。このトピックに関する追加リソースは次のとおりです。 Java と Thread クラスを組み合わせるとさらに効果的です。 パート V — エグゼキュータ、スレッドプール、フォーク/ジョイン - 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

仕事盗みプール

上記のスレッド プールに加えて、もう 1 つあります。正直に言うと、これは少し特別です。それは仕事盗みプールと呼ばれます。つまり、ワークスチールとは、アイドル状態のスレッドが他のスレッドからタスクを取得したり、共有キューからタスクを取得し始めるアルゴリズムです。例を見てみましょう:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
このコードを実行すると、ExecutorService各スレッドがロック オブジェクトの待機キューに入れられるため、5 つのスレッドが作成されます。私たちはすでに、 Java と Thread クラスという Better の組み合わせでモニターとロックを理解しました。パート II — 同期Executors.newCachedThreadPool()では、に置き換えてみましょうExecutors.newWorkStealingPool()。何が変わるのでしょうか?タスクが 5 つ未満のスレッドで実行されていることがわかります。CachedThreadPoolタスクごとにスレッドが作成されることを覚えていますか? これは、wait()スレッドがブロックされ、後続のタスクを完了する必要があり、そのタスク用に新しいスレッドがプール内に作成されたためです。盗用プールを使用すると、スレッドが永久にアイドル状態になることはありません。彼らは隣人の任務を遂行し始めます。はWorkStealingPool他のスレッド プールと何が違うのでしょうか? 魔法のような事実ForkJoinPoolその中に住んでいます:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
実は、もう一つ違いがあります。デフォルトでは、ForkJoinPoolonrdinary によって作成されたスレッドとは異なり、 に対して作成されたスレッドはデーモン スレッドですThreadPool。一般に、デーモン スレッドについて覚えておく必要があります。たとえば、非デーモン スレッドを作成するCompletableFuture独自のスレッドを指定しない限り、デーモン スレッドも使用されるからです。ThreadFactory予期せぬ場所にサプライズが潜んでいるかもしれません。:)

フォーク参加プール

ForkJoinPoolこのパートでは、の「内部」に存在する (フォーク/結合フレームワークとも呼ばれる)について再度説明しますWorkStealingPool。一般に、fork/join フレームワークは Java 1.7 に登場しました。Java 11 はもうすぐそこまで来ていますが、それでも覚えておく価値はあります。これは最も一般的な実装ではありませんが、非常に興味深いものです。これに関する優れたレビューが Web 上にあります: Understanding Java Fork-Join Framework with Examples。はForkJoinPoolに依存しますjava.util.concurrent.RecursiveTask。もありますjava.util.concurrent.RecursiveActionRecursiveAction結果を返しません。したがって、RecursiveTaskは に似ておりCallable、 はRecursiveActionに似ていますunnableforkこの名前には、とという 2 つの重要なメソッドの名前が含まれていることがわかりますjoin。のforkメソッドは、別のスレッドでいくつかのタスクを非同期的に開始します。このjoinメソッドを使用すると、作業が完了するまで待つことができます。よりよく理解するには、「命令型プログラミングからフォーク/結合、Java 8 の並列ストリームまで」を読む必要があります。

まとめ

さて、これでレビューのこの部分は終わりです。Executorこれはもともとスレッドを実行するために発明されたものであることがわかりました。その後、Java の作成者はそのアイデアを継続することを決定し、 を思いつきましたExecutorService。と をExecutorService使用して実行するタスクを送信し、サービスをシャットダウンすることもできます。実装が必要なため、ファクトリ メソッドを含むクラスを作成し、それを呼び出しました。スレッド プール ( ) を作成できます。さらに、実行スケジュールを指定できるスレッド プールもあります。そして、 はの後ろに隠れます。私が上で書いた内容が興味深いだけでなく、理解できるものであると感じていただければ幸いです :) ご提案やコメントをいつでも喜んで聞いています。 submit()invoke()ExecutorServiceExecutorsThreadPoolExecutorForkJoinPoolWorkStealingPoolJava と Thread クラスを組み合わせるとさらに効果的です。パート I — 実行スレッド 組み合わせるとさらに効果的: Java と Thread クラス。パート II — 同期 併用するとより効果的です: Java と Thread クラス。パート III — 連携 を強化: Java と Thread クラス。パート IV — Callable、Future、およびその仲間たち 一緒にさらに良く: Java と Thread クラス。パート VI — 撃て!
コメント
  • 人気
  • 新規
  • 古い
コメントを残すには、サインインしている必要があります
このページにはまだコメントがありません