CodeGym/Java-Blog/Random-DE/Besser zusammen: Java und die Thread-Klasse. Teil V – Exe...
John Squirrels
Level 41
San Francisco

Besser zusammen: Java und die Thread-Klasse. Teil V – Executor, ThreadPool, Fork/Join

Veröffentlicht in der Gruppe Random-DE

Einführung

Wir wissen also, dass Java Threads hat. Darüber können Sie in der Rezension mit dem Titel „ Besser zusammen: Java und die Thread-Klasse“ nachlesen. Teil I – Threads zur Ausführung . Besser zusammen: Java und die Thread-Klasse.  Teil V – Executor, ThreadPool, Fork/Join – 1Schauen wir uns den typischen Code noch einmal an:
public static void main(String[] args) throws Exception {
	Runnable task = () -> {
		System.out.println("Task executed");
	};
	Thread thread = new Thread(task);
	thread.start();
}
Wie Sie sehen, ist der Code zum Starten einer Aufgabe ziemlich typisch, aber wir müssen ihn für neue Aufgaben wiederholen. Eine Lösung besteht darin, es in eine separate Methode einzufügen, z. B. execute(Runnable runnable). Aber die Entwickler von Java haben unsere Notlage berücksichtigt und die Schnittstelle entwickelt Executor:
public static void main(String[] args) throws Exception {
	Runnable task = () -> System.out.println("Task executed");
	Executor executor = (runnable) -> {
		new Thread(runnable).start();
	};
	executor.execute(task);
}
Dieser Code ist deutlich prägnanter: Jetzt schreiben wir einfach Code, um den RunnableThread zu starten. Das ist großartig, nicht wahr? Aber das ist erst der Anfang: Besser zusammen: Java und die Thread-Klasse.  Teil V – Executor, ThreadPool, Fork/Join – 2

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html

Wie Sie sehen, Executorverfügt die Schnittstelle über eine ExecutorServiceUnterschnittstelle. Im Javadoc für diese Schnittstelle heißt es, dass an ExecutorServiceeine bestimmte Methode beschrieben wird Executor, die Methoden zum Herunterfahren der . bereitstellt Executor. Außerdem ist es möglich, eine Datei zu erhalten, java.util.concurrent.Futureum den Ausführungsprozess zu verfolgen. Zuvor in Better Together: Java und die Thread-Klasse. In Teil IV – Callable, Future und Freunde haben wir kurz die Fähigkeiten von besprochen Future. Wenn Sie es vergessen oder nie gelesen haben, empfehle ich Ihnen, Ihr Gedächtnis aufzufrischen ;) Was sagt das Javadoc sonst noch? Es sagt uns, dass wir über eine spezielle java.util.concurrent.ExecutorsFactory verfügen, mit der wir Standardimplementierungen von erstellen können ExecutorService.

ExecutorService

Lassen Sie uns einen Rückblick geben. Wir müssen eine bestimmte Aufgabe in einem Thread Executorausführen (dh aufrufen execute()), und der Code, der den Thread erstellt, bleibt uns verborgen. Wir haben ExecutorService– ein Spezifisches Executor, das mehrere Optionen zur Steuerung des Fortschritts bietet. Und wir haben die ExecutorsFabrik, mit der wir eine erstellen können ExecutorService. Jetzt machen wir es selbst:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	Callable<String> task = () -> Thread.currentThread().getName();
	ExecutorService service = Executors.newFixedThreadPool(2);
	for (int i = 0; i < 5; i++) {
		Future result = service.submit(task);
		System.out.println(result.get());
	}
	service.shutdown();
}
Sie können sehen, dass wir einen festen Thread-Pool mit einer Größe von 2 angegeben haben. Anschließend senden wir Aufgaben nacheinander an den Pool. Jede Aufgabe gibt eine zurück, die Stringden Thread-Namen enthält ( currentThread().GetName()). ExecutorServiceEs ist wichtig, den ganz am Ende herunterzufahren , da sonst unser Programm nicht beendet wird. Die ExecutorsFabrik verfügt über zusätzliche Fabrikmethoden. Beispielsweise können wir einen Pool erstellen, der nur aus einem Thread besteht ( newSingleThreadExecutor), oder einen Pool, der einen Cache enthält ( newCachedThreadPool), aus dem Threads entfernt werden, nachdem sie eine Minute lang inaktiv waren. In Wirklichkeit werden diese ExecutorServicedurch eine Blockierungswarteschlange unterstützt , in die Aufgaben gestellt und von der aus Aufgaben ausgeführt werden. Weitere Informationen zum Blockieren von Warteschlangen finden Sie in diesem Video . Sie können dies auch lesenRezension zu BlockingQueue . Und schauen Sie sich die Antwort auf die Frage „Wann ist LinkedBlockingQueue dem ArrayBlockingQueue vorzuziehen?“ an. Vereinfacht ausgedrückt BlockingQueueblockiert a einen Thread in zwei Fällen:
  • Der Thread versucht, Elemente aus einer leeren Warteschlange abzurufen
  • Der Thread versucht, Elemente in eine volle Warteschlange zu stellen
Wenn wir uns die Implementierung der Factory-Methoden ansehen, können wir sehen, wie sie funktionieren. Zum Beispiel:
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}
oder
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
Wie wir sehen können, werden Implementierungen von ExecutorServiceinnerhalb der Factory-Methoden erstellt. Und größtenteils reden wir darüber ThreadPoolExecutor. Es werden nur die Parameter geändert, die die Arbeit beeinflussen. Besser zusammen: Java und die Thread-Klasse.  Teil V – Executor, ThreadPool, Fork/Join – 3

https://en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg

ThreadPoolExecutor

Wie wir bereits gesehen haben, ThreadPoolExecutorwird dies normalerweise innerhalb der Factory-Methoden erstellt. Die Funktionalität wird durch die Argumente beeinflusst, die wir als maximale und minimale Anzahl von Threads übergeben, sowie durch den verwendeten Warteschlangentyp. Es kann jedoch jede Implementierung der java.util.concurrent.BlockingQueueSchnittstelle verwendet werden. Apropos ThreadPoolExecutor: Wir sollten einige interessante Funktionen erwähnen. Beispielsweise können Sie keine Aufgaben an senden, ThreadPoolExecutorwenn kein Speicherplatz verfügbar ist:
public static void main(String[] args) throws ExecutionException, InterruptedException {
	int threadBound = 2;
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, threadBound,
            0L, TimeUnit.SECONDS, new SynchronousQueue<>());
	Callable<String> task = () -> {
		Thread.sleep(1000);
		return Thread.currentThread().getName();
	};
	for (int i = 0; i < threadBound + 1; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
Dieser Code stürzt mit einem Fehler wie diesem ab:
Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
Mit anderen Worten, taskkann nicht eingereicht werden, da SynchronousQueuees so konzipiert ist, dass es tatsächlich aus einem einzigen Element besteht und es uns nicht erlaubt, mehr hineinzufügen. Wir können sehen, dass wir queued taskshier Null haben („Queued Tasks = 0“). Daran ist aber nichts Seltsames, denn das ist eine Besonderheit von SynchronousQueue, bei der es sich tatsächlich um eine 1-Element-Warteschlange handelt, die immer leer ist! Wenn ein Thread ein Element in die Warteschlange stellt, wartet er, bis ein anderer Thread das Element aus der Warteschlange übernimmt. Dementsprechend können wir es durch ersetzen new LinkedBlockingQueue<>(1)und der Fehler wird in now show geändert queued tasks = 1. Da die Warteschlange nur aus einem Element besteht, können wir kein zweites Element hinzufügen. Und genau das führt zum Scheitern des Programms. Wenn wir unsere Diskussion über die Warteschlange fortsetzen, ist es erwähnenswert, dass dieThreadPoolExecutorDie Klasse verfügt über zusätzliche Methoden zum Bedienen der Warteschlange. Die Methode entfernt beispielsweise threadPoolExecutor.purge()alle abgebrochenen Aufgaben aus der Warteschlange, um Platz in der Warteschlange freizugeben. Eine weitere interessante warteschlangenbezogene Funktion ist der Handler für abgelehnte Aufgaben:
public static void main(String[] args) {
	ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.SECONDS, new SynchronousQueue());
	Callable<String> task = () -> Thread.currentThread().getName();
	threadPoolExecutor.setRejectedExecutionHandler((runnable, executor) -> System.out.println("Rejected"));
	for (int i = 0; i < 5; i++) {
		threadPoolExecutor.submit(task);
	}
	threadPoolExecutor.shutdown();
}
In diesem Beispiel zeigt unser Handler einfach Rejectedjedes Mal an, wenn eine Aufgabe in der Warteschlange abgelehnt wird. Praktisch, nicht wahr? Darüber hinaus ThreadPoolExecutorgibt es eine interessante Unterklasse: ScheduledThreadPoolExecutor, eine ScheduledExecutorService. Es bietet die Möglichkeit, eine Aufgabe basierend auf einem Timer auszuführen.

ScheduledExecutorService

ScheduledExecutorService(was eine Art von ist ExecutorService) ermöglicht es uns, Aufgaben nach einem Zeitplan auszuführen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
	ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		return Thread.currentThread().getName();
	};
	scheduledExecutorService.schedule(task, 1, TimeUnit.MINUTES);
	scheduledExecutorService.shutdown();
}
Hier ist alles einfach. Die Aufgaben werden eingereicht und dann bekommen wir eine java.util.concurrent.ScheduledFuture. Ein Zeitplan kann auch in der folgenden Situation hilfreich sein:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4);
Runnable task = () -> {
	System.out.println(Thread.currentThread().getName());
};
scheduledExecutorService.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
Hier übermitteln wir eine RunnableAufgabe zur Ausführung in einer festen Häufigkeit („FixedRate“) mit einer bestimmten Anfangsverzögerung. In diesem Fall wird die Aufgabe nach 1 Sekunde alle 2 Sekunden ausgeführt. Es gibt eine ähnliche Option:
scheduledExecutorService.scheduleWithFixedDelay(task, 1, 2, TimeUnit.SECONDS);
In diesem Fall werden die Aufgaben jedoch mit einem bestimmten Intervall ZWISCHEN jeder Ausführung ausgeführt. Das heißt, der Befehl taskwird nach 1 Sekunde ausgeführt. Sobald es abgeschlossen ist, vergehen 2 Sekunden und dann wird eine neue Aufgabe gestartet. Hier sind einige zusätzliche Ressourcen zu diesem Thema: Besser zusammen: Java und die Thread-Klasse.  Teil V – Executor, ThreadPool, Fork/Join – 4

https://dzone.com/articles/diving-into-java-8s-newworkstealingpools

WorkStealingPool

Zusätzlich zu den oben genannten Thread-Pools gibt es noch einen weiteren. Wir können ehrlich sagen, dass es etwas Besonderes ist. Man nennt es einen arbeitsraubenden Pool. Kurz gesagt, Work-Stealing ist ein Algorithmus, bei dem inaktive Threads damit beginnen, Aufgaben von anderen Threads oder Aufgaben aus einer gemeinsam genutzten Warteschlange zu übernehmen. Schauen wir uns ein Beispiel an:
public static void main(String[] args) {
	Object lock = new Object();
	ExecutorService executorService = Executors.newCachedThreadPool();
	Callable<String> task = () -> {
		System.out.println(Thread.currentThread().getName());
		lock.wait(2000);
		System.out.println("Finished");
		return "result";
	};
	for (int i = 0; i < 5; i++) {
		executorService.submit(task);
	}
	executorService.shutdown();
}
Wenn wir diesen Code ausführen, werden ExecutorService5 Threads für uns erstellt, da jeder Thread in die Warteschlange für das Sperrobjekt gestellt wird. Wir haben die Monitore und Sperren bereits besser zusammen herausgefunden : Java und die Thread-Klasse. Teil II – Synchronisierung . Jetzt ersetzen wir Executors.newCachedThreadPool()durch Executors.newWorkStealingPool(). Was wird sich ändern? Wir werden sehen, dass unsere Aufgaben in weniger als 5 Threads ausgeführt werden. Denken Sie daran, dass CachedThreadPoolfür jede Aufgabe ein Thread erstellt wird? Das liegt daran, dass wait()der Thread blockiert wurde, nachfolgende Aufgaben abgeschlossen werden müssen und für sie neue Threads im Pool erstellt wurden. Bei einem Diebstahlpool bleiben Threads nicht ewig untätig. Sie beginnen, die Aufgaben ihrer Nachbarn zu erledigen. Was unterscheidet einen WorkStealingPoolso von anderen Thread-Pools? Die Tatsache, dass das MagischeForkJoinPoollebt darin:
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}
Tatsächlich gibt es noch einen weiteren Unterschied. Standardmäßig sind die für a erstellten Threads ForkJoinPoolDaemon-Threads, im Gegensatz zu den Threads, die über onrdinary erstellt wurden ThreadPool. Im Allgemeinen sollten Sie sich Daemon-Threads merken, da beispielsweise CompletableFutureauch Daemon-Threads verwendet werden, es sei denn, Sie geben einen eigenen Thread an ThreadFactory, der Nicht-Daemon-Threads erstellt. Das sind die Überraschungen, die an unerwarteten Orten lauern können! :) :)

ForkJoinPool

In diesem Teil werden wir noch einmal über ForkJoinPool(auch Fork/Join-Framework genannt) sprechen, das „unter der Haube“ von WorkStealingPool. Im Allgemeinen erschien das Fork/Join-Framework bereits in Java 1.7. Und auch wenn Java 11 in greifbarer Nähe ist, lohnt es sich dennoch, daran zu denken. Dies ist nicht die häufigste Implementierung, aber sie ist sehr interessant. Es gibt eine gute Rezension dazu im Internet: Understanding Java Fork-Join Framework with Examples . Das ForkJoinPoolberuht auf java.util.concurrent.RecursiveTask. Es gibt auch java.util.concurrent.RecursiveAction. RecursiveActiongibt kein Ergebnis zurück. Daher RecursiveTaskähnelt es Callable, und RecursiveActionähnelt unnable. Wir können sehen, dass der Name die Namen von zwei wichtigen Methoden enthält: forkund join. DerforkDie Methode startet eine Aufgabe asynchron in einem separaten Thread. Und mit dieser joinMethode können Sie warten, bis die Arbeit erledigt ist. Um das beste Verständnis zu erhalten, sollten Sie „ Von imperativer Programmierung über Fork/Join bis hin zu parallelen Streams in Java 8“ lesen .

Zusammenfassung

Damit ist dieser Teil der Rezension abgeschlossen. Wir haben erfahren, dass Executores ursprünglich zur Ausführung von Threads erfunden wurde. Dann beschlossen die Entwickler von Java, die Idee fortzusetzen und entwickelten ExecutorService. ExecutorServiceermöglicht es uns, Aufgaben mit submit()und zur Ausführung zu übermitteln invoke()und den Dienst auch herunterzufahren. Da ExecutorServiceImplementierungen erforderlich sind, haben sie eine Klasse mit Factory-Methoden geschrieben und sie genannt Executors. Damit können Sie Thread-Pools erstellen ( ThreadPoolExecutor). Darüber hinaus gibt es Thread-Pools, mit denen wir auch einen Ausführungsplan festlegen können. Und ein ForkJoinPoolversteckt sich hinter einem WorkStealingPool. Ich hoffe, Sie fanden das, was ich oben geschrieben habe, nicht nur interessant, sondern auch verständlich :) Ich freue mich immer über Ihre Anregungen und Kommentare. Besser zusammen: Java und die Thread-Klasse. Teil I – Ausführungsthreads Gemeinsam besser: Java und die Thread-Klasse. Teil II – Synchronisierung Gemeinsam besser: Java und die Thread-Klasse. Teil III – Gemeinsam besser interagieren: Java und die Thread-Klasse. Teil IV – Callable, Future und Freunde Gemeinsam besser: Java und die Thread-Klasse. Teil VI – Feuer los!
Kommentare
  • Beliebt
  • Neu
  • Alt
Du musst angemeldet sein, um einen Kommentar schreiben zu können
Auf dieser Seite gibt es noch keine Kommentare